Создайте фрейм данных polars из postgres sql в общем виде

Привет, я пытаюсь читать из postgres в полярную рамку в общем виде.

Я прочитал сообщение здесь Rust: Чтение фрейма данных в полярах из mysql

о чтении из mysql и хочу изменить это, чтобы мне не нужно было обрабатывать столбцы для каждого нового запроса.

Я планировал сначала сохранить все значения в большом векторе, упорядоченном по столбцам. Хранение каждого типа в перечислении.

У меня есть следующий код.

let rows = client.query(&select_sql, &[]).await?;

  #[derive(Clone, Debug)]
  enum GenericValue {
    String(String),
    Bool(bool),
    DateTime(DateTime<Utc>),
    Int32(i32),
    UInt(u32),
  }

  let number_of_rows = rows.len();
  let number_of_columns = rows.first().expect("expected one row").columns().len();

  println!("Number of columns: {:?}", number_of_columns);
  println!("Number of rows: {:?}", number_of_rows);

  let mut data: Vec<Option<GenericValue>> = vec![None; number_of_columns * number_of_rows]; 

  for (row_index, row) in rows.iter().enumerate() {

    for (col_index, column) in row.columns().iter().enumerate() {
        let colType: String = column.type_().to_string();
        
        let index = col_index * number_of_columns + row_index;

        if colType == "int4" { 
            data[index] = Some(GenericValue::Int32(row.get(col_index)));
        }
        else if colType == "text" {
            data[index] = Some(GenericValue::String(row.get(col_index)));
        }
        else if colType == "varchar" {
          data[index] = Some(GenericValue::String(row.get(col_index)));
        }
        else if colType == "bool" {
            data[index] = Some(GenericValue::Bool(row.get(col_index)));
        }
        else if colType == "timestamptz" {
          data[index] = Some(GenericValue::DateTime(row.get(col_index)));
        }
        else {
            panic!("{}", &colType);
        }
    }

   for c in 0 .. number_of_columns {

      let start_index = c * number_of_columns;
      let slice = data[start_index .. start_index + number_of_rows];

      let first: Option<GenericValue> = data[start_index];

      let chunked_data: Utf8Chunked = slice.iter().map(|v| v.unwrap()).collect();

      println!("{:?}", chunked_data);


      // let ca_country: Utf8Chunked = values.iter().map(|v| &*v.country).collect();

   }

Проблема в том, что мне нужно реализовать трейт PolarsAsRef<str> дляGenericValue что я не уверен, как это сделать, и как эти фрагментированные данные будут обрабатывать NULLS? Как я могу создать здесь правильные фрагментированные данные для каждой серии?

Обратите внимание, что это сделано для того, чтобы отойти от коннектора, поскольку он все еще кажется привязанным к очень старым полярам. и в любом случае я не получаю от этого большого выигрыша в производительности, поскольку использую секционированные таблицы postgres, которые, похоже, нарушают параллелизм коннектора.

Спасибо

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
165
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В конце концов я ответил на него, как показано ниже. Кажется, это реальная нехватка документов и примеров по этому поводу.

use arrow2::array::{Array, Int32Array, Utf8Array, StructArray};

use polars_core::prelude::ArrayRef;

use arrow2::datatypes::{DataType, Field, Schema};
use polars::prelude::*;

use tokio_postgres::{Client, NoTls, Error, ToStatement};
use tokio_postgres::binary_copy::BinaryCopyInWriter;
use tokio_postgres::types::ToSql;
use tokio_postgres::types::Type;
use tokio_postgres::{Row, Column};

use polars_core::chunked_array::ChunkedArray; // ::from_chunks

async fn postgres_to_polars(rows: &[Row]) -> std::result::Result<DataFrame, PolarsError> {

    let mut fields: Vec<(String, polars_core::datatypes::DataType)> = Vec::new();
    let column_count = rows[0].len();
    for i in 0..column_count {
        let field_name = rows[0].columns()[i].name().to_string();
        let data_type: polars_core::datatypes::DataType = match rows[0].columns()[i].type_() {
            &Type::VARCHAR => polars_core::datatypes::DataType::Utf8,
            &Type::INT4 => polars_core::datatypes::DataType::Int32,
            // Add more cases for other PostgreSQL types as needed
            _ => panic!("Unsupported PostgreSQL data type"),
        };
        // let field = Field::new(&field_name, data_type, false);
        fields.push((field_name, data_type));
    }

    let first_row = rows.first().unwrap();

    let mut arrow_arrays: Vec<Vec<ArrayRef>> = vec![];
    
    for (col_index, column) in first_row.columns().iter().enumerate() {

        let mut array_data: Vec<ArrayRef> = vec![];

        for (row_index, row) in rows.iter().enumerate() {
  
            let array: ArrayRef = match column.type_() {
                &Type::VARCHAR => Box::new(Utf8Array::<i64>::from(vec![Some(row.try_get::<usize, String>(col_index).unwrap())])),
                &Type::INT4 => Box::new(Int32Array::from(vec![Some(row.try_get(col_index).unwrap())])),
                // Add more cases for other PostgreSQL types as needed
                _ => panic!("Unsupported PostgreSQL data type"),
            };
  
            array_data.push(array);
  
        }

        arrow_arrays.push(array_data);
    }

    let mut series: Vec<Series> = vec![];

    for (array, field) in arrow_arrays.iter().zip(fields.iter()) {

        unsafe {

            let s = Series::from_chunks_and_dtype_unchecked(
                &field.0,
                array.to_vec(),
                &field.1
            );

            series.push(s);
        }
    }
    
    let df: PolarsResult<DataFrame> = DataFrame::new(series);

    df
}

Другие вопросы по теме