Как я могу прочитать bytes_stream() объекта reqwest::Response с помощью разработчикаarrow_array::RecordBatchReader?

Я пытаюсь использовать ящик reqwest для потоковой передачи двоичных данных в формате стрелок-IPC из конечной точки REST API, находящейся вне моего контроля. Объект reqwest::Reponse имеет метод bytes_stream(), который возвращает тип, реализующий признак Stream<Item = Result<Bytes>>. Я надеюсь, что это можно каким-то образом прочитать как поток RecordBatch, например, с помощью arrow-ipc::reader::StreamReader или какого-либо другого реализатора черты arrow_array::RecordBatchReader. Как лучше всего это сделать?

Крейт arrow, похоже, не предоставляет асинхронных возможностей (по этому поводу существует открытый вопрос ), но крейт arrow2 предоставляет. Можете ли вы переключиться на него?

Chayim Friedman 18.03.2024 19:35

Альтернативно, является ли асинхронность обязательным требованием? Возможно, вы можете использовать код блокировки.

Chayim Friedman 18.03.2024 19:35

Возможно ли в принципе использовать стрелку и стрелку2 (и я бы использовал стрелку2 для этой задачи)? Если нет, я мог бы переключиться на использование ответов от request::blocking::Client, и мне было бы интересно решение в этом направлении.

jwimberley 18.03.2024 19:48
Почему Python в конце концов умрет
Почему Python в конце концов умрет
Последние 20 лет были действительно хорошими для Python. Он прошел путь от "просто языка сценариев" до основного языка, используемого для написания...
1
3
345
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Крейт arrow в настоящее время не поддерживает асинхронность (хотя по этому поводу есть открытый вопрос ), но альтернатива arrow2 поддерживает, и, к счастью, она также определяет уровень преобразования между своими типами и типами arrow. Итак, это один из вариантов (код весьма сложен, потому что асинхронная поддержка arrow2 требует AsyncRead, но у нас есть Stream, дающий Byte):

use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use arrow2::datatypes::Schema;
use arrow2::io::ipc::read::stream_async::AsyncStreamReader;
use arrow2::io::ipc::read::StreamMetadata;
use arrow2::io::ipc::IpcSchema;
use arrow_format::ipc::MetadataVersion;
use bytes::Bytes;
use futures::stream::Fuse;
use futures::{AsyncRead, Stream, StreamExt};
use reqwest::Client;

struct StreamAsAsyncRead<St> {
    stream: Fuse<St>,
    last: Option<Bytes>,
}

impl<St: Stream> StreamAsAsyncRead<St> {
    fn new(stream: St) -> Self {
        Self {
            stream: stream.fuse(),
            last: None,
        }
    }
}

impl<St: Stream<Item = reqwest::Result<Bytes>> + Unpin> AsyncRead for StreamAsAsyncRead<St> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = &mut *self;
        let data = match &mut this.last {
            Some(data) if !data.is_empty() => data,
            last => {
                let Some(next_data) = ready!(this.stream.poll_next_unpin(cx)) else {
                    return Poll::Ready(Ok(0));
                };
                let next_data = next_data.map_err(|err| io::Error::other(err))?;
                last.insert(next_data)
            }
        };

        let fill_len = std::cmp::min(buf.len(), data.len());
        buf[..fill_len].copy_from_slice(&data[..fill_len]);
        data.advance(fill_len);
        Poll::Ready(Ok(fill_len))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let response = client.get("<address>").send().await?;
    let data_metadata = StreamMetadata {
        schema: Schema::default(),
        version: MetadataVersion::V5,
        ipc_schema: IpcSchema {
            fields: Vec::new(),
            is_little_endian: true,
        },
    };
    let mut stream = AsyncStreamReader::new(
        StreamAsAsyncRead::new(response.bytes_stream()),
        data_metadata,
    );
    while let Some(item) = stream.next().await {
        let item = item?;
        for item in item.into_arrays() {
            let item = arrow::array::ArrayRef::from(item);
            // Do something with `item`.
        }
    }

    Ok(())
}

Груз.томл:

[dependencies]
arrow = "50.0.0"
reqwest = { version = "0.11.24", features = ["stream"] }
tokio = { version = "1.36.0", features = ["full"] }
bytes = "1.5.0"
futures = "0.3.30"
arrow-format = "0.8.1"
arrow2 = { version = "0.18.0", features = [
    "io_ipc",
    "io_ipc_read_async",
    "arrow",
] }

Другой, более простой вариант — отказаться от поддержки асинхронности и просто использовать блокирующий интерфейс reqwest:

use arrow::ipc::reader::StreamReader;
use reqwest::blocking::Client;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let response = client.get("<address>").send()?;
    let reader = StreamReader::try_new(response, None)?;
    for item in reader {
        let item = item?;
        // Do something with `item`.
    }
    Ok(())
}

Другие вопросы по теме