Мне интересно, повлияет ли заказ кадра данных перед разделением на скорость вычислений/потребление ресурсов. Чтобы быть конкретным, у меня есть файлы паркета, сохраненные в Databricks, и я хочу фильтровать по двум столбцам, но другой слишком гранулирован, чтобы разделять его только на части. Если сначала упорядочить набор данных, я вижу, что второй столбец упорядочивается в разделе, но действительно ли это имеет смысл для Databricks? Будет ли Databricks распознавать, что записи отсортированы по второму столбцу, и ускорит чтение?
Я хочу искать фильмы по году выпуска, а иногда и по названию. Но название фильмов слишком кардинальное, чтобы быть разделом, поэтому я не включил его в раздел partitionBy.
df.orderBy("year","movie_name").write.partitionBy("year").csv("dbfs:/FileStore/movies")
Является ли приведенный выше запрос лучше, чем этот?
df.write.partitionBy("year").csv("dbfs:/FileStore/movies")
Или как лучше разбивать в таких случаях? Год и имя наверняка будут двумя наиболее часто используемыми столбцами в наборе данных.
Вы ничего не выиграете, упорядочив перед записью паркетного файла, потому что информация об упорядочивании не сохраняется в самом файле.
Вы можете проверить это, используя метод explain()
фрейма данных. Сделаем сами:
df = spark.createDataFrame(
[
("niceMovie2", 2020),
("niceMovie1", 2020),
("niceMovie3", 2021)
],
["Title", "Year"]
)
>>> df.show()
+----------+----+
| Title|Year|
+----------+----+
|niceMovie2|2020|
|niceMovie1|2020|
|niceMovie3|2021|
+----------+----+
Запишем датафрейм в паркетный файл без порядка, а затем перечитаем его, отфильтровав фильм по Year
и Title
:
df.write.partitionBy("Year").parquet("movies.parquet")
df2 = spark \
.read \
.parquet("movies.parquet") \
.filter(col("Year") == 2020) \
.filter(col("Title") == "niceMovie2")
>>> df2.explain()
== Physical Plan ==
*(1) Filter (isnotnull(Title#4) AND (Title#4 = niceMovie2))
+- *(1) ColumnarToRow
+- FileScan parquet [Title#4,Year#5] Batched: true, DataFilters: [isnotnull(Title#4), (Title#4 = niceMovie2)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/somewhere/movies.parquet], PartitionFilters: [isnotnull(Year#5), (Year#5 = 2020)], PushedFilters: [IsNotNull(Title), EqualTo(Title,niceMovie2)], ReadSchema: struct<Title:string>
Как вы можете видеть на физическом плане, файл будет прочитан, а разбиение на разделы будет использовано для эффективного удаления неиспользованных лет (см. бит PartitionFilters: [isnotnull(Year #13), (Year#13 = 2020)]
).
Теперь давайте сделаем то же самое, но упорядочим фрейм данных по Title
перед его чтением:
df.orderBy("Title").write.partitionBy("Year").parquet("moviesOrdered.parquet")
df3 = spark \
.read \
.parquet("movies.parquet") \
.filter(col("Year") == 2020) \
.filter(col("Title") == "niceMovie2")
>>> df3.explain()
== Physical Plan ==
*(1) Filter (isnotnull(Title#0) AND (Title#0 = niceMovie2))
+- *(1) ColumnarToRow
+- FileScan parquet [Title#0,Year#1] Batched: true, DataFilters: [isnotnull(Title#0), (Title#0 = niceMovie2)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/somewhere/moviesOrdered.parquet], PartitionFilters: [isnotnull(Year#1), (Year#1 = 2020)], PushedFilters: [IsNotNull(Title), EqualTo(Title,niceMovie2)], ReadSchema: struct<Title:string>
Вы видите, что в данном случае физический план точно такой же. Так что разницы в производительности нет.
Хммм, здесь много переменных: насколько велики ваши данные и насколько велик ваш кластер? Кроме того, сколько времени занимают ваши искровые задания? И вас в основном интересует простое извлечение 1 записи из файла паркета?
У меня сейчас нет конкретной информации об этом, но я ожидаю огромную таблицу (миллиарды записей) и соответствующий размер кластера. В большинстве сценариев я хочу извлечь несколько фильмов на основе их названия и года (например, фильмы и год, мы также можем подумать о дате транзакции и номере банковского счета, где это больше похоже на реальность)
Хм, тогда я бы не стал сильно волноваться по этому поводу. Если ваш кластер имеет правильный размер w.r.t. ваши данные, вы не делаете здесь никаких сумасшедших вещей. Ваше разбиение по годам (в зависимости от кардинальности ваших лет) уже даст вам большой прирост производительности. Вы также можете посмотреть, сколько файлов частей будет создано в каждом разделе года. Постарайтесь, чтобы каждый файл части был размером около 100 МБ, это хороший первый подход. Вы можете контролировать это, выполнив что-то вроде df.repartition(NR_OF_PARTITIONS, someOtherCol).write.partitionBy("Year").parquet(filename)
Привет, Коэдлт, спасибо! Есть ли у вас какой-либо совет, как разбить столбец с высокой кардинальностью (название фильма), поскольку порядок не помогает, или знаете какие-либо ресурсы, где об этом можно прочитать?