У меня есть следующий код:
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
Если я запускаю свой код на своем локальном сервере, например, http://localhost:8501/ping
, на разных вкладках одного и того же окна Firefox, я получаю:
Hello
bye
Hello
bye
...
Вместо:
Hello
Hello
bye
bye
Я читал об использовании httpx, но все равно не могу добиться настоящего распараллеливания. В чем проблема?
Q :
" ... What's the problem? "
А:
В документации FastAPI прямо говорится, что фреймворк использует внутрипроцессные задачи (унаследованные от Старлетт).
Это само по себе означает, что все такие задачи соревнуются за получение (время от времени) GIL-блокировки интерпретатора Python — эффективно являющегося терроризирующим MUTEX Global Interpreter Lock, который, по сути, повторно [SERIAL]
-использует любое и все количество Внутрипроцессные потоки интерпретатора Python
для работы как один-и-только-один-РАБОТАЕТ-пока все остальные ждут...
В более мелком масштабе вы видите результат — если порождение другого обработчика для второго (инициированного вручную из второй вкладки FireFox) приходящего http-запроса на самом деле занимает больше времени, чем спящий режим, результат GIL-lock чередуется ~ 100 [ms]
time-quanta round-robin (все-подождите-один-может-работать ~ 100 [ms]
перед каждым следующим раундом GIL-lock release-acquire-roulette) Внутренняя работа интерпретатора Python не показывает более подробной информации, вы можете использовать более подробную информацию (в зависимости от по типу или версии ОС) из здесь, чтобы увидеть больше в потоке LoD, например, внутри выполняемого асинхронного кода:
import time
import threading
from fastapi import FastAPI, Request
TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Python Interpreter __main__ was started ..."
)
...
@app.get("/ping")
async def ping( request: Request ):
""" __doc__
[DOC-ME]
ping( Request ): a mock-up AS-IS function to yield
a CLI/GUI self-evidence of the order-of-execution
RETURNS: a JSON-alike decorated dict
[TEST-ME] ...
"""
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Hello..."
)
#------------------------------------------------- actual blocking work
time.sleep( 5 )
#------------------------------------------------- actual blocking work
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"...bye"
)
return { "ping": "pong!" }
И последнее, но не менее важное: не стесняйтесь читать больше обо всех кодах, основанных на потоках другие акулы, которые могут пострадать... или даже вызвать... за кулисами...
Смесь GIL-lock, пулов на основе потоков, асинхронных декораторов, блокировки и обработки событий - верная смесь с неопределенностями и HWY2HELL; о)
Согласно Документация FastAPI:
When you declare a path operation function with normal
def
instead ofasync def
, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).
Таким образом, def
(синхронные) маршруты выполняются в отдельном потоке из пула потоков, или, другими словами, сервер обрабатывает запросы одновременно, тогда как async def
маршруты выполняются в основном (одиночном) потоке, т. е. сервер обрабатывает запросы последовательно - до тех пор, пока внутри таких маршрутов нет вызова await
для I/O-bound
операций, таких как данные в ожидании от клиента для отправки по сети, содержимое файла на диске для чтения, операция базы данных для завершения и т. д. - иметь посмотри здесь. Асинхронный код с async
и await
много раз описывались как использование сопрограмм.. Корутины являются совместными (или совместно многозадачный): «в любой момент времени программа с сопрограммами запускает только одну из своих сопрограмм, и эта работающая сопрограмма приостанавливает свое выполнение только тогда, когда она явно запрашивает приостановку» (см. здесь и здесь для получения дополнительной информации на сопрограммах). Однако это не относится к операциям CPU-bound
. CPU-bound
операции, даже если они объявлены в async def
функциях и вызываются с помощью await
, блокируют основной поток. Это также означает, что операция блокировки, такая как time.sleep()
, в маршруте async def
заблокирует весь сервер (как в вашем случае).
Таким образом, если ваша функция не собирается выполнять какие-либо вызовы async
, вместо этого вы должны объявить ее с помощью def
, как показано ниже:
@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"
В противном случае, если вы собираетесь вызывать async
функции, которые вам нужно await
, вы должны использовать async def
. Чтобы продемонстрировать это, ниже используется функция asyncio.sleep()
из библиотеки асинцио. Аналогичный пример приведен также для здесь и здесь.
import asyncio
@app.get("/ping")
async def ping(request: Request):
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"
Обе приведенные выше функции будут печатать ожидаемый результат, как указано в вашем вопросе, если два запроса поступят примерно в одно и то же время.
Hello
Hello
bye
bye
Примечание: когда вы вызываете конечную точку во второй (третий и т. д.) раз, не забудьте сделать это из вкладки, изолированной от основного сеанса браузера; в противном случае запросы будут отображаться как исходящие от одного и того же клиента (вы можете проверить это с помощью print(request.client)
- число port
будет одинаковым, если обе вкладки открыты в одном окне), и, следовательно, запросы будут обрабатываться последовательно . Вы можете либо перезагрузить ту же вкладку (как она работает), либо открыть новую вкладку в окне инкогнито, либо использовать другой браузер/клиент для отправки запроса.
Если вам необходимо использовать async def
(как вам может понадобиться await
для сопрограмм внутри вашего маршрута), но также у вас есть какая-то синхронная длительная вычислительная задача, которая может блокировать сервер и не позволяет проходить другим запросам, например:
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
contents = await file.read()
some_long_computation_task(contents) # this blocks other requests
print("bye")
return "pong"
потом:
Используйте больше рабочие (например, uvicorn main:app --workers 4
). Примечание: Каждый рабочий "имеет свои вещи, переменные и память". Это означает, что global
переменные/объекты и т. д. не будут использоваться совместно процессами/воркерами. В этом случае следует рассмотреть возможность использования хранилища базы данных или хранилищ ключей и значений (кэшей), как описано в здесь и здесь. Кроме того, «если вы потребляете большой объем памяти в своем коде, каждый процесс будет потреблять эквивалентный объем памяти».
Используйте модуль FastAPI (Starlette) run_in_threadpool()
from concurrency
(github src здесь и здесь) — как предложил @tiangolo здесь — который «будет запускать функцию в отдельном потоке, чтобы гарантировать, что основной поток (где запускаются сопрограммы) не будет заблокирован» (см. здесь). Как описано @tiangolo здесь, «run_in_threadpool
— ожидаемая функция, первый параметр — обычная функция, следующие параметры передаются этой функции напрямую. Она поддерживает аргументы последовательности и аргументы ключевого слова».
from fastapi.concurrency import run_in_threadpool
response = await run_in_threadpool(some_long_computation_task, contents)
В качестве альтернативы используйте asyncio
run_in_executor
:
loop = asyncio.get_running_loop()
response = await loop.run_in_executor(None, lambda:
some_long_computation_task(contents))
Вы также должны проверить, можете ли вы изменить определение вашего маршрута на def
. Например, если единственным ожидаемым методом в вашей конечной точке является чтение содержимого файла (как вы упомянули в разделе комментариев ниже), FastAPI может прочитать для вас bytes
файла (однако это должно работать для небольшие файлы, так как все содержимое будет храниться в памяти, см. здесь), или вы могли бы даже вызвать метод read()
объекта SpooledTemporaryFile
напрямую, чтобы вам не пришлось ждать метода read()
- и поскольку теперь вы можете объявить ваш маршрут с помощью def
, каждый запрос будет выполняться в отдельном потоке.
@app.post("/ping")
def ping(file: UploadFile = File(...)):
print("Hello")
contents = file.file.read()
some_long_computation_task(contents)
print("bye")
return "pong"
Посмотрите этот ответ, а также документацию здесь, чтобы найти другие предлагаемые решения.
Могу я спросить, является ли file.read()
единственной async
функцией, которую вам нужно await
?
После загрузки файла (изображения) я выполняю жесткую обработку изображения и загружаю изображение на сервер AWS (есть обработчики S3). Однако в коде нет других явных ожиданий.
@Learningfrommasters Затем вы можете объявить маршрут как def
и объявить параметр файла как bytes
. Таким образом, FastAPI прочитает файл за вас, и вы получите его содержимое; и поскольку это маршрут def
, каждый запрос будет выполняться в отдельном потоке. Как описано здесь, это будет работать с небольшими файлами, так как содержимое будет храниться в памяти. Если вам нужно получить другие атрибуты, например, имя файла, вы можете передать их как Form
данные на вашу конечную точку.
Чтобы загрузить изображение, у меня есть: def myfunc(image: bytes = File(...)): Image.open(BytesIO(image)).convert('RGB'), но теперь это не удается. До этого было: async def myfunc(image: UploadFile = File(...)): Image.open(BytesIO(await image.read())).convert('RGB') Как быть без асинхронности и ожидания?
Давайте продолжить обсуждение в чате.
На самом деле это была попытка проверить, почему другой вызов выполнялся последовательно. Другая функция вызывает «UploadFile» и выполняет «ожидание file.read ()», а также запускает последовательный режим. Более того, это выполняется внутри продукта сервера amazon после шлюза API от amazon, и, следовательно, все запросы поступают с одного и того же IP-адреса, поскольку пользователь подключается к amazon, а сервер amazon вызывает мой API. Проблема в том, что операция с файлом длинная, и если у меня это сериализовано в конце, у меня есть тайм-ауты из-за ограничений Amazon. Думаю, мне придется перейти по последней ссылке, которую вы предоставили!