Клиент Python Websocket периодически отправляет сообщения, но с разными временными интервалами

Я хочу отправить два разных сообщения на сервер веб-сокетов, но с разными временными интервалами. Например:

  1. Первое сообщение должно отправляться каждые 2 секунды.
  2. Второе сообщение должно отправляться каждые 5 секунд.
async def send_first_message(websocket):
    while True: 
        await websocket.send("FIRST MESSAGE")
        response = await websocket.recv()
        await asyncio.sleep(2)



async def send_second_message():
    while True: 
        async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:

            asyncio.create_task(send_first_message(websocket))

            while True:
                await websocket.send("SECOND MESSAGE")
                response = await websocket.recv()
                await asyncio.sleep(5)


asyncio.run(send_second_message())

Если я запускаю такой код, я получаю:

«RuntimeError: невозможно вызвать recv, пока другая сопрограмма уже ожидает следующего сообщения»

Если я закомментирую один из «await websocket.recv()», он отлично работает в течение нескольких секунд, а затем выдает:

"RuntimeError: нет ни полученного, ни отправленного близкого кадра"

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
118
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Существует некоторое несоответствие между тем, что вы пытаетесь сделать в задачах (синхронное взаимодействие запроса и ответа), и тем, что протокол и библиотека ожидают от вас (асинхронные сообщения).

При написании асинхронного кода вам нужно смотреть на то, что библиотека/протокол/служба ожидает быть атомарной операцией, которая может происходить асинхронно со всем остальным, и что вы хотите, чтобы это была синхронная серия операций. Затем вам нужно найти примитив в библиотеке, который будет поддерживать это. В случае веб-сокетов атомарная операция — это сообщение, отправляемое в любом направлении. Таким образом, вы не можете ожидать, что веб-сокеты синхронизируют поток по двум сообщениям.

Или, другими словами, вы ожидаете синхронных ответов на каждое отправленное сообщение, но веб-сокеты не предназначены для обработки чередующихся синхронных запросов. Вы отправили сообщение на сервер веб-сокетов и хотите получить ответ на это сообщение. Но вы также отправили другое сообщение в тот же веб-сокет и хотите получить ответ и на него. Ваша клиентская библиотека веб-сокетов не может различать сообщение, предназначенное для первого запроса, и сообщение, предназначенное для второго запроса (поскольку на уровне протокола веб-сокета это бессмысленная концепция, поэтому библиотека обеспечивает это, ограничивая операции recv над websocket, который может блокировать один).

Так ...

Вариант 1 — несколько задач на отдельных сокетах

Из-за того, что библиотека ограничивает веб-сокет одной блокировкой recv, примитивом в протоколе, отвечающим требованию, является сам веб-сокет. Если это отдельные запросы, на которые вам нужны отдельные ответы на блокировку (поэтому продолжайте запрашивать задачу только после того, как эти ответы будут доступны), вы можете иметь отдельные подключения к веб-сокетам и блокировать ответ в каждом из них.

client1.py

async def send_first_message():
    async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
        while True: 
            await websocket.send("FIRST MESSAGE")
            response = await websocket.recv()
            print(response)
            await asyncio.sleep(2)

async def send_second_message():
    async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
        while True: 
            await websocket.send("SECOND MESSAGE")
            response = await websocket.recv()
            print(response)
            await asyncio.sleep(5)

async def main():
    asyncio.create_task(send_first_message())
    asyncio.create_task(send_second_message())
    await asyncio.Future()

asyncio.run(main())

Однако вариант 1 на самом деле не является веб-сокетом или асинхронным способом.

Вариант 2 - использовать асинхронность

Чтобы сделать это на одном веб-сокете, вам нужно будет получать ответ асинхронно для обеих задач отправки.

Если вас на самом деле не волнует, что функции send_* получат ответ, вы можете сделать это легко...

client2.py

async def send_first_message(websocket):
    while True: 
        await websocket.send("FIRST MESSAGE")
        await asyncio.sleep(2)

async def send_second_message(websocket):
    while True:
        await websocket.send("SECOND MESSAGE")
        await asyncio.sleep(5)
        
async def receive_message(websocket):
    while True:
        response = await websocket.recv()
        print(response)

async def main():
    async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
        asyncio.create_task(send_first_message(websocket))
        asyncio.create_task(send_second_message(websocket))
        asyncio.create_task(receive_message(websocket))
        await asyncio.Future()

asyncio.run(main())

Вариант 3

Но что, если вы хотите выстроить ответы на запросы и оставить один веб-сокет? Вам нужен какой-то способ узнать, для какого запроса предназначен тот или иной конкретный ответ. Большинство веб-сервисов, которые нуждаются в таком взаимодействии, заставят вас отправить идентификатор в сообщении на сервер, и он ответит, как только ответ будет готов, используя идентификатор в качестве ссылки.

Существует также способ заставить ваши задачи обработки сообщений блокироваться и ждать ответа с правильным идентификатором, ставя ответы в очередь и периодически проверяя их.

client3.py

unhandled_responses = {}

async def send_first_message(websocket):
    while True:
        req_id = random.randint(0,65535)
        message = json.dumps({'id': req_id, 'message': 'FIRST MESSAGE'})
        await websocket.send(message)
        response = await block_for_response(req_id)
        print(response)
        await asyncio.sleep(2)

async def send_second_message(websocket):
    while True:
        req_id = random.randint(0,65535)
        message = json.dumps({'id': req_id, 'message': 'SECOND MESSAGE'})
        await websocket.send(message)
        response = await block_for_response(req_id)
        print(response)
        await asyncio.sleep(5)

async def block_for_response(id):
    while True:
        response = unhandled_responses.pop(id, None)
        if response:
            return response
        await asyncio.sleep(0.1)

async def receive_message(websocket):
    while True:
        response = json.loads(await websocket.recv())
        unhandled_responses[response['id']] = response

async def main():
    async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
        asyncio.create_task(send_first_message(websocket))
        asyncio.create_task(send_second_message(websocket))
        asyncio.create_task(receive_message(websocket))
        await asyncio.Future()

asyncio.run(main())

Для полноты картины серверный код, с которым разговаривали клиенты в моих тестах.

сервер.py

import asyncio
import websockets

async def server_endpoint(websocket):
    try:
        while True:
            recv_msg = await websocket.recv()
            response = recv_msg
            await websocket.send(response)
    except Exception as ex:
        print(str(ex))

async def main():
    async with websockets.serve(server_endpoint, "localhost", 8765):
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())

Другие вопросы по теме