Уже было задано множество вопросов о количестве задач Spark и о том, как это соотносится с количеством разделов. Но как-то не могу понять следующий случай.
У меня есть таблица Hive (папка HDFS), которая содержит 160 сжатых файлов Parquet. Файлы в основном хорошо сбалансированы: самый маленький - 7,5 МБ, самый большой - 49,2 МБ. В браузере HDFS я вижу, что каждый файл находится в пределах 1 (неполного) блока HDFS (128 МБ).
Кластер имеет следующие свойства: 10 машин, 1 мастер и 9 рабочих. Каждая машина имеет 6 ядер (12 виртуальных ядер). Я использую пряжу. Кроме того:
spark.executor.cores = 6
Теперь я создаю следующий фрейм данных:
val myDF = spark.sql("SELECT * FROM myHiveTable WHERE myCol='someValue')
Еще до запуска задания можно заранее узнать, что:
myDF.rdd.partitions.size
возвращает 60.
Чтобы запустить задание, нужно действие, поэтому я пишу «myDF» в HDFS. Работа действительно приводит к 42 исполнителям и 60 заданий.
Мои вопросы:
Если я начал со 160 разделов, как получилось, что у меня 60?
Если бы у меня было 60 задач и 10 машин, то оптимально мне понадобилось бы только 10 исполнителей (где-то я читал, что каждый исполнитель может запускать столько задач параллельно, сколько ядер, которое в моем случае равно 6). Я знаю, что это произойдет только в том случае, если набор данных идеально сбалансирован между узлами данных, но 42 Executors кажутся мне далекими от 10. Или мои рассуждения ошибочны?
Как Spark может знать заранее, еще до выполнения запроса, что это приведет к 60 разделам.
Спасибо!
@SamsonScharfrichter Не могли бы вы уточнить свой комментарий?
Есть ли здесь прогресс?
Думаю, это могут быть ответы на вопросы 1 и 3.
По-видимому, если я читаю таблицу Hive (которая на самом деле является папкой) как rdd, у меня также будет такое же количество разделов (60). Количество разделов RDD определяется конкретным InputFormat. Источник: Количество разделов по умолчанию Spark RDD
Итак, я искал, каково поведение Parquet, вызванный комментарием @Samson Scharfrichter, и нашел следующее: https://github.com/animeshthibitedi/notes/wiki/Parquet-partition-calculation
Min(defaultMinSplitSize (128MB, `spark.sql.files.maxPartitionBytes`,
Max(openCostInByte(8MB, `spark.sql.files.openCostInBytes`,
totalSize/defaultParallelism)
)
Так что, возможно, эта формула объясняет, почему 60, и как это можно рассчитать заранее.
Для записи, уровень параллелизма с файлами Parquet - это количество «блоков строк», т.е. возможно больше, чем количество файлов (или меньше, если у вас есть пустые файлы).