Привет, я пытаюсь читать из postgres в полярную рамку в общем виде.
Я прочитал сообщение здесь Rust: Чтение фрейма данных в полярах из mysql
о чтении из mysql и хочу изменить это, чтобы мне не нужно было обрабатывать столбцы для каждого нового запроса.
Я планировал сначала сохранить все значения в большом векторе, упорядоченном по столбцам. Хранение каждого типа в перечислении.
У меня есть следующий код.
let rows = client.query(&select_sql, &[]).await?;
#[derive(Clone, Debug)]
enum GenericValue {
String(String),
Bool(bool),
DateTime(DateTime<Utc>),
Int32(i32),
UInt(u32),
}
let number_of_rows = rows.len();
let number_of_columns = rows.first().expect("expected one row").columns().len();
println!("Number of columns: {:?}", number_of_columns);
println!("Number of rows: {:?}", number_of_rows);
let mut data: Vec<Option<GenericValue>> = vec![None; number_of_columns * number_of_rows];
for (row_index, row) in rows.iter().enumerate() {
for (col_index, column) in row.columns().iter().enumerate() {
let colType: String = column.type_().to_string();
let index = col_index * number_of_columns + row_index;
if colType == "int4" {
data[index] = Some(GenericValue::Int32(row.get(col_index)));
}
else if colType == "text" {
data[index] = Some(GenericValue::String(row.get(col_index)));
}
else if colType == "varchar" {
data[index] = Some(GenericValue::String(row.get(col_index)));
}
else if colType == "bool" {
data[index] = Some(GenericValue::Bool(row.get(col_index)));
}
else if colType == "timestamptz" {
data[index] = Some(GenericValue::DateTime(row.get(col_index)));
}
else {
panic!("{}", &colType);
}
}
for c in 0 .. number_of_columns {
let start_index = c * number_of_columns;
let slice = data[start_index .. start_index + number_of_rows];
let first: Option<GenericValue> = data[start_index];
let chunked_data: Utf8Chunked = slice.iter().map(|v| v.unwrap()).collect();
println!("{:?}", chunked_data);
// let ca_country: Utf8Chunked = values.iter().map(|v| &*v.country).collect();
}
Проблема в том, что мне нужно реализовать трейт PolarsAsRef<str>
дляGenericValue
что я не уверен, как это сделать, и как эти фрагментированные данные будут обрабатывать NULLS?
Как я могу создать здесь правильные фрагментированные данные для каждой серии?
Обратите внимание, что это сделано для того, чтобы отойти от коннектора, поскольку он все еще кажется привязанным к очень старым полярам. и в любом случае я не получаю от этого большого выигрыша в производительности, поскольку использую секционированные таблицы postgres, которые, похоже, нарушают параллелизм коннектора.
Спасибо
В конце концов я ответил на него, как показано ниже. Кажется, это реальная нехватка документов и примеров по этому поводу.
use arrow2::array::{Array, Int32Array, Utf8Array, StructArray};
use polars_core::prelude::ArrayRef;
use arrow2::datatypes::{DataType, Field, Schema};
use polars::prelude::*;
use tokio_postgres::{Client, NoTls, Error, ToStatement};
use tokio_postgres::binary_copy::BinaryCopyInWriter;
use tokio_postgres::types::ToSql;
use tokio_postgres::types::Type;
use tokio_postgres::{Row, Column};
use polars_core::chunked_array::ChunkedArray; // ::from_chunks
async fn postgres_to_polars(rows: &[Row]) -> std::result::Result<DataFrame, PolarsError> {
let mut fields: Vec<(String, polars_core::datatypes::DataType)> = Vec::new();
let column_count = rows[0].len();
for i in 0..column_count {
let field_name = rows[0].columns()[i].name().to_string();
let data_type: polars_core::datatypes::DataType = match rows[0].columns()[i].type_() {
&Type::VARCHAR => polars_core::datatypes::DataType::Utf8,
&Type::INT4 => polars_core::datatypes::DataType::Int32,
// Add more cases for other PostgreSQL types as needed
_ => panic!("Unsupported PostgreSQL data type"),
};
// let field = Field::new(&field_name, data_type, false);
fields.push((field_name, data_type));
}
let first_row = rows.first().unwrap();
let mut arrow_arrays: Vec<Vec<ArrayRef>> = vec![];
for (col_index, column) in first_row.columns().iter().enumerate() {
let mut array_data: Vec<ArrayRef> = vec![];
for (row_index, row) in rows.iter().enumerate() {
let array: ArrayRef = match column.type_() {
&Type::VARCHAR => Box::new(Utf8Array::<i64>::from(vec![Some(row.try_get::<usize, String>(col_index).unwrap())])),
&Type::INT4 => Box::new(Int32Array::from(vec![Some(row.try_get(col_index).unwrap())])),
// Add more cases for other PostgreSQL types as needed
_ => panic!("Unsupported PostgreSQL data type"),
};
array_data.push(array);
}
arrow_arrays.push(array_data);
}
let mut series: Vec<Series> = vec![];
for (array, field) in arrow_arrays.iter().zip(fields.iter()) {
unsafe {
let s = Series::from_chunks_and_dtype_unchecked(
&field.0,
array.to_vec(),
&field.1
);
series.push(s);
}
}
let df: PolarsResult<DataFrame> = DataFrame::new(series);
df
}