Как я могу использовать async_stream с тоником?

Я пытаюсь реализовать сервер grpc, который возвращает поток, используя Tonic в Rust. Допустим, у нас есть такой сервис:

  rpc FooBar(Input) returns (stream Output) {}
}

Я генерирую код шаблона с помощью prost-build. Следуя примерам на сайте, я смог реализовать сервис:

use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

#[derive(Default)]
pub struct FooServer;

#[async_trait]
impl FooService for FooServer {
    type FooBarStream = ReceiverStream<Result<Output, Status>>;

    async fn foo_bar(
        &self,
        request: Request<Input>,
    ) -> std::result::Result<Response<Self::FooBarStream>, Status> {
        let (tx, rx) = mpsc::channel(10);
        tokio::spawn(async move {
            for ... {
                tx.send(Ok(...)).await;
            }
        });

        Ok(Response::new(ReceiverStream::new(rx)))
    }
}

Это работает, но я думаю, что это немного некрасиво. Узнав об async-stream, я подумал переписать его вот так, так как он менее шаблонный и его легче читать. Возможно, это также более эффективно, поскольку не требуется IPC, а заполняет поток напрямую.

use async_stream::{stream, AsyncStream};
...

#[async_trait]
impl FooService for FooServer {
    type FooBarStream = AsyncStream?????

    async fn foo_bar(
        &self,
        request: Request<Input>,
    ) -> std::result::Result<Response<Self::FooBarStream>, Status> {
        let stream = stream! {
            for ... {
                yield ...;
            }
        });

        Ok(stream)
    }
}

Проблема в том, что я не могу определить тип потока. Я попробовал AsyncStream<Result<Output, Status>>, но это не сработало, потому что AsyncStream имеет 2 параметра шаблона, второй из которых — это функция, генерирующая результат, которая в данном случае является безымянной функцией.

Есть ли способ заставить это работать?

Асинхронная передача данных с помощью sendBeacon в JavaScript
Асинхронная передача данных с помощью sendBeacon в JavaScript
В современных веб-приложениях отправка данных из JavaScript на стороне клиента на сервер является распространенной задачей. Одним из популярных...
0
0
63
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Поскольку полный AsyncStream является безымянным типом и поскольку синтаксис ассоциированного-типа-позиции-осуществления-признака пока недоступен (т. е. type FooBarStream = impl Stream<...>;), вам придется сделать следующее лучшее; используйте объект типажа.

Лучший способ сделать это с помощью Box::pin:

#[async_trait]
impl FooService for FooServer {
    type FooBarStream = Pin<Box<dyn Stream<Item = Result<Output, Status>> + Send + 'static>>;

    async fn foo_bar(
        &self,
        request: Request<Input>,
    ) -> std::result::Result<Response<Self::FooBarStream>, Status> {
        let stream = stream! {
            for ... {
                yield ...;
            }
        });

        Ok(Response::new(Box::pin(stream) as Self::FooBarStream))
    }
}

Вы можете увидеть это в разделе Двунаправленная потоковая передача RPC руководства Tonic, хотя они используют try_stream!, чтобы использовать ? для ошибок.

Другие вопросы по теме