Асинхронные одновременные вызовы функций в Python

Я искал эквивалент await Promise.all() функциональности JavaScript в Python, что привело меня к asyncio.gather(). Прочитав несколько объяснений и следуя нескольким примерам, мне не удалось заставить что-либо работать асинхронно.

Задача проста: удаленно извлечь значения из нескольких файлов из S3, а затем собрать результаты, когда все будет готово. Я сделал это в JS, и чтение из 12 файлов занимает немногим более секунды.

Код написан для FastAPI, и его упрощенная форма приведена ниже. Причина, по которой я знаю, что это не работает асинхронно, заключается в том, что чем больше файлов в s3 он читает, тем больше времени это занимает.

Я видел документацию по такого рода вещам, но, поскольку она не работает для меня, я не уверен, делаю ли я что-то неправильно или это просто не сработает в моем случае использования. Я беспокоюсь, что потоковая передача из удаленного файла с использованием rasterio в этом случае просто не работает.

Как я могу изменить код ниже, чтобы он вызывал функции одновременно и собирал все ответы ниже, когда они все завершены? Раньше я не использовал эту функцию в python, поэтому просто нужно немного больше пояснений.

async def read_from_file(s3_path):
    # The important thing to note here is that it
    #  is streaming from a file in s3 given an s3 path
    with rasterio.open(s3_path) as src:
        values = src.read(1, window=Window(1, 2, 1, 1))
        return values[0][0]

@app.get("/get-all")
async def get_all():
    start_time = datetime.datetime.now()
    # example paths
    s3_paths = [
        "s3:file-1",
        "s3:file-2",
        "s3:file-3",
        "s3:file-4",
        "s3:file-5",
        "s3:file-6",
    ]

    values = await asyncio.gather(
        read_from_file(s3_paths[0]),
        read_from_file(s3_paths[1]),
        read_from_file(s3_paths[2]),
        read_from_file(s3_paths[3]),
        read_from_file(s3_paths[4]),
        read_from_file(s3_paths[5]),
    )

    end_time = datetime.datetime.now()
    logger.info(f"duration: {end_time-start_time}")

Чтобы воспользоваться преимуществами асинхронных потоков, функция не может блокировать поток. Похоже, rasterio.open/read не являются асинхронными функциями. Это означает, что он не возвращает управление циклу обработки событий, выполняя медленную работу по чтению.

Mark 04.10.2022 15:45

а ок, большое спасибо. Тогда я пойду по многопоточному маршруту.

sobmortin354 04.10.2022 16:49

Отвечает ли это на ваш вопрос? FastAPI запускает API-вызовы последовательно, а не параллельно

Chris 04.10.2022 17:08

См. github.com/aio-libs/aiobotocore для асинхронно-совместимого клиента s3.

MatsLindh 04.10.2022 21:16

Тем не менее, нет необходимости писать многопоточный код самостоятельно — вы можете просто заставить вызовы rasterio выполняться в отдельных потоках, используя метод асинхронного цикла: он сделает свое дело за вас, почти не нуждаясь в изменении вашего кода. (хотя я не думаю, что это хорошо работает с блоком . run_in_executor: просто вызовите with и .open в плоском коде.

jsbueno 05.10.2022 00:03
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
5
112
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Python asyncio имеет механизм для запуска неасинхронного кода, такого как вызовы rasterio lib, в других потоках, чтобы асинхронный цикл не блокировался.

Попробуйте этот код:

import asyncio
from functools import partial

async def read_from_file(s3_path):
    # The important thing to note here is that it
    #  is streaming from a file in s3 given an s3 path
    loop = asyncio.get_running_loop()
    try: 
        src = await loop.run_in_executor(None, rasterio.open, s3_path)
        values = await loop.run_in_executor(None, partial(src.read, 1, window=Window(1, 2, 1, 1))
    finally:
        src.close()  # might be interesting to paralelize this as well
    return values[0][0]

Если вам нужно быть быстрее, вы можете создать собственный исполнитель: я думаю, что по умолчанию он будет использовать только потоки n_cpu и может замедлить работу, когда узким местом является сетевая задержка - может быть интересно около 20 потоков. (Этот исполнитель должен быть либо глобальным ресурсом, либо передаваться в качестве параметра вашему read_from_file и быть простым concurrent.futures.ThreadpoolPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor)

Что касается run_in_executor, проверьте https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

Другие вопросы по теме