Кадры данных с общим ленивым предком означают повторяющиеся вычисления?

Зависимость DAG

Описание

Довольно просто: по сути, я читаю некоторые файлы паркета с диска, используя поляры, которые являются источником данных. Выполнение умеренно тяжелой обработки (несколько миллионов строк) для создания промежуточного кадра данных, а затем создание двух результатов, которые необходимо записать обратно в некоторую базу данных.

Технологический стек
  • Убунту 22.04
  • Питон 3.10
  • Поляры 1.2.1
Вопрос

Polars рекомендует, насколько это возможно, использовать ленивые вычисления для оптимизации выполнения. Теперь окончательные результаты (result_1 и result_2), очевидно, необходимо материализовать.

Но если я позвоню этим двум последовательно

#! /usr/bin/env python3
# encoding: utf-8
import polars as pl
...
result_1.collect() # Materialise result 1
result_2.collect() # Materialise result 2

Повторяется ли преобразование исходного кадра в промежуточный (общий предок)? Если да, то это явно нежелательно. В этом случае мне придется материализовать промежуточный кадр, а затем выполнить остальную обработку в режиме ожидания.

Есть ли документация от полярников об ожидаемом поведении и рекомендуемых методах в отношении этого сценария?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
0
50
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Попробуйте pl.collect_all собрать несколько кадров данных.

pl.collect_all([result1, result2])

Ссылка: https://docs.pola.rs/api/python/stable/reference/api/polars.collect_all.html

Я думаю, что это все равно будет выполнять промежуточную ленивую обработку кадров дважды, но будет делать это параллельно.

Roman Pekar 20.08.2024 10:45
Ответ принят как подходящий

Честно говоря, я думаю, что для производственного кода лучше всего collect() промежуточные результаты, а затем повторно использовать их в result_1 и result_2. Было бы неплохо, чтобы он collect_all() мог найти некоторые общие подграфы вычислений и кэшировать их, но я не думаю, что это происходит (хотя я особо не проверял Rust-код).

Возможно, вы могли бы попробовать обходной путь с помощью Polars.concat():

# let's say you have some intermediate LazyFrame with some calculations
lf_intermediate = lf.group_by("a").agg()

# and here you want to create 2 different results out of this DataFrame
# You can add a 'partitioning' column so you can separate your results after
# collection
lf1 = lf_intermediate.with_columns(pl.col.a * 2, partition=pl.lit(1))
lf2 = lf_intermediate.with_columns(pl.col.a / 3, partition=pl.lit(2))

# create combined result
df_result = pl.concat([lf1, lf2], how='diagonal').collect()

# and now separate results into different dataframes
df1 = lf_result.filter(pl.col.partition == 1)
df2 = lf_result.filter(pl.col.partition == 2)

Вы можете видеть, что промежуточная часть кэшируется во время расчета:

pl.concat([lf1, lf2], how='diagonal').explain(optimized=True)

UNION
  PLAN 0:
     WITH_COLUMNS:
     [[(col("a")) * (2)], dyn int: 1.alias("partition"), null.cast(Float64).alias("c")] 
      CACHE[id: 0, cache_hits: 1]
        AGGREGATE
            [] BY [col("a")] FROM
          DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: None
  PLAN 1:
    simple π 3/3 ["a", "partition", "c"]
       WITH_COLUMNS:
       [[(col("a")) / (3)].alias("c"), dyn int: 2.alias("partition")] 
        CACHE[id: 0, cache_hits: 1]
          AGGREGATE
            [] BY [col("a")] FROM
            DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: None
END UNION

Спасибо, я подал запрос на добавление функции в репозиторий Polars на Github.

Della 21.08.2024 07:56

Другие вопросы по теме