Я пытаюсь использовать ящик reqwest
для потоковой передачи двоичных данных в формате стрелок-IPC из конечной точки REST API, находящейся вне моего контроля. Объект reqwest::Reponse
имеет метод bytes_stream()
, который возвращает тип, реализующий признак Stream<Item = Result<Bytes>>
. Я надеюсь, что это можно каким-то образом прочитать как поток RecordBatch, например, с помощью arrow-ipc::reader::StreamReader
или какого-либо другого реализатора черты arrow_array::RecordBatchReader
. Как лучше всего это сделать?
Альтернативно, является ли асинхронность обязательным требованием? Возможно, вы можете использовать код блокировки.
Возможно ли в принципе использовать стрелку и стрелку2 (и я бы использовал стрелку2 для этой задачи)? Если нет, я мог бы переключиться на использование ответов от request::blocking::Client, и мне было бы интересно решение в этом направлении.
Крейт arrow
в настоящее время не поддерживает асинхронность (хотя по этому поводу есть открытый вопрос ), но альтернатива arrow2
поддерживает, и, к счастью, она также определяет уровень преобразования между своими типами и типами arrow
. Итак, это один из вариантов (код весьма сложен, потому что асинхронная поддержка arrow2
требует AsyncRead
, но у нас есть Stream
, дающий Byte
):
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use arrow2::datatypes::Schema;
use arrow2::io::ipc::read::stream_async::AsyncStreamReader;
use arrow2::io::ipc::read::StreamMetadata;
use arrow2::io::ipc::IpcSchema;
use arrow_format::ipc::MetadataVersion;
use bytes::Bytes;
use futures::stream::Fuse;
use futures::{AsyncRead, Stream, StreamExt};
use reqwest::Client;
struct StreamAsAsyncRead<St> {
stream: Fuse<St>,
last: Option<Bytes>,
}
impl<St: Stream> StreamAsAsyncRead<St> {
fn new(stream: St) -> Self {
Self {
stream: stream.fuse(),
last: None,
}
}
}
impl<St: Stream<Item = reqwest::Result<Bytes>> + Unpin> AsyncRead for StreamAsAsyncRead<St> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = &mut *self;
let data = match &mut this.last {
Some(data) if !data.is_empty() => data,
last => {
let Some(next_data) = ready!(this.stream.poll_next_unpin(cx)) else {
return Poll::Ready(Ok(0));
};
let next_data = next_data.map_err(|err| io::Error::other(err))?;
last.insert(next_data)
}
};
let fill_len = std::cmp::min(buf.len(), data.len());
buf[..fill_len].copy_from_slice(&data[..fill_len]);
data.advance(fill_len);
Poll::Ready(Ok(fill_len))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let response = client.get("<address>").send().await?;
let data_metadata = StreamMetadata {
schema: Schema::default(),
version: MetadataVersion::V5,
ipc_schema: IpcSchema {
fields: Vec::new(),
is_little_endian: true,
},
};
let mut stream = AsyncStreamReader::new(
StreamAsAsyncRead::new(response.bytes_stream()),
data_metadata,
);
while let Some(item) = stream.next().await {
let item = item?;
for item in item.into_arrays() {
let item = arrow::array::ArrayRef::from(item);
// Do something with `item`.
}
}
Ok(())
}
Груз.томл:
[dependencies]
arrow = "50.0.0"
reqwest = { version = "0.11.24", features = ["stream"] }
tokio = { version = "1.36.0", features = ["full"] }
bytes = "1.5.0"
futures = "0.3.30"
arrow-format = "0.8.1"
arrow2 = { version = "0.18.0", features = [
"io_ipc",
"io_ipc_read_async",
"arrow",
] }
Другой, более простой вариант — отказаться от поддержки асинхронности и просто использовать блокирующий интерфейс reqwest:
use arrow::ipc::reader::StreamReader;
use reqwest::blocking::Client;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let response = client.get("<address>").send()?;
let reader = StreamReader::try_new(response, None)?;
for item in reader {
let item = item?;
// Do something with `item`.
}
Ok(())
}
Крейт
arrow
, похоже, не предоставляет асинхронных возможностей (по этому поводу существует открытый вопрос ), но крейтarrow2
предоставляет. Можете ли вы переключиться на него?