Обработка сигналов в Uvicorn с помощью FastAPI

У меня есть приложение, использующее Uvicorn с FastAPI. У меня также открыты некоторые связи (например, с MongoDB). Я хочу изящно закрыть эти соединения, как только появится какой-то сигнал (SIGINT, SIGTERM и SIGKILL).

Мой server.py файл:

import uvicorn
import fastapi
import signal
import asyncio

from source.gql import gql


app = fastapi.FastAPI()

app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
app.mount("/graphql", gql)

# handle signals
HANDLED_SIGNALS = (
    signal.SIGINT,
    signal.SIGTERM
)

loop = asyncio.get_event_loop()
for sig in HANDLED_SIGNALS:
    loop.add_signal_handler(sig, _some_callback_func)

if __name__ == "__main__":
    uvicorn.run(app, port=6900)

К сожалению, способ, которым я пытаюсь добиться этого, не работает. Когда я пытаюсь Ctrl+C в терминале, ничего не происходит. Я считаю, что это вызвано тем, что Uvicorn запускается в другом потоке...

Каков правильный способ сделать это? Я заметил функцию uvicorn.Server.install_signal_handlers(), но мне не повезло с ее использованием...

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
85
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

FastAPI позволяет определять обработчики событий (функции), которые необходимо выполнить перед запуском приложения или при завершении работы приложения. Таким образом, вы можете использовать событие shutdown, как описано здесь:

@app.on_event("shutdown")
def shutdown_event():
    # close connections here

Другие вопросы по теме