Как прочитать/восстановить Dataframe с контрольной точкой – в разных пакетах

Мне нужно «проверить» определенную информацию во время пакетной обработки с помощью pyspark, которая понадобится в следующих пакетах.

Для этого варианта использования DataFrame.checkpoint, похоже, подходит. Хотя я нашел много мест, объясняющих, как его создать, я не нашел ни одного, как восстановить или прочитать контрольную точку.

Для проверки я создал простой тестовый класс с двумя (2) тестами. Первый читает CSV и создает сумму. Второй должен просто подвести итоги:

import pytest
from pyspark.sql import functions as f

class TestCheckpoint:

    @pytest.fixture(autouse=True)
    def init_test(self, spark_unit_test_fixture, data_dir, tmp_path):
        self.spark = spark_unit_test_fixture
        self.dir = data_dir("")
        self.checkpoint_dir = tmp_path

    def test_first(self):
        df = (self.spark.read.format("csv")
              .option("pathGlobFilter", "numbers.csv")
              .load(self.dir))

        sum = df.agg(f.sum("_c1").alias("sum"))
        sum.checkpoint()
        assert 1 == 1

    def test_second(self):
        df = (self.spark.read.format("csv")
              .option("pathGlobFilter", "numbers2.csv")
              .load(self.dir))

        sum = # how to get back the sum?

Создание контрольной точки в первом тесте работает нормально (установите tmp_path в качестве каталога контрольной точки), и я вижу папку, созданную с файлом.

Но как мне это прочитать?

И как вы справляетесь с несколькими контрольно-пропускными пунктами? Например, одна контрольная точка по сумме, а другая по среднему?

Существуют ли лучшие подходы к хранению состояния в пакетах?

Для полноты CSV выглядит так:

1719228973,1
1719228974,2

И это лишь минимальный пример для запуска — мой реальный сценарий более сложен.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
3
0
88
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Хотя теоретически контрольные точки сохраняются во всех заданиях Spark и к ним можно получить доступ из других заданий Spark, прочитав файлы напрямую без необходимости пересчитывать всю родословную, они не упрощают чтение контрольных точек непосредственно из сохраненных файлов из других заданий Spark. Если вам интересно, вот ответ, в котором показано, как выглядят имена файлов при возникновении контрольной точки.

Поэтому в вашем случае я бы посоветовал сохранить на диск самостоятельно и прочитать файл(ы), когда он понадобится вам в другой работе. Вы можете использовать механизм хранения (например, паркет, эффективность которого зависит от ваших данных и характера обработки). Что-то вроде этого:

import pytest
from pyspark.sql import functions as f

class TestCheckpoint:

    @pytest.fixture(autouse=True)
    def init_test(self, spark_unit_test_fixture, data_dir, tmp_path):
        self.spark = spark_unit_test_fixture
        self.dir = data_dir("")
        self.checkpoint_dir = tmp_path

    def test_first(self):
        df = (self.spark.read.format("csv")
              .option("pathGlobFilter", "numbers.csv")
              .load(self.dir))

        sum_df = df.agg(f.sum("_c1").alias("sum"))
        sum_df.write.mode("overwrite").parquet(str(self.checkpoint_dir / "sum"))
        assert 1 == 1

    def test_second(self):
        previous_sum = self.spark.read.parquet(str(self.checkpoint_dir / "sum"))
        previous_sum_value = previous_sum.collect()[0]["sum"]

        df = (self.spark.read.format("csv")
              .option("pathGlobFilter", "numbers2.csv")
              .load(self.dir))

        new_sum = df.agg(f.sum("_c1").alias("sum"))
        total_sum = previous_sum_value + new_sum.collect()[0]["sum"]

        assert 1 == 1

Тем не менее, если вам нужно получить доступ к данным контрольной точки в рамках одного и того же задания Spark, вы можете просто сохранить ссылку на фрейм данных следующим образом.

sum = df.agg(f.sum("_c1").alias("sum"))
sum = sum.checkpoint() # hold on to this reference to access the checkpointed data

В качестве альтернативы у вас также есть df.persist(StorageLevel.DISK_ONLY), который также позволяет хранить на диске, сохраняя при этом происхождение данных. Однако после завершения задания данные удаляются.

Другие вопросы по теме