В докладе PyCon 2015 был представлен следующий код для сервера Фибоначчи на основе сопрограммы. future_monitor()
был добавлен, чтобы сервер работал (иначе он бы зависал). Но зачем нужен Future_monitor()? почему обратного вызова future_done()
(без записи в пару сокетов) недостаточно? Зачем нужна пара сокетов?
from socket import *
from collections import deque
from concurrent.futures import ProcessPoolExecutor as Pool
from select import select
pool = Pool(4)
def fib(n):
if n <= 2:
return 1
else:
return fib(n-1) + fib(n-2)
def fib_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
yield 'recv', sock
conn, addr = sock.accept() # blocking
print("connection", addr)
tasks.append(fib_handler(conn))
def fib_handler(conn):
while True:
yield 'recv', conn
req = conn.recv(100) # blocking
if not req:
break
n = int(req)
future = pool.submit(fib, n)
yield 'future', future
result = future.result() # blocking
resp = str(result).encode('ascii') + b'\n'
yield 'send', conn
conn.send(resp) # blocking
print('closed')
tasks = deque()
recv_wait = {}
send_wait = {}
future_wait = {}
future_notify, future_event = socketpair()
def future_done(future):
tasks.append(future_wait.pop(future))
future_notify.send(b'x')
def future_monitor():
while True:
yield 'recv', future_event
future_event.recv(100)
tasks.append(future_monitor())
def run():
while any([tasks, recv_wait, send_wait]):
while not tasks:
# no active task to run wait for IO
can_recv, can_send, _ = select(recv_wait, send_wait, [])
for s in can_recv:
tasks.append(recv_wait.pop(s))
for s in can_send:
tasks.append(send_wait.pop(s))
task = tasks.popleft()
try:
why, what = next(task)
if why == 'recv':
recv_wait[what] = task
elif why == 'send':
send_wait[what] = task
elif why == 'future':
future_wait[what] = task
what.add_done_callback(future_done)
else:
raise RuntimeError("We don't know what to do with :", why)
except StopIteration:
print('task done')
if __name__ == "__main__":
tasks.append(fib_server(('localhost', 25000)))
run()
[1] Видео разговора: https://thewikihow.com/video_MCs5OvhV9S4?t=0
Попробовал удалить пару сокетов, и сервер перестал отвечать на запросы.
Если вы находитесь на платформе, где socketpair
по умолчанию создает сокеты AF_UNIX (например, Linux, macOS), и нет получателя, потребляющего отправленные сообщения, поток в конечном итоге заблокируется, когда буфер ОС, используемый для передачи данных между отправляющим и принимающим сокетами потока, заполнится. вверх: https://unix.stackexchange.com/questions/283323/do-unix-domain-sockets-overflow
Хорошо, это объясняет, почему future_monitor()
нужен для разблокировки записывающего сокета, но не объясняет, почему нам вообще нужна пара сокетов. Другими словами, почему бы не работать def Future_done(future): Tasks.append(future_wait.pop(future))?
Ключевая часть объяснения находится по адресу youtu.be/MCs5OvhV9S4?t=2063 в докладе: живая демонстрация не использует обычный основной цикл asyncio, поэтому должна быть вспомогательная задача, которая пробуждает основной цикл. встать в подходящее время. stackoverflow.com/a/78694993/597742 этот аспект рассматривается более подробно.
Я думаю, что цель future_monitor
— помочь перевести логику обратного вызова одной части кода в логику сокета, которую основной цикл в run
использует для ожидания активности.
Вызов select(recv_wait, send_wait, [])
— основной способ блокировки программы, когда ей нечего сразу делать. Вопрос в том, как заставить основной поток просыпаться, когда происходит что-то, не связанное с сокетом?
В этом случае вычисления Фибоначчи выполняются с помощью ProcessPoolExecutor
, который выполняет обратный вызов после завершения расчета. Но этот обратный вызов не разбудит основной поток, а сопрограмма, которая будет использовать результат вычисления, не будет проверять его, пока основной поток не выполнит итерацию.
Вот где взаимодействуют обратный вызов future_done
и сопрограмма future_monitor
. Обратный вызов отправляет немного бессмысленных данных (один символ 'x'
) через локальный сокет. Вызов select
, который блокируется в основном потоке, проснется, когда будет отправлено это сообщение! Сопрограмма просто получает 'x'
и перепланирует себя в ожидании дальнейших событий сокета.
Когда вы удалили сопрограмму future_notify
, из локального сокета ничего не будет прочитано. В конце концов буфер сокета заполнится, и запись заблокируется (или, возможно, вызовет исключения). В любом случае, это уже не сработает.
Это имеет смысл, я добавил таймаут к select()
, и теперь сервер может обслуживать QPS ~1/timeout без взлома future_monitor()
. Спасибо!
В общем, для того, чтобы прогрессировать, необходимо опросить будущее. В данном случае это эмулируется парой сокетов, и Future Done отправляет сообщение в пару сокетов конца уведомления.
future_monitor
— это задача, которая отправляется и постоянно опрашивает сообщение от конца события и обрабатывается функцией запуска.