Как достоверно узнать, что HTTPServer в отдельном потоке работает и обслуживает?

Я запускаю объект HTTPServer в новом потоке:

from http.server import HTTPServer, SimpleHTTPRequestHandler
import threading

with HTTPServer(("localhost", 8080), SimpleHTTPRequestHandler) as httpd:
    thread = threading.Thread(target=httpd.serve_forever, daemon=True)
    thread.start()
    # Doing my work ...
    httpd.shutdown()

Здесь я хотел бы убедиться, что httpd.serve_forever() успешно запущен до того, как будет достигнуто # doing my work.... Я, конечно, могу вставить thread.join(timeout=1) после thread.start(), но это все равно рискует состоянием гонки. Есть ли надежный способ дождаться, пока serve_forever заработает?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python.
Некоторые методы, о которых вы не знали, что они существуют в Python.
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
430
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Конечно вещь. Существует обратный вызов под названием service_actions, который вызывается после каждой итерации цикла serve_forever, и вы можете использовать threading.Event() там, где вы можете ждать.

Поскольку обратный вызов вызывается после каждого возможно обслуживаемого запроса, это вызовет первоначальную задержку до 0,5 секунды (по умолчанию), но я не думаю, что это проблема. Если это так, вы можете переопределить serve_forever, чтобы сделать это до первого запроса службы.

import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler


class SignalingHTTPServer(HTTPServer):

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.ready_event = threading.Event()

    def service_actions(self):
        self.ready_event.set()


with SignalingHTTPServer(("localhost", 8080), SimpleHTTPRequestHandler) as httpd:
    thread = threading.Thread(target=httpd.serve_forever, daemon=True)
    thread.start()
    httpd.ready_event.wait()
    # Doing my work ...
    httpd.shutdown()

Другие вопросы по теме