Spark — java.lang.ClassCastException при обработке в udf столбца типа Array[Array[Map[String,String]]]

Я объединяю два столбца в spark типа Array[Map[String,String]], в результате чего получается новый столбец типа Array[Array[Map[String,String]]]. Однако я хотел бы сгладить этот столбец, чтобы получить столбцы типа Array[Map[String,String]] со значениями обоих исходных столбцов.

Я читал, что из Spark 2.4 можно было бы применить flatten непосредственно к объединению столбцов. Что-то вроде этого:

df.withColumn("concatenation", flatten(array($"colArrayMap1", $"colArrayMap2")))

Однако я все еще использую Spark 2.2, поэтому для этого мне нужно использовать udf. Вот что я написал:

def flatten_collection(arr: Array[Array[Map[String,String]]]) = {
    if (arr == null)
        null
    else
        arr.flatten
}
  
val flatten_collection_udf = udf(flatten_collection _)

df.withColumn("concatenation", array($"colArrayMap1", $"colArrayMap2")).withColumn("concatenation", flatten_collection_udf($"concatenation")).show(false)

Но я получаю следующую ошибку:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<array<map<string,string>>>) => array<map<string,string>>)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:835)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:835)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:380)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [[Lscala.collection.immutable.Map;

Я предполагаю, что в udf происходит ошибка приведения, но почему и как ее избежать?

Кроме того, если кто-то знает решение для Spark 2.2, которое не требует использования UDF, еще лучше

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
517
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Адаптировано из ответа здесь. Seq нужно вместо Array.

def concat_arr(
    arr1: Seq[Map[String,String]],
    arr2: Seq[Map[String,String]]
) : Seq[Map[String,String]] =
{
    (arr1 ++ arr2)
}
val concatUDF = udf(concat_arr _)

val df2 = df.withColumn("concatenation", concatUDF($"colArrayMap1", $"colArrayMap2"))

df2.show(false)
+--------------------+--------------------+----------------------------------------+
|colArrayMap1        |colArrayMap2        |concatenation                           |
+--------------------+--------------------+----------------------------------------+
|[[a -> b], [c -> d]]|[[a -> b], [c -> d]]|[[a -> b], [c -> d], [a -> b], [c -> d]]|
+--------------------+--------------------+----------------------------------------+

Обычный Array — это массив Java, на самом деле он не поддерживает все те же функции, что и Seq. Чтобы сделать это, существует неявное преобразование в WrappedArray, но это не происходит автоматически внутри Spark, вы должны принудительно объявить это Seq

AnotherParker 24.12.2020 16:18

Другие вопросы по теме