У меня есть секционированные данные паркета:
dir/batch_date=2023-02-13/batch_hour=09
Мне нужно прочитать данные за последние 14 дней через программу spark. В настоящее время я читаю данные и применяю фильтр даты к фрейму данных как batch_date минус 14 дней. Есть ли способ установить диапазон каталогов, чтобы ограничить чтение только каталогами за последние 14 дней, а не всем набором данных.
Спасибо





То, что вы уже делаете, является оптимальным из-за концепции PartitionFilters в apache spark, поэтому, когда вы применяете фильтры к секционированному столбцу, эти фильтры применяются к данным в источнике, прежде чем какие-либо данные будут отправлены по сети, чтобы уменьшить объем передаваемых данных.
Например, у меня есть данные, разделенные по годам:
/path/
Year=2018/
file.parquet
Year=2019/
file.parquet
Year=2020/
file.parquet
Year=2021/
file.parquet
Year=2022/
file.parquet
Year=2023/
file.parquet
Если я применяю следующий код:
spark.read.parquet("/path/").filter(col("Year") >= "2020").explain()
Я получу следующий Физический План:
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [Variable_name#0,Value#1,Units#2,Year#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/user/out..., PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)], PushedFilters: [], ReadSchema: struct<Variable_name:string,Value:string,Units:string>
Если вы ищете PartitionFilters, вы найдете это:
PartitionFilters: [isnotnull(Year#3), (Year#3 >= 2020)]
Это означает, что применяются фильтры разделов, и будут загружены только нужные разделы, однако, если вы не видите PartitionFilters, это означает, что что-то пошло не так, и будут загружены все данные.
Если по какой-то причине PartitionFilters не сработали, вы всегда можете использовать hadoop для фильтрации папок, которые хотите загрузить с помощью spark.
val hdfs = new Path(path).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
val filesToRead = hdfs.listStatus(new Path(path)).toList.filter(_.getPath.getName.split(" = ")(1) >= min_date)
Затем прочитайте filesToRead с помощью spark.
Какой код вы использовали? Похоже, вы уже делаете то, о чем просите. Фильтрация фрейма данных по возможности подавляется, и считываются только нужные папки.