Замораживание spark sql

У меня проблема со Spark SQL. Я прочитал некоторые данные из файлов csv. Затем я выполняю операцию groupBy и присоединяюсь, а завершенная задача - записывать объединенные данные в файл. Моя проблема - временной интервал (я показываю это в журнале ниже с пробелом).

18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1069
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1003
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 965
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1073
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1038
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 900
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 903
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 938
18/08/07 23:39:40 INFO storage.BlockManagerInfo: Removed broadcast_84_piece0 on 10.4.110.24:36423 in memory (size: 32.8 KB, free: 4.1 GB)
18/08/07 23:39:40 INFO storage.BlockManagerInfo: Removed broadcast_84_piece0 on omm104.in.nawras.com.om:43133 in memory (size: 32.8 KB, free: 4.1 GB)
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 969
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1036
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 970
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1006
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1039
18/08/07 23:39:47 WARN util.Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
18/08/07 23:39:54 INFO parquet.ParquetFileFormat: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter

18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 

Фреймы данных - это небольшие по размеру ~ 5000 записей и ~ 800 столбцов. Я использую следующий код:

val parentDF = ...
val childADF = ...
val childBDF = ...

val aggregatedAColName = "CHILD_A"
val aggregatedBColName = "CHILD_B"

val columns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3", "val_0")
val keyColumns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3")

val nestedAColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedAColName)
val childADataFrame = childADF
  .select(nestedAColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedAColName).alias(aggregatedAColName))
val joinedWithA = parentDF.join(childADataFrame, keyColumns, "left")

val nestedBColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedBColName)
val childBDataFrame = childBDF
  .select(nestedBColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedBColName).alias(aggregatedBColName))
val joinedWithB = joinedWithA.join(childBDataFrame, keyColumns, "left")

Время обработки 30 файлов (всего ~ 85 тыс. Записей) странно велико ~ 38 мин. Вы когда-нибудь видели подобную проблему?

1
0
615
1

Ответы 1

Старайтесь избегать вызова repartition (), поскольку он вызывает ненужные перемещения данных внутри узлов.

Согласно Learning Spark

Имейте в виду, что перераспределение данных - довольно дорогостоящая операция. Spark также имеет оптимизированную версию repartition (), называемую coalesce (), которая позволяет избежать перемещения данных, но только если вы уменьшаете количество разделов RDD.

Проще говоря, COALESCE: - только для уменьшения количества разделов, без перетасовки данных, он просто сжимает разделы.

Это не совсем так. Coalesce сводит к минимуму перетасовку данных, поскольку он используется для уменьшения количества разделов. В то время как repartition может уменьшать или увеличивать количество разделов. Тем не менее, это не исключает полностью перетасовки данных. Это также зависит от того, установлен ли shuffle flag на true или false. Я обращаюсь к этим вопросам: stackoverflow.com/questions/31610971/… и stackoverflow.com/questions/42034314/…

Jeremy 10.08.2018 16:26

Пожалуйста, вставьте свой физический план

Chandan Ray 10.08.2018 17:11

Другие вопросы по теме