Я хочу отправить два разных сообщения на сервер веб-сокетов, но с разными временными интервалами. Например:
async def send_first_message(websocket):
while True:
await websocket.send("FIRST MESSAGE")
response = await websocket.recv()
await asyncio.sleep(2)
async def send_second_message():
while True:
async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
asyncio.create_task(send_first_message(websocket))
while True:
await websocket.send("SECOND MESSAGE")
response = await websocket.recv()
await asyncio.sleep(5)
asyncio.run(send_second_message())
Если я запускаю такой код, я получаю:
«RuntimeError: невозможно вызвать recv, пока другая сопрограмма уже ожидает следующего сообщения»
Если я закомментирую один из «await websocket.recv()», он отлично работает в течение нескольких секунд, а затем выдает:
"RuntimeError: нет ни полученного, ни отправленного близкого кадра"
Существует некоторое несоответствие между тем, что вы пытаетесь сделать в задачах (синхронное взаимодействие запроса и ответа), и тем, что протокол и библиотека ожидают от вас (асинхронные сообщения).
При написании асинхронного кода вам нужно смотреть на то, что библиотека/протокол/служба ожидает быть атомарной операцией, которая может происходить асинхронно со всем остальным, и что вы хотите, чтобы это была синхронная серия операций. Затем вам нужно найти примитив в библиотеке, который будет поддерживать это. В случае веб-сокетов атомарная операция — это сообщение, отправляемое в любом направлении. Таким образом, вы не можете ожидать, что веб-сокеты синхронизируют поток по двум сообщениям.
Или, другими словами, вы ожидаете синхронных ответов на каждое отправленное сообщение, но веб-сокеты не предназначены для обработки чередующихся синхронных запросов. Вы отправили сообщение на сервер веб-сокетов и хотите получить ответ на это сообщение. Но вы также отправили другое сообщение в тот же веб-сокет и хотите получить ответ и на него. Ваша клиентская библиотека веб-сокетов не может различать сообщение, предназначенное для первого запроса, и сообщение, предназначенное для второго запроса (поскольку на уровне протокола веб-сокета это бессмысленная концепция, поэтому библиотека обеспечивает это, ограничивая операции recv
над websocket, который может блокировать один).
Так ...
Вариант 1 — несколько задач на отдельных сокетах
Из-за того, что библиотека ограничивает веб-сокет одной блокировкой recv
, примитивом в протоколе, отвечающим требованию, является сам веб-сокет. Если это отдельные запросы, на которые вам нужны отдельные ответы на блокировку (поэтому продолжайте запрашивать задачу только после того, как эти ответы будут доступны), вы можете иметь отдельные подключения к веб-сокетам и блокировать ответ в каждом из них.
client1.py
async def send_first_message():
async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
while True:
await websocket.send("FIRST MESSAGE")
response = await websocket.recv()
print(response)
await asyncio.sleep(2)
async def send_second_message():
async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
while True:
await websocket.send("SECOND MESSAGE")
response = await websocket.recv()
print(response)
await asyncio.sleep(5)
async def main():
asyncio.create_task(send_first_message())
asyncio.create_task(send_second_message())
await asyncio.Future()
asyncio.run(main())
Однако вариант 1 на самом деле не является веб-сокетом или асинхронным способом.
Вариант 2 - использовать асинхронность
Чтобы сделать это на одном веб-сокете, вам нужно будет получать ответ асинхронно для обеих задач отправки.
Если вас на самом деле не волнует, что функции send_
* получат ответ, вы можете сделать это легко...
client2.py
async def send_first_message(websocket):
while True:
await websocket.send("FIRST MESSAGE")
await asyncio.sleep(2)
async def send_second_message(websocket):
while True:
await websocket.send("SECOND MESSAGE")
await asyncio.sleep(5)
async def receive_message(websocket):
while True:
response = await websocket.recv()
print(response)
async def main():
async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
asyncio.create_task(send_first_message(websocket))
asyncio.create_task(send_second_message(websocket))
asyncio.create_task(receive_message(websocket))
await asyncio.Future()
asyncio.run(main())
Вариант 3
Но что, если вы хотите выстроить ответы на запросы и оставить один веб-сокет? Вам нужен какой-то способ узнать, для какого запроса предназначен тот или иной конкретный ответ. Большинство веб-сервисов, которые нуждаются в таком взаимодействии, заставят вас отправить идентификатор в сообщении на сервер, и он ответит, как только ответ будет готов, используя идентификатор в качестве ссылки.
Существует также способ заставить ваши задачи обработки сообщений блокироваться и ждать ответа с правильным идентификатором, ставя ответы в очередь и периодически проверяя их.
client3.py
unhandled_responses = {}
async def send_first_message(websocket):
while True:
req_id = random.randint(0,65535)
message = json.dumps({'id': req_id, 'message': 'FIRST MESSAGE'})
await websocket.send(message)
response = await block_for_response(req_id)
print(response)
await asyncio.sleep(2)
async def send_second_message(websocket):
while True:
req_id = random.randint(0,65535)
message = json.dumps({'id': req_id, 'message': 'SECOND MESSAGE'})
await websocket.send(message)
response = await block_for_response(req_id)
print(response)
await asyncio.sleep(5)
async def block_for_response(id):
while True:
response = unhandled_responses.pop(id, None)
if response:
return response
await asyncio.sleep(0.1)
async def receive_message(websocket):
while True:
response = json.loads(await websocket.recv())
unhandled_responses[response['id']] = response
async def main():
async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
asyncio.create_task(send_first_message(websocket))
asyncio.create_task(send_second_message(websocket))
asyncio.create_task(receive_message(websocket))
await asyncio.Future()
asyncio.run(main())
Для полноты картины серверный код, с которым разговаривали клиенты в моих тестах.
сервер.py
import asyncio
import websockets
async def server_endpoint(websocket):
try:
while True:
recv_msg = await websocket.recv()
response = recv_msg
await websocket.send(response)
except Exception as ex:
print(str(ex))
async def main():
async with websockets.serve(server_endpoint, "localhost", 8765):
await asyncio.Future() # run forever
if __name__ == "__main__":
asyncio.run(main())