У меня есть пять Hive таблиц с именами A, B, C, D и E. Для каждой таблицы есть customer_id в качестве ключа для соединения между ними. Кроме того, каждая таблица содержит не менее 100:600 столбцов, все они имеют формат Parquet.
Пример одной таблицы ниже:
CREATE TABLE table_a
(
customer_id Long,
col_1 STRING,
col_2 STRING,
col_3 STRING,
.
.
col_600 STRING
)
STORED AS PARQUET;
Мне нужно набрать два очка,
sortByKey перед присоединением, но все еще есть узкое место в производительности. Я пытался reparation по ключу перед присоединением, но производительность по-прежнему не очень хорошая. Я пытался увеличить параллелизм для Spark до 6000 со многими исполнителями, но не смог добиться хороших результатов.Пример соединения, которое я попробовал ниже,
val dsA = spark.table(table_a)
val dsB = spark.table(table_b)
val dsC = spark.table(table_c)
val dsD = spark.table(table_d)
val dsE = spark.table(table_e)
val dsAJoineddsB = dsA.join(dsB,Seq(customer_id),"inner")





Я думаю, что в этом случае прямое соединение не является оптимальным случаем. Вы можете выполнить эту задачу, используя простой способ, описанный ниже.
FeatureData с двумя полями case class FeatureData(customer_id:Long, featureValue:Map[String,String])groupByKey и union весь набор данных с одним и тем же ключом.Я вышеописанным способом будет быстрее объединиться, чем присоединиться. Но это нужно больше работы.
После этого у вас будет набор данных с ключом, картой. Вы примените преобразование для key, Map(feature_name).
Простой пример реализации выглядит следующим образом:
Сначала вы сопоставите dataset с case class, а затем сможете объединить их все. После этого вы groupByKey сопоставите его и уменьшите.
case class FeatureMappedData(customer_id:Long, feature: Map[String, String])
val dsAMapped = dsA.map(row ⇒
FeatureMappedData(row.customer_id,
Map("featureA" -> row.featureA,
"featureB" -> row.featureB)))
val unionDataSet = dsAMapped union dsBMapped
unionDataSet.groupByKey(_.customer_id)
.mapGroups({
case (eid, featureIter) ⇒ {
val featuresMapped: Map[String, String] = featureIter.map(_.feature).reduce(_ ++ _).withDefaultValue("0")
FeatureMappedData(customer_id, featuresMapped)
}
})
Пожалуйста, посмотрите еще раз
Спасибо, это работает намного лучше. Есть ли у вас какие-либо другие идеи для повышения производительности?
Ваша идея кажется хорошей, но не могли бы вы дать мне фрагмент кода, объясняющий, как его применить. Я все еще новичок в Spark и Scala и не могу реализовать эту идею.