FastAPI запускает API-вызовы последовательно, а не параллельно

У меня есть следующий код:

import time
from fastapi import FastAPI, Request
    
app = FastAPI()
    
@app.get("/ping")
async def ping(request: Request):
        print("Hello")
        time.sleep(5)
        print("bye")
        return {"ping": "pong!"}

Если я запускаю свой код на своем локальном сервере, например, http://localhost:8501/ping, на разных вкладках одного и того же окна Firefox, я получаю:

    Hello
    bye
    Hello
    bye
    ...

Вместо:

    Hello
    Hello
    bye
    bye

Я читал об использовании httpx, но все равно не могу добиться настоящего распараллеливания. В чем проблема?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
219
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Q :
" ... What's the problem? "

А:
В документации FastAPI прямо говорится, что фреймворк использует внутрипроцессные задачи (унаследованные от Старлетт).

Это само по себе означает, что все такие задачи соревнуются за получение (время от времени) GIL-блокировки интерпретатора Python — эффективно являющегося терроризирующим MUTEX Global Interpreter Lock, который, по сути, повторно [SERIAL]-использует любое и все количество Внутрипроцессные потоки интерпретатора Python
для работы как один-и-только-один-РАБОТАЕТ-пока все остальные ждут...

В более мелком масштабе вы видите результат — если порождение другого обработчика для второго (инициированного вручную из второй вкладки FireFox) приходящего http-запроса на самом деле занимает больше времени, чем спящий режим, результат GIL-lock чередуется ~ 100 [ms] time-quanta round-robin (все-подождите-один-может-работать ~ 100 [ms] перед каждым следующим раундом GIL-lock release-acquire-roulette) Внутренняя работа интерпретатора Python не показывает более подробной информации, вы можете использовать более подробную информацию (в зависимости от по типу или версии ОС) из здесь, чтобы увидеть больше в потоке LoD, например, внутри выполняемого асинхронного кода:

import time
import threading
from   fastapi import FastAPI, Request

TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"

print( TEMPLATE.format( time.perf_counter_ns(),
                        threading.get_ident(),
                       "Python Interpreter __main__ was started ..."
                        )
...
@app.get("/ping")
async def ping( request: Request ):
        """                                __doc__
        [DOC-ME]
        ping( Request ):  a mock-up AS-IS function to yield
                          a CLI/GUI self-evidence of the order-of-execution
        RETURNS:          a JSON-alike decorated dict

        [TEST-ME]         ...
        """
        print( TEMPLATE.format( time.perf_counter_ns(),
                                threading.get_ident(),
                               "Hello..."
                                )
        #------------------------------------------------- actual blocking work
        time.sleep( 5 )
        #------------------------------------------------- actual blocking work
        print( TEMPLATE.format( time.perf_counter_ns(),
                                threading.get_ident(),
                               "...bye"
                                )
        return { "ping": "pong!" }

И последнее, но не менее важное: не стесняйтесь читать больше обо всех кодах, основанных на потоках другие акулы, которые могут пострадать... или даже вызвать... за кулисами...

Рекламный меморандум

Смесь GIL-lock, пулов на основе потоков, асинхронных декораторов, блокировки и обработки событий - верная смесь с неопределенностями и HWY2HELL; о)

Ответ принят как подходящий

Согласно Документация FastAPI:

When you declare a path operation function with normal def instead of async def, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).

Таким образом, def (синхронные) маршруты выполняются в отдельном потоке из пула потоков, или, другими словами, сервер обрабатывает запросы одновременно, тогда как async def маршруты выполняются в основном (одиночном) потоке, т. е. сервер обрабатывает запросы последовательно - до тех пор, пока внутри таких маршрутов нет вызова await для I/O-bound операций, таких как данные в ожидании от клиента для отправки по сети, содержимое файла на диске для чтения, операция базы данных для завершения и т. д. - иметь посмотри здесь. Асинхронный код с async и await много раз описывались как использование сопрограмм.. Корутины являются совместными (или совместно многозадачный): «в любой момент времени программа с сопрограммами запускает только одну из своих сопрограмм, и эта работающая сопрограмма приостанавливает свое выполнение только тогда, когда она явно запрашивает приостановку» (см. здесь и здесь для получения дополнительной информации на сопрограммах). Однако это не относится к операциям CPU-bound. CPU-bound операции, даже если они объявлены в async def функциях и вызываются с помощью await, блокируют основной поток. Это также означает, что операция блокировки, такая как time.sleep(), в маршруте async def заблокирует весь сервер (как в вашем случае).

Таким образом, если ваша функция не собирается выполнять какие-либо вызовы async, вместо этого вы должны объявить ее с помощью def, как показано ниже:

@app.get("/ping")
def ping(request: Request):
    #print(request.client)
    print("Hello")
    time.sleep(5)
    print("bye")
    return "pong"

В противном случае, если вы собираетесь вызывать async функции, которые вам нужно await, вы должны использовать async def. Чтобы продемонстрировать это, ниже используется функция asyncio.sleep() из библиотеки асинцио. Аналогичный пример приведен также для здесь и здесь.

import asyncio
 
@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    await asyncio.sleep(5)
    print("bye")
    return "pong"

Обе приведенные выше функции будут печатать ожидаемый результат, как указано в вашем вопросе, если два запроса поступят примерно в одно и то же время.

Hello
Hello
bye
bye

Примечание: когда вы вызываете конечную точку во второй (третий и т. д.) раз, не забудьте сделать это из вкладки, изолированной от основного сеанса браузера; в противном случае запросы будут отображаться как исходящие от одного и того же клиента (вы можете проверить это с помощью print(request.client) - число port будет одинаковым, если обе вкладки открыты в одном окне), и, следовательно, запросы будут обрабатываться последовательно . Вы можете либо перезагрузить ту же вкладку (как она работает), либо открыть новую вкладку в окне инкогнито, либо использовать другой браузер/клиент для отправки запроса.

Async/await и дорогие операции с привязкой к ЦП (длительные вычислительные задачи)

Если вам необходимо использовать async def (как вам может понадобиться await для сопрограмм внутри вашего маршрута), но также у вас есть какая-то синхронная длительная вычислительная задача, которая может блокировать сервер и не позволяет проходить другим запросам, например:

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
    print("Hello")
    contents = await file.read()
    some_long_computation_task(contents)  # this blocks other requests
    print("bye")
    return "pong"

потом:

  1. Используйте больше рабочие (например, uvicorn main:app --workers 4). Примечание: Каждый рабочий "имеет свои вещи, переменные и память". Это означает, что global переменные/объекты и т. д. не будут использоваться совместно процессами/воркерами. В этом случае следует рассмотреть возможность использования хранилища базы данных или хранилищ ключей и значений (кэшей), как описано в здесь и здесь. Кроме того, «если вы потребляете большой объем памяти в своем коде, каждый процесс будет потреблять эквивалентный объем памяти».

  2. Используйте модуль FastAPI (Starlette) run_in_threadpool() from concurrency (github src здесь и здесь) — как предложил @tiangolo здесь — который «будет запускать функцию в отдельном потоке, чтобы гарантировать, что основной поток (где запускаются сопрограммы) не будет заблокирован» (см. здесь). Как описано @tiangolo здесь, «run_in_threadpool — ожидаемая функция, первый параметр — обычная функция, следующие параметры передаются этой функции напрямую. Она поддерживает аргументы последовательности и аргументы ключевого слова».

    from fastapi.concurrency import run_in_threadpool
    response = await run_in_threadpool(some_long_computation_task, contents)
    
  3. В качестве альтернативы используйте asynciorun_in_executor:

    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, lambda: 
    some_long_computation_task(contents))
    
  4. Вы также должны проверить, можете ли вы изменить определение вашего маршрута на def. Например, если единственным ожидаемым методом в вашей конечной точке является чтение содержимого файла (как вы упомянули в разделе комментариев ниже), FastAPI может прочитать для вас bytes файла (однако это должно работать для небольшие файлы, так как все содержимое будет храниться в памяти, см. здесь), или вы могли бы даже вызвать метод read() объекта SpooledTemporaryFile напрямую, чтобы вам не пришлось ждать метода read() - и поскольку теперь вы можете объявить ваш маршрут с помощью def, каждый запрос будет выполняться в отдельном потоке.

    @app.post("/ping")
    def ping(file: UploadFile = File(...)):
        print("Hello")
        contents = file.file.read()
        some_long_computation_task(contents)
        print("bye")
        return "pong"
    
  5. Посмотрите этот ответ, а также документацию здесь, чтобы найти другие предлагаемые решения.

