Мне нужно «проверить» определенную информацию во время пакетной обработки с помощью pyspark, которая понадобится в следующих пакетах.
Для этого варианта использования DataFrame.checkpoint, похоже, подходит. Хотя я нашел много мест, объясняющих, как его создать, я не нашел ни одного, как восстановить или прочитать контрольную точку.
Для проверки я создал простой тестовый класс с двумя (2) тестами. Первый читает CSV и создает сумму. Второй должен просто подвести итоги:
import pytest
from pyspark.sql import functions as f
class TestCheckpoint:
@pytest.fixture(autouse=True)
def init_test(self, spark_unit_test_fixture, data_dir, tmp_path):
self.spark = spark_unit_test_fixture
self.dir = data_dir("")
self.checkpoint_dir = tmp_path
def test_first(self):
df = (self.spark.read.format("csv")
.option("pathGlobFilter", "numbers.csv")
.load(self.dir))
sum = df.agg(f.sum("_c1").alias("sum"))
sum.checkpoint()
assert 1 == 1
def test_second(self):
df = (self.spark.read.format("csv")
.option("pathGlobFilter", "numbers2.csv")
.load(self.dir))
sum = # how to get back the sum?
Создание контрольной точки в первом тесте работает нормально (установите tmp_path в качестве каталога контрольной точки), и я вижу папку, созданную с файлом.
Но как мне это прочитать?
И как вы справляетесь с несколькими контрольно-пропускными пунктами? Например, одна контрольная точка по сумме, а другая по среднему?
Существуют ли лучшие подходы к хранению состояния в пакетах?
Для полноты CSV выглядит так:
1719228973,1
1719228974,2
И это лишь минимальный пример для запуска — мой реальный сценарий более сложен.
Хотя теоретически контрольные точки сохраняются во всех заданиях Spark и к ним можно получить доступ из других заданий Spark, прочитав файлы напрямую без необходимости пересчитывать всю родословную, они не упрощают чтение контрольных точек непосредственно из сохраненных файлов из других заданий Spark. Если вам интересно, вот ответ, в котором показано, как выглядят имена файлов при возникновении контрольной точки.
Поэтому в вашем случае я бы посоветовал сохранить на диск самостоятельно и прочитать файл(ы), когда он понадобится вам в другой работе. Вы можете использовать механизм хранения (например, паркет, эффективность которого зависит от ваших данных и характера обработки). Что-то вроде этого:
import pytest
from pyspark.sql import functions as f
class TestCheckpoint:
@pytest.fixture(autouse=True)
def init_test(self, spark_unit_test_fixture, data_dir, tmp_path):
self.spark = spark_unit_test_fixture
self.dir = data_dir("")
self.checkpoint_dir = tmp_path
def test_first(self):
df = (self.spark.read.format("csv")
.option("pathGlobFilter", "numbers.csv")
.load(self.dir))
sum_df = df.agg(f.sum("_c1").alias("sum"))
sum_df.write.mode("overwrite").parquet(str(self.checkpoint_dir / "sum"))
assert 1 == 1
def test_second(self):
previous_sum = self.spark.read.parquet(str(self.checkpoint_dir / "sum"))
previous_sum_value = previous_sum.collect()[0]["sum"]
df = (self.spark.read.format("csv")
.option("pathGlobFilter", "numbers2.csv")
.load(self.dir))
new_sum = df.agg(f.sum("_c1").alias("sum"))
total_sum = previous_sum_value + new_sum.collect()[0]["sum"]
assert 1 == 1
Тем не менее, если вам нужно получить доступ к данным контрольной точки в рамках одного и того же задания Spark, вы можете просто сохранить ссылку на фрейм данных следующим образом.
sum = df.agg(f.sum("_c1").alias("sum"))
sum = sum.checkpoint() # hold on to this reference to access the checkpointed data
В качестве альтернативы у вас также есть df.persist(StorageLevel.DISK_ONLY)
, который также позволяет хранить на диске, сохраняя при этом происхождение данных. Однако после завершения задания данные удаляются.