Почему в демо-версии Дэвида Бизли понадобился хак `future_monitor()`?

В докладе PyCon 2015 был представлен следующий код для сервера Фибоначчи на основе сопрограммы. future_monitor() был добавлен, чтобы сервер работал (иначе он бы зависал). Но зачем нужен Future_monitor()? почему обратного вызова future_done() (без записи в пару сокетов) недостаточно? Зачем нужна пара сокетов?

from socket import *
from collections import deque
from concurrent.futures import ProcessPoolExecutor as Pool
from select import select

pool = Pool(4)

def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        conn, addr = sock.accept() # blocking
        print("connection", addr)
        tasks.append(fib_handler(conn))

def fib_handler(conn):
    while True:
        yield 'recv', conn
        req = conn.recv(100)  # blocking
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future 
        result = future.result()  # blocking
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', conn
        conn.send(resp)  # blocking
    print('closed')

tasks = deque()
recv_wait = {}
send_wait = {}
future_wait = {}

future_notify, future_event = socketpair()

def future_done(future):
    tasks.append(future_wait.pop(future))
    future_notify.send(b'x')

def future_monitor():
    while True:
        yield 'recv', future_event
        future_event.recv(100)

tasks.append(future_monitor())

def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # no active task to run wait for IO
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                what.add_done_callback(future_done)
            else:
                raise RuntimeError("We don't know what to do with :", why)
        except StopIteration:
            print('task done')

if __name__ == "__main__":
    tasks.append(fib_server(('localhost', 25000)))
    run()

[1] Видео разговора: https://thewikihow.com/video_MCs5OvhV9S4?t=0

Попробовал удалить пару сокетов, и сервер перестал отвечать на запросы.

В общем, для того, чтобы прогрессировать, необходимо опросить будущее. В данном случае это эмулируется парой сокетов, и Future Done отправляет сообщение в пару сокетов конца уведомления. future_monitor — это задача, которая отправляется и постоянно опрашивает сообщение от конца события и обрабатывается функцией запуска.

metatoaster 02.07.2024 05:28
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
1
53
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Если вы находитесь на платформе, где socketpair по умолчанию создает сокеты AF_UNIX (например, Linux, macOS), и нет получателя, потребляющего отправленные сообщения, поток в конечном итоге заблокируется, когда буфер ОС, используемый для передачи данных между отправляющим и принимающим сокетами потока, заполнится. вверх: https://unix.stackexchange.com/questions/283323/do-unix-domain-sockets-overflow

Хорошо, это объясняет, почему future_monitor() нужен для разблокировки записывающего сокета, но не объясняет, почему нам вообще нужна пара сокетов. Другими словами, почему бы не работать def Future_done(future): Tasks.append(future_wait.pop(future))?

Ben noop 02.07.2024 06:01
Ответ принят как подходящий

Я думаю, что цель future_monitor — помочь перевести логику обратного вызова одной части кода в логику сокета, которую основной цикл в run использует для ожидания активности.

Вызов select(recv_wait, send_wait, []) — основной способ блокировки программы, когда ей нечего сразу делать. Вопрос в том, как заставить основной поток просыпаться, когда происходит что-то, не связанное с сокетом?

В этом случае вычисления Фибоначчи выполняются с помощью ProcessPoolExecutor, который выполняет обратный вызов после завершения расчета. Но этот обратный вызов не разбудит основной поток, а сопрограмма, которая будет использовать результат вычисления, не будет проверять его, пока основной поток не выполнит итерацию.

Вот где взаимодействуют обратный вызов future_done и сопрограмма future_monitor. Обратный вызов отправляет немного бессмысленных данных (один символ 'x') через локальный сокет. Вызов select, который блокируется в основном потоке, проснется, когда будет отправлено это сообщение! Сопрограмма просто получает 'x' и перепланирует себя в ожидании дальнейших событий сокета.

Когда вы удалили сопрограмму future_notify, из локального сокета ничего не будет прочитано. В конце концов буфер сокета заполнится, и запись заблокируется (или, возможно, вызовет исключения). В любом случае, это уже не сработает.

Это имеет смысл, я добавил таймаут к select(), и теперь сервер может обслуживать QPS ~1/timeout без взлома future_monitor(). Спасибо!

Ben noop 02.07.2024 08:58

Другие вопросы по теме