Прослушивать несколько сокетов с помощью веб-сокетов и asyncio

Я пытаюсь создать сценарий на python, который прослушивает несколько сокетов с использованием веб-сокетов и asyncio, проблема в том, что независимо от того, что я делаю, он слушает только первый сокет, который я вызываю. Я думаю, что это бесконечный цикл, как я могу это решить? используя потоки для каждого сокета?

  async def start_socket(self, event):
    payload = json.dumps(event)
    loop = asyncio.get_event_loop()

    self.tasks.append(loop.create_task(
        self.subscribe(event)))

    # this should not block the rest of the code
    await asyncio.gather(*tasks)


  def test(self):
    # I want to be able to add corotines at a different time
    self.start_socket(event1)
    # some code
    self.start_socket(event2)
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
3
0
3 931
3

Ответы 3

Ваш код кажется неполным, но то, что вы показали, имеет две проблемы. Во-первых, run_until_complete принимает объект сопрограммы (или другой вид будущего), а не функцию сопрограммы. Так и должно быть:

# note parentheses after your_async_function()
asyncio.get_event_loop().run_until_complete(your_async_function())

the problem is that no matter what I do it only listen to the first socket I call. I think its the infinite loop, what are my option to solve this? using threads for each sockets?

Бесконечный цикл - не проблема, asyncio разработан для поддержки таких «бесконечных циклов». Проблема в том, что вы пытаетесь делать все в одной сопрограмме, тогда как вы должны создавать одну сопрограмму для каждого веб-сокета. Это не проблема, поскольку сопрограммы очень легкие.

Например (не проверено):

async def subscribe_all(self, payload):
    loop = asyncio.get_event_loop()
    # create a task for each URL
    for url in url_list:
        tasks.append(loop.create_task(self.subscribe_one(url, payload)))
    # run all tasks in parallel
    await asyncio.gather(*tasks)

async def subsribe_one(self, url, payload):
    async with websockets.connect(url) as websocket:
        await websocket.send(payload)
        while True:
            msg = await websocket.recv()
            print(msg)

await asyncio.gather (* tasks), дает мне RuntimeWarning: сопрограмма 'subscribe_all' никогда не ожидалась (функция async не выполнялась) .... она работает, если я использую run_until_complete (gather (..)), но она блокирует поток и код после того, как он не выполняется

joseRo 17.04.2018 13:02

@joseRo Трудно сказать, что происходит без остального кода. Вероятно, вам следует дождаться subscribe_all от другой сопрограммы ...

user4815162342 17.04.2018 13:41

@joseRo А кто звонит test()?

user4815162342 17.04.2018 19:25

другой файл, просто импортируйте и вызовите его.

joseRo 18.04.2018 07:09

@joseRo Я имею в виду в контексте asyncio - вызывающий абонент - это async def или обычная функция? Цикл событий уже запущен или кто-нибудь запустит его позже?

user4815162342 18.04.2018 07:26

обычная функция, цикл событий запускается в start_socket func (код, который я прилагаю)

joseRo 18.04.2018 13:19

@joseRo start_socket запускает цикл событий нет, он просто создает задачу. Под «запуском цикла событий» я подразумеваю, кто вызывает run_until_complete или run_forever, и является ли цикл событий уже запущенным к моменту вызова test().

user4815162342 18.04.2018 13:22

это то, что я сделал в конечном итоге, таким образом, он не блокировал основной поток, и все подписки работали параллельно.

def subscribe(self, payload):
    ws = websocket.WebSocket(sslopt = {"cert_reqs": ssl.CERT_NONE})
    ws.connect(url)
    ws.send(payload)
    while True:
        result = ws.recv()
        print("Received '%s'" % result)

    def start_thread(self, loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def start_socket(self, **kwargs):
    worker_loop = asyncio.new_event_loop()
    worker = Thread(target=self.start_thread, args=(worker_loop,))
    worker.start()

    worker_loop.call_soon_threadsafe(self.subscribe, payload)


def listen(self):
    self.start_socket(payload1)
    # code
    self.start_socket(payload2)
    # code
    self.start_socket(payload3)

Один из способов эффективного прослушивания нескольких подключений через веб-серверы с сервера веб-сокетов - это вести список подключенных клиентов и, по сути, управлять несколькими разговорами параллельно.

Например. Простой сервер, который отправляет случайный # каждому подключенному клиенту каждые несколько секунд:

import os
import asyncio
import websockets
import random 

websocket_clients = set()

async def handle_socket_connection(websocket, path):
    """Handles the whole lifecycle of each client's websocket connection."""
    websocket_clients.add(websocket)
    print(f'New connection from: {websocket.remote_address} ({len(websocket_clients)} total)')
    try:
        # This loop will keep listening on the socket until its closed. 
        async for raw_message in websocket:
            print(f'Got: [{raw_message}] from socket [{id(websocket)}]')
    except websockets.exceptions.ConnectionClosedError as cce:
        pass
    finally:
        print(f'Disconnected from socket [{id(websocket)}]...')
        websocket_clients.remove(websocket)

async def broadcast_random_number(loop):
    """Keeps sending a random # to each connected websocket client"""
    while True:
        for c in websocket_clients:
            num = str(random.randint(10, 99))
            print(f'Sending [{num}] to socket [{id(c)}]')
            await c.send(num)
        await asyncio.sleep(2)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        socket_server = websockets.serve(handle_socket_connection, 'localhost', 6789)
        print(f'Started socket server: {socket_server} ...')
        loop.run_until_complete(socket_server)
        loop.run_until_complete(broadcast_random_number(loop))
        loop.run_forever()
    finally:
        loop.close()
        print(f"Successfully shutdown [{loop}].")

Простой клиент, который подключается к серверу и прослушивает числа:

import asyncio
import random
import websockets

async def handle_message():
    uri = "ws://localhost:6789"
    async with websockets.connect(uri) as websocket:
        msg = 'Please send me a number...'
        print(f'Sending [{msg}] to [{websocket}]')
        await websocket.send(msg)
        while True:
            got_back = await websocket.recv()
            print(f"Got: {got_back}")

asyncio.get_event_loop().run_until_complete(handle_message())

Смешивание потоков и asyncio - больше проблем, чем его ценность, и у вас все еще есть код, который блокирует самые расточительные шаги, такие как сетевой ввод-вывод (что является существенным преимуществом использования asyncio).

Вам необходимо запускать каждый сопрограмма асинхронно в цикл событий, вызывать любые блокирующие вызовы с await и определять каждый метод, который взаимодействует с любыми взаимодействиями ожидаемый с async.

См. Рабочий пример: https://github.com/adnantium/websocket_client_server

Другие вопросы по теме