Мост веб-сокетов для аудиопотока в FastAPI

Цель

Моя цель - потреблять аудиопоток. Логически, это моя цель:

  1. Аудиопоток поступает через WebSocket A (конечная точка FastAPI)
  2. Аудиопоток подключается к другому WebSocket, B, который возвращает JSON (Rev-ai WebSocket)
  3. Результаты Json отправляются обратно через WebSocket A в режиме реального времени. Таким образом, пока аудиопоток все еще поступает.

Возможное решение

Чтобы решить эту проблему, у меня было довольно много идей, но в конечном итоге я пытался соединить WebSocket A с WebSocket B. Моя попытка до сих пор включает класс ConnectionManager, который содержит Queue.queue. Фрагменты аудиопотока добавляются в эту очередь, чтобы мы не потребляли напрямую из WebSocket A.

ConnectionManager также содержит метод генератора для получения всех значений из очереди.

Моя реализация FastAPI использует websocket A следующим образом:

@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            chunk = await websocket.receive_bytes()
            manager.add_to_buffer(chunk)
    except KeyboardInterrupt:
        manager.disconnect()

Параллельно с этим приемом я хотел бы иметь задачу, которая соединяла бы наш аудиопоток с WebSocket B и отправляла полученные значения в WebSocket A. Аудиопоток можно использовать с помощью вышеупомянутого метода generator.

Метод генератора необходим из-за того, как WebSocket B использует сообщения, как показано в примерах Rev-ai :

streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
    # return through websocket A this value
    print(response)

Это одна из самых больших проблем, поскольку нам нужно передавать данные в генератор и получать результаты в режиме реального времени.

Последние попытки

Я пытался попытать счастья с asyncio; насколько я понимаю, можно было бы создать сопрограмму, которая будет работать в фоновом режиме. Я потерпел неудачу с этим, но это звучало многообещающе.

Я думал о том, чтобы запустить это через событие запуска FastAPI, но у меня возникли проблемы с достижением параллелизма. Я пытался использовать event_loops, но это выдавало ошибку, связанную с nested event loop.

Предостережение

FastAPI может быть необязательным, если вы считаете таковым, и в некотором роде WebSocket A. В конце концов, конечной целью является получение аудиопотока через нашу собственную конечную точку API, запуск его через WebSocket Rev.ai, выполнение некоторую дополнительную обработку и отправить результаты обратно.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python.
Некоторые методы, о которых вы не знали, что они существуют в Python.
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
7
0
5 160
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Мост для веб-сокета <-> веб-сокет

Ниже приведен простой пример прокси-сервера веб-сокета, где веб-сокет A и веб-сокет B являются конечными точками в приложении FastAPI, но веб-сокет B может находиться где-то еще, просто измените его адрес ws_b_uri. Для клиента websocket используется библиотека websockets.

Для пересылки данных код A конечной точки запускает две задачи forward и reverse и ожидает их завершения с помощью asyncio.gather(). Передача данных для обоих направлений происходит параллельно.

import asyncio

from fastapi import FastAPI
from fastapi import WebSocket
import websockets
app = FastAPI()

ws_b_uri = "ws://localhost:8001/ws_b"


async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await ws_b.send(data)


async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_b.recv()
        await ws_a.send_text(data)
        print("websocket A sent:", data)


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    await ws_a.accept()
    async with websockets.connect(ws_b_uri) as ws_b_client:
        fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
        rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
        await asyncio.gather(fwd_task, rev_task)


@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
    await ws_b_server.accept()
    while True:
        data = await ws_b_server.receive_bytes()
        print("websocket B server recieved: ", data)
        await ws_b_server.send_text('{"response": "value from B server"}')

Обновление (веб-сокет моста <-> генератор синхронизации)

Учитывая последнее обновление вопроса, проблема в том, что WebSocket B спрятан за синхронным генератором (на самом деле их два, один на вход, другой на выход) и по сути задача превращается в то, как сделать мост между WebSocket и синхронным генератором. А так как я никогда не работал с библиотекой rev-ai, я сделал функцию-заглушку stream_client_start для streamclient.start, которая принимает генератор (MEDIA_GENERATOR в оригинале) и возвращает генератор (response_generator в оригинале).

В этом случае я запускаю обработку генераторов в отдельном потоке через run_in_executor, а чтобы не изобретать велосипед, для связи использую очередь из библиотеки janus, которая позволяет связать синхронный и асинхронный код через очередь. Соответственно очереди тоже две, одна на A -> B, другая на B -> A.

import asyncio
import time
from typing import Generator
from fastapi import FastAPI
from fastapi import WebSocket
import janus
import queue

app = FastAPI()


# Stub generator function (using websocket B in internal)
def stream_client_start(input_gen: Generator) -> Generator:
    for chunk in input_gen:
        time.sleep(1)
        yield f"Get {chunk}"


# queue to generator auxiliary adapter
def queue_to_generator(sync_queue: queue.Queue) -> Generator:
    while True:
        yield sync_queue.get()


async def forward(ws_a: WebSocket, queue_b):
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await queue_b.put(data)


async def reverse(ws_a: WebSocket, queue_b):
    while True:
        data = await queue_b.get()
        await ws_a.send_text(data)
        print("websocket A sent:", data)


def process_b_client(fwd_queue, rev_queue):
    response_generator = stream_client_start(queue_to_generator(fwd_queue))
    for r in response_generator:
        rev_queue.put(r)


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    loop = asyncio.get_event_loop()
    fwd_queue = janus.Queue()
    rev_queue = janus.Queue()
    await ws_a.accept()

    process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
    fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
    rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
    await asyncio.gather(process_client_task, fwd_task, rev_task)

Привет @alex_noname! Ваш ответ очень проницателен, но есть слой, который, боюсь, мне следовало уточнить дальше, который будет интерфейсом, который Rev-ai предоставляет для доступа к WebSocket B, применяя генератор. Тем не менее, спасибо за то, как вы структурировали свой ответ. Я добавил дополнительный бит к вопросу, чтобы прояснить этот момент. Я хотел бы попросить вас внести свой вклад с учетом генератора, и если не будет разработки, я приму этот ответ, поскольку он действительно показывает, как соединить два веб-сокета.

rmssoares 05.01.2021 11:06

Я обновил ответ для генераторов, но если будут проблемы с библиотекой rev-ai, то вряд ли смогу помочь.

alex_noname 05.01.2021 13:45

@alex_noname Большое спасибо за реализацию моста через веб-сокет, именно то, что мне нужно!

elmcrest 21.04.2021 20:30

Другие вопросы по теме