Сервер, использующий python-socketio socketio.AsyncServer для обработки очень простых операций ввода-вывода. Все работает:
Проблема
asyncio.run(sio.emit('status', repr(cb)))
, по-видимому, достигает клиента не сразу, а со случайной задержкой (от 0,5 до 5 секунд). Странно то, что новый asyncio.run(...)
вызовет выполнение предыдущей «задачи». В этом случае сложение нескольких asyncio.run(sio.emit(...))
приведет к выполнению всех задач, кроме самой последней.
Ниже приведен фрагмент кода на стороне сервера:
def callback_status(cb):
print("Returning status: ", repr(cb)) #this gets executed just fine but the following is randomly delayed
asyncio.run(sio.emit('status', repr(cb)))
@sio.on('message')
async def get_status(sid, message):
get_some_status(callback_status) #not an async function
sio = socketio.AsyncServer()
app = web.Application()
sio.attach(app)
if __name__ == '__main__':
web.run_app(app, host='0.0.0.0')
Я убедился, что это не относится к стороне клиента, поскольку я использовал другой метод для настройки сервера, и сообщения о состоянии, после отправки, сразу же принимаются на стороне клиента. Причина, по которой я использую socketio.AsyncServer, заключается в том, что клиент может использовать соединение WebSocket (в отличие от соединения http) для постоянной двунаправленной связи с меньшими накладными расходами.
Например, если get_status
вызывается неоднократно, то asyncio.run(sio.emit('status', repr(cb)))
«очередь» задач очищается намного быстрее. Как будто знает, что приходят новые задачи и нужно расчистить «очередь». Можно представить себе список, в котором при поступлении нового элемента будет выталкиваться верхний элемент, или, в этом случае, будет отправлено сообщение. Я не могу представить фактическую реализацию, специально ожидающую элементов, чтобы затем выполнить старые элементы. Обратите внимание, однако, что после всех всплывающих окон последняя задача все равно останется позади и снова будет задержана на несколько секунд.
Вы не можете складывать asyncio.run()
вызовы, к сожалению, это не работает.
Что может сработать, так это заставить вашу функцию обратного вызова синхронизации запланировать асинхронный выброс позже. Что-то вроде этого:
def callback_status(cb):
print("Returning status: ", repr(cb))
asyncio.run_coroutine_threadsafe(sio.emit('status', repr(cb)), loop)
Единственное, что вам нужно добавить к этому, это заранее установить loop
правильный цикл asyncio. Вы можете получить цикл, вызвав asyncio.get_event_loop()
из потока asyncio. Это выражение, скорее всего, не будет работать, если оно используется в функции callback_status()
, потому что я предполагаю, что ваш обратный вызов вызывается в контексте другого потока, а не того, в котором запущено асинхронное приложение.
Ваша переменная loop
равна None
. Вы читали мои заметки о том, как установить переменную цикла?
Прошу прощения за мой ответ, который был сделан без достаточного тестирования/исследования (поэтому я его удалил). Ваше предложение отлично работает. С другой стороны, я фактически переписал все в простом WebSocket
, и вызов asyncio.run(_websocket.send_str(repr(cb)))
работает быстро и без задержек. Хотя мне интересно, что я мог упустить?
Я не могу комментировать код, который вы не показываете. Складывать вызовы asyncio.run в стек — ужасная идея, даже если иногда это срабатывает, поэтому я бы не советовал вам это делать.
«Странно то, что новый asyncio.run(...) вызовет выполнение предыдущей «задачи». В этом случае стек нескольких asyncio.run(sio.emit(…)) приведет к тому, что все задачи будут быть казненным, кроме самого последнего». Не могли бы вы объяснить это, пожалуйста?