Я запускаю объект HTTPServer в новом потоке:
from http.server import HTTPServer, SimpleHTTPRequestHandler
import threading
with HTTPServer(("localhost", 8080), SimpleHTTPRequestHandler) as httpd:
thread = threading.Thread(target=httpd.serve_forever, daemon=True)
thread.start()
# Doing my work ...
httpd.shutdown()
Здесь я хотел бы убедиться, что httpd.serve_forever() успешно запущен до того, как будет достигнуто # doing my work.... Я, конечно, могу вставить thread.join(timeout=1) после thread.start(), но это все равно рискует состоянием гонки. Есть ли надежный способ дождаться, пока serve_forever заработает?
Конечно вещь. Существует обратный вызов под названием service_actions, который вызывается после каждой итерации цикла serve_forever, и вы можете использовать threading.Event() там, где вы можете ждать.
Поскольку обратный вызов вызывается после каждого возможно обслуживаемого запроса, это вызовет первоначальную задержку до 0,5 секунды (по умолчанию), но я не думаю, что это проблема. Если это так, вы можете переопределить serve_forever, чтобы сделать это до первого запроса службы.
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
class SignalingHTTPServer(HTTPServer):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.ready_event = threading.Event()
def service_actions(self):
self.ready_event.set()
with SignalingHTTPServer(("localhost", 8080), SimpleHTTPRequestHandler) as httpd:
thread = threading.Thread(target=httpd.serve_forever, daemon=True)
thread.start()
httpd.ready_event.wait()
# Doing my work ...
httpd.shutdown()