Я пытаюсь реализовать сервер grpc, который возвращает поток, используя Tonic в Rust. Допустим, у нас есть такой сервис:
rpc FooBar(Input) returns (stream Output) {}
}
Я генерирую код шаблона с помощью prost-build. Следуя примерам на сайте, я смог реализовать сервис:
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
#[derive(Default)]
pub struct FooServer;
#[async_trait]
impl FooService for FooServer {
type FooBarStream = ReceiverStream<Result<Output, Status>>;
async fn foo_bar(
&self,
request: Request<Input>,
) -> std::result::Result<Response<Self::FooBarStream>, Status> {
let (tx, rx) = mpsc::channel(10);
tokio::spawn(async move {
for ... {
tx.send(Ok(...)).await;
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
Это работает, но я думаю, что это немного некрасиво. Узнав об async-stream, я подумал переписать его вот так, так как он менее шаблонный и его легче читать. Возможно, это также более эффективно, поскольку не требуется IPC, а заполняет поток напрямую.
use async_stream::{stream, AsyncStream};
...
#[async_trait]
impl FooService for FooServer {
type FooBarStream = AsyncStream?????
async fn foo_bar(
&self,
request: Request<Input>,
) -> std::result::Result<Response<Self::FooBarStream>, Status> {
let stream = stream! {
for ... {
yield ...;
}
});
Ok(stream)
}
}
Проблема в том, что я не могу определить тип потока. Я попробовал AsyncStream<Result<Output, Status>>
, но это не сработало, потому что AsyncStream
имеет 2 параметра шаблона, второй из которых — это функция, генерирующая результат, которая в данном случае является безымянной функцией.
Есть ли способ заставить это работать?
Поскольку полный AsyncStream
является безымянным типом и поскольку синтаксис ассоциированного-типа-позиции-осуществления-признака пока недоступен (т. е. type FooBarStream = impl Stream<...>;
), вам придется сделать следующее лучшее; используйте объект типажа.
Лучший способ сделать это с помощью Box::pin
:
#[async_trait]
impl FooService for FooServer {
type FooBarStream = Pin<Box<dyn Stream<Item = Result<Output, Status>> + Send + 'static>>;
async fn foo_bar(
&self,
request: Request<Input>,
) -> std::result::Result<Response<Self::FooBarStream>, Status> {
let stream = stream! {
for ... {
yield ...;
}
});
Ok(Response::new(Box::pin(stream) as Self::FooBarStream))
}
}
Вы можете увидеть это в разделе Двунаправленная потоковая передача RPC руководства Tonic, хотя они используют try_stream!
, чтобы использовать ?
для ошибок.