Socketio Server случайным образом задержал выброс

Сервер, использующий python-socketio socketio.AsyncServer для обработки очень простых операций ввода-вывода. Все работает:

  1. Клиент отправляет сообщение для проверки статуса
  2. Сервер возвращает статус

Проблема asyncio.run(sio.emit('status', repr(cb))), по-видимому, достигает клиента не сразу, а со случайной задержкой (от 0,5 до 5 секунд). Странно то, что новый asyncio.run(...) вызовет выполнение предыдущей «задачи». В этом случае сложение нескольких asyncio.run(sio.emit(...)) приведет к выполнению всех задач, кроме самой последней.

Ниже приведен фрагмент кода на стороне сервера:

def callback_status(cb):
    print("Returning status: ", repr(cb)) #this gets executed just fine but the following is randomly delayed
    asyncio.run(sio.emit('status', repr(cb)))

@sio.on('message')
async def get_status(sid, message):
    get_some_status(callback_status) #not an async function

sio = socketio.AsyncServer()
app = web.Application()
sio.attach(app)

if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0')
    

Я убедился, что это не относится к стороне клиента, поскольку я использовал другой метод для настройки сервера, и сообщения о состоянии, после отправки, сразу же принимаются на стороне клиента. Причина, по которой я использую socketio.AsyncServer, заключается в том, что клиент может использовать соединение WebSocket (в отличие от соединения http) для постоянной двунаправленной связи с меньшими накладными расходами.

«Странно то, что новый asyncio.run(...) вызовет выполнение предыдущей «задачи». В этом случае стек нескольких asyncio.run(sio.emit(…)) приведет к тому, что все задачи будут быть казненным, кроме самого последнего». Не могли бы вы объяснить это, пожалуйста?

Paul Cornelius 28.12.2022 03:48

Например, если get_status вызывается неоднократно, то asyncio.run(sio.emit('status', repr(cb))) «очередь» задач очищается намного быстрее. Как будто знает, что приходят новые задачи и нужно расчистить «очередь». Можно представить себе список, в котором при поступлении нового элемента будет выталкиваться верхний элемент, или, в этом случае, будет отправлено сообщение. Я не могу представить фактическую реализацию, специально ожидающую элементов, чтобы затем выполнить старые элементы. Обратите внимание, однако, что после всех всплывающих окон последняя задача все равно останется позади и снова будет задержана на несколько секунд.

Hedge 28.12.2022 07:42
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
2
58
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы не можете складывать asyncio.run() вызовы, к сожалению, это не работает.

Что может сработать, так это заставить вашу функцию обратного вызова синхронизации запланировать асинхронный выброс позже. Что-то вроде этого:

def callback_status(cb):
    print("Returning status: ", repr(cb))
    asyncio.run_coroutine_threadsafe(sio.emit('status', repr(cb)), loop)

Единственное, что вам нужно добавить к этому, это заранее установить loop правильный цикл asyncio. Вы можете получить цикл, вызвав asyncio.get_event_loop() из потока asyncio. Это выражение, скорее всего, не будет работать, если оно используется в функции callback_status(), потому что я предполагаю, что ваш обратный вызов вызывается в контексте другого потока, а не того, в котором запущено асинхронное приложение.

Ваша переменная loop равна None. Вы читали мои заметки о том, как установить переменную цикла?

Miguel Grinberg 30.12.2022 21:10

Прошу прощения за мой ответ, который был сделан без достаточного тестирования/исследования (поэтому я его удалил). Ваше предложение отлично работает. С другой стороны, я фактически переписал все в простом WebSocket, и вызов asyncio.run(_websocket.send_str(repr(cb))) работает быстро и без задержек. Хотя мне интересно, что я мог упустить?

Hedge 31.12.2022 08:15

Я не могу комментировать код, который вы не показываете. Складывать вызовы asyncio.run в стек — ужасная идея, даже если иногда это срабатывает, поэтому я бы не советовал вам это делать.

Miguel Grinberg 31.12.2022 20:36

Другие вопросы по теме