Я искал эквивалент await Promise.all() функциональности JavaScript в Python, что привело меня к asyncio.gather(). Прочитав несколько объяснений и следуя нескольким примерам, мне не удалось заставить что-либо работать асинхронно.
Задача проста: удаленно извлечь значения из нескольких файлов из S3, а затем собрать результаты, когда все будет готово. Я сделал это в JS, и чтение из 12 файлов занимает немногим более секунды.
Код написан для FastAPI, и его упрощенная форма приведена ниже. Причина, по которой я знаю, что это не работает асинхронно, заключается в том, что чем больше файлов в s3 он читает, тем больше времени это занимает.
Я видел документацию по такого рода вещам, но, поскольку она не работает для меня, я не уверен, делаю ли я что-то неправильно или это просто не сработает в моем случае использования. Я беспокоюсь, что потоковая передача из удаленного файла с использованием rasterio в этом случае просто не работает.
Как я могу изменить код ниже, чтобы он вызывал функции одновременно и собирал все ответы ниже, когда они все завершены? Раньше я не использовал эту функцию в python, поэтому просто нужно немного больше пояснений.
async def read_from_file(s3_path):
# The important thing to note here is that it
# is streaming from a file in s3 given an s3 path
with rasterio.open(s3_path) as src:
values = src.read(1, window=Window(1, 2, 1, 1))
return values[0][0]
@app.get("/get-all")
async def get_all():
start_time = datetime.datetime.now()
# example paths
s3_paths = [
"s3:file-1",
"s3:file-2",
"s3:file-3",
"s3:file-4",
"s3:file-5",
"s3:file-6",
]
values = await asyncio.gather(
read_from_file(s3_paths[0]),
read_from_file(s3_paths[1]),
read_from_file(s3_paths[2]),
read_from_file(s3_paths[3]),
read_from_file(s3_paths[4]),
read_from_file(s3_paths[5]),
)
end_time = datetime.datetime.now()
logger.info(f"duration: {end_time-start_time}")
а ок, большое спасибо. Тогда я пойду по многопоточному маршруту.
Отвечает ли это на ваш вопрос? FastAPI запускает API-вызовы последовательно, а не параллельно
См. github.com/aio-libs/aiobotocore для асинхронно-совместимого клиента s3.
Тем не менее, нет необходимости писать многопоточный код самостоятельно — вы можете просто заставить вызовы rasterio выполняться в отдельных потоках, используя метод асинхронного цикла: он сделает свое дело за вас, почти не нуждаясь в изменении вашего кода. (хотя я не думаю, что это хорошо работает с блоком . run_in_executor: просто вызовите with и .open в плоском коде.






Python asyncio имеет механизм для запуска неасинхронного кода, такого как вызовы rasterio lib, в других потоках, чтобы асинхронный цикл не блокировался.
Попробуйте этот код:
import asyncio
from functools import partial
async def read_from_file(s3_path):
# The important thing to note here is that it
# is streaming from a file in s3 given an s3 path
loop = asyncio.get_running_loop()
try:
src = await loop.run_in_executor(None, rasterio.open, s3_path)
values = await loop.run_in_executor(None, partial(src.read, 1, window=Window(1, 2, 1, 1))
finally:
src.close() # might be interesting to paralelize this as well
return values[0][0]
Если вам нужно быть быстрее, вы можете создать собственный исполнитель: я думаю, что по умолчанию он будет использовать только потоки n_cpu и может замедлить работу, когда узким местом является сетевая задержка - может быть интересно около 20 потоков. (Этот исполнитель должен быть либо глобальным ресурсом, либо передаваться в качестве параметра вашему read_from_file и быть простым concurrent.futures.ThreadpoolPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor)
Что касается run_in_executor, проверьте https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
Чтобы воспользоваться преимуществами асинхронных потоков, функция не может блокировать поток. Похоже,
rasterio.open/readне являются асинхронными функциями. Это означает, что он не возвращает управление циклу обработки событий, выполняя медленную работу по чтению.