Я хочу реализовать веб-сервис с потоковым выводом с использованием FastAPI. В ходе проверки давления обнаружено, что для одного и того же запроса постоянно выполняется не один и тот же поток. В результате некоторые промежуточные переменные, хранящиеся в threadlocal, неупорядочены. Я реализовал демо-версию и протестировал ее. Показать, что поток не возвращает тот же поток по одному и тому же запросу? Могу я спросить, почему?
import logging
import threading
import time
import uvicorn
from fastapi import FastAPI
from sse_starlette import EventSourceResponse
app = FastAPI()
@app.get("/")
def stream_output():
logging.warning(f"{threading.current_thread().ident}")
return EventSourceResponse(num_generator(10), headers = {"thread": str(threading.current_thread().ident)})
def num_generator(n):
for i in range(n):
logging.warning(f"{threading.currentThread().ident}: %s" % i)
time.sleep(2)
yield f"thread: {threading.current_thread().ident} num: {i}"
logging.warning(f"{threading.current_thread().ident}: end")
if __name__ == "__main__":
uvicorn.run(app, host='localhost', port=8000)
Результат испытания под давлением:
результат одного из запросов:
data: thread: 39404 num: 0
data: thread: 39404 num: 1
data: thread: 57624 num: 2
data: thread: 39404 num: 3
data: thread: 52536 num: 4
data: thread: 39404 num: 5
data: thread: 52536 num: 6
data: thread: 39404 num: 7
data: thread: 39404 num: 8
data: thread: 52536 num: 9
Я ожидаю того же потока для того же запроса
data: thread: 56052 num: 0
data: thread: 56052 num: 1
data: thread: 56052 num: 2
data: thread: 56052 num: 3
data: thread: 56052 num: 4
data: thread: 56052 num: 5
data: thread: 56052 num: 6
data: thread: 56052 num: 7
data: thread: 56052 num: 8
data: thread: 56052 num: 9
Ответ, который вы ищете, должен частично находиться в исходном коде сторонней библиотеки, которую вы используете для отправки SSE; точнее, в реализации класса EventSourceResponse
, который вы возвращаете из конечной точки stream_output()
.
Аналогично официальному FastAPI/Starlette StreamingResponse
— пожалуйста, посмотрите этот ответ и этот ответ для получения более подробной информации и объяснений — причина появления различных потоков в журналах заключается в том, что когда вы передаете синхронный генератор EventSourceResponse
(т. е. обычная функция def
вместо async def
), такая как функция num_generator()
в вашем примере, EventSourceResponse
выполнит функцию def
в отдельном потоке из внешнего пула потоков, используя iterate_in_threadpool()
Starlette (см. соответствующую реализацию EventSourceResponse ).
Опять же, для получения более подробной информации ознакомьтесь с ответами по ссылкам выше.
Спасибо за ваш исчерпывающий комментарий! Я прочитал эти ответы и попробовал несколько раз. Но все же остался один вопрос. Когда генератор был обработан в asyncgenerator в iterate_in_threadpool(), один поток из пула потоков выполняет один asyncgenerator или выполняет один единственный шаг asyncgenerator? Есть ли какие-либо методы достижения ожидаемого результата?
Если вместо этого вы передадите генератор async
, он будет работать непосредственно в цикле событий, который выполняется в основном потоке; следовательно, никакие потоки из пула потоков использоваться не будут. В дополнение к ссылкам, приведенным выше, вы также можете найти этот ответ полезным в отношении пула потоков.
В любом случае спасибо, я попробовал другие способы решения проблемы.
«поток не возвращает тот же поток для одного и того же запроса». Почему это должно быть? Пакету не имеет смысла отслеживать все полученные запросы и сверять каждый запрос с этой историей, чтобы гарантировать желаемое поведение. Я думаю, вам было бы лучше попытаться реализовать свою программу без этих ожиданий.