Моя цель - потреблять аудиопоток. Логически, это моя цель:
FastAPI
)Чтобы решить эту проблему, у меня было довольно много идей, но в конечном итоге я пытался соединить WebSocket A
с WebSocket B
. Моя попытка до сих пор включает класс ConnectionManager
, который содержит Queue.queue
. Фрагменты аудиопотока добавляются в эту очередь, чтобы мы не потребляли напрямую из WebSocket A
.
ConnectionManager
также содержит метод генератора для получения всех значений из очереди.
Моя реализация FastAPI использует websocket A
следующим образом:
@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
chunk = await websocket.receive_bytes()
manager.add_to_buffer(chunk)
except KeyboardInterrupt:
manager.disconnect()
Параллельно с этим приемом я хотел бы иметь задачу, которая соединяла бы наш аудиопоток с WebSocket B
и отправляла полученные значения в WebSocket A
. Аудиопоток можно использовать с помощью вышеупомянутого метода generator
.
Метод генератора необходим из-за того, как WebSocket B использует сообщения, как показано в примерах Rev-ai :
streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
# return through websocket A this value
print(response)
Это одна из самых больших проблем, поскольку нам нужно передавать данные в генератор и получать результаты в режиме реального времени.
Я пытался попытать счастья с asyncio
; насколько я понимаю, можно было бы создать сопрограмму, которая будет работать в фоновом режиме. Я потерпел неудачу с этим, но это звучало многообещающе.
Я думал о том, чтобы запустить это через событие запуска FastAPI
, но у меня возникли проблемы с достижением параллелизма. Я пытался использовать event_loops
, но это выдавало ошибку, связанную с nested event loop
.
FastAPI может быть необязательным, если вы считаете таковым, и в некотором роде WebSocket A. В конце концов, конечной целью является получение аудиопотока через нашу собственную конечную точку API, запуск его через WebSocket Rev.ai, выполнение некоторую дополнительную обработку и отправить результаты обратно.
Ниже приведен простой пример прокси-сервера веб-сокета, где веб-сокет A
и веб-сокет B
являются конечными точками в приложении FastAPI, но веб-сокет B
может находиться где-то еще, просто измените его адрес ws_b_uri
. Для клиента websocket используется библиотека websockets.
Для пересылки данных код A
конечной точки запускает две задачи forward
и reverse
и ожидает их завершения с помощью asyncio.gather(). Передача данных для обоих направлений происходит параллельно.
import asyncio
from fastapi import FastAPI
from fastapi import WebSocket
import websockets
app = FastAPI()
ws_b_uri = "ws://localhost:8001/ws_b"
async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
while True:
data = await ws_a.receive_bytes()
print("websocket A received:", data)
await ws_b.send(data)
async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
while True:
data = await ws_b.recv()
await ws_a.send_text(data)
print("websocket A sent:", data)
@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
await ws_a.accept()
async with websockets.connect(ws_b_uri) as ws_b_client:
fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
await asyncio.gather(fwd_task, rev_task)
@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
await ws_b_server.accept()
while True:
data = await ws_b_server.receive_bytes()
print("websocket B server recieved: ", data)
await ws_b_server.send_text('{"response": "value from B server"}')
Учитывая последнее обновление вопроса, проблема в том, что WebSocket B
спрятан за синхронным генератором (на самом деле их два, один на вход, другой на выход) и по сути задача превращается в то, как сделать мост между WebSocket и синхронным генератором. А так как я никогда не работал с библиотекой rev-ai
, я сделал функцию-заглушку stream_client_start
для streamclient.start
, которая принимает генератор (MEDIA_GENERATOR
в оригинале) и возвращает генератор (response_generator
в оригинале).
В этом случае я запускаю обработку генераторов в отдельном потоке через run_in_executor
, а чтобы не изобретать велосипед, для связи использую очередь из библиотеки janus, которая позволяет связать синхронный и асинхронный код через очередь. Соответственно очереди тоже две, одна на A -> B
, другая на B -> A
.
import asyncio
import time
from typing import Generator
from fastapi import FastAPI
from fastapi import WebSocket
import janus
import queue
app = FastAPI()
# Stub generator function (using websocket B in internal)
def stream_client_start(input_gen: Generator) -> Generator:
for chunk in input_gen:
time.sleep(1)
yield f"Get {chunk}"
# queue to generator auxiliary adapter
def queue_to_generator(sync_queue: queue.Queue) -> Generator:
while True:
yield sync_queue.get()
async def forward(ws_a: WebSocket, queue_b):
while True:
data = await ws_a.receive_bytes()
print("websocket A received:", data)
await queue_b.put(data)
async def reverse(ws_a: WebSocket, queue_b):
while True:
data = await queue_b.get()
await ws_a.send_text(data)
print("websocket A sent:", data)
def process_b_client(fwd_queue, rev_queue):
response_generator = stream_client_start(queue_to_generator(fwd_queue))
for r in response_generator:
rev_queue.put(r)
@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
loop = asyncio.get_event_loop()
fwd_queue = janus.Queue()
rev_queue = janus.Queue()
await ws_a.accept()
process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
await asyncio.gather(process_client_task, fwd_task, rev_task)
Я обновил ответ для генераторов, но если будут проблемы с библиотекой rev-ai
, то вряд ли смогу помочь.
@alex_noname Большое спасибо за реализацию моста через веб-сокет, именно то, что мне нужно!
Привет @alex_noname! Ваш ответ очень проницателен, но есть слой, который, боюсь, мне следовало уточнить дальше, который будет интерфейсом, который Rev-ai предоставляет для доступа к WebSocket B, применяя генератор. Тем не менее, спасибо за то, как вы структурировали свой ответ. Я добавил дополнительный бит к вопросу, чтобы прояснить этот момент. Я хотел бы попросить вас внести свой вклад с учетом генератора, и если не будет разработки, я приму этот ответ, поскольку он действительно показывает, как соединить два веб-сокета.