На самом деле это была попытка проверить, почему другой вызов выполнялся последовательно. Другая функция вызывает «UploadFile» и выполняет «ожидание file.read ()», а также запускает последовательный режим. Более того, это выполняется внутри продукта сервера amazon после шлюза API от amazon, и, следовательно, все запросы поступают с одного и того же IP-адреса, поскольку пользователь подключается к amazon, а сервер amazon вызывает мой API. Проблема в том, что операция с файлом длинная, и если у меня это сериализовано в конце, у меня есть тайм-ауты из-за ограничений Amazon. Думаю, мне придется перейти по последней ссылке, которую вы предоставили!

Learning from masters 17.03.2022 20:53

Могу я спросить, является ли file.read() единственной async функцией, которую вам нужно await?

Chris 17.03.2022 21:22

После загрузки файла (изображения) я выполняю жесткую обработку изображения и загружаю изображение на сервер AWS (есть обработчики S3). Однако в коде нет других явных ожиданий.

Learning from masters 18.03.2022 00:12

@Learningfrommasters Затем вы можете объявить маршрут как def и объявить параметр файла как bytes. Таким образом, FastAPI прочитает файл за вас, и вы получите его содержимое; и поскольку это маршрут def, каждый запрос будет выполняться в отдельном потоке. Как описано здесь, это будет работать с небольшими файлами, так как содержимое будет храниться в памяти. Если вам нужно получить другие атрибуты, например, имя файла, вы можете передать их как Form данные на вашу конечную точку.

Chris 18.03.2022 04:20

Чтобы загрузить изображение, у меня есть: def myfunc(image: bytes = File(...)): Image.open(BytesIO(image)).convert('RGB'), но теперь это не удается. До этого было: async def myfunc(image: UploadFile = File(...)): Image.open(BytesIO(await image.read())).convert('RGB') Как быть без асинхронности и ожидания?

Learning from masters 18.03.2022 10:24

Давайте продолжить обсуждение в чате.

Learning from masters 18.03.2022 11:31

Другие вопросы по теме