У меня есть фрейм данных со схемой, как показано ниже
root
|-- date: timestamp (nullable = true)
|-- questionAnswerList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- questionNumber: string (nullable = true)
| | |-- listAnswers: array (nullable = true)
| | | |-- element: string(containsNull = true)
И я хочу добавить новое поле внутри массива структуры, как показано ниже.
root
|-- date: timestamp (nullable = true)
|-- questionAnswerList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- index: integer (nullable = true)
| | |-- questionNumber: string (nullable = true)
| | |-- listAnswers: array (nullable = true)
| | | |-- element: string(containsNull = true)
Я попытался использовать UDF, как показано ниже.
val addIndexInStruct: UserDefinedFunction = udf((data: Seq[Row]) => {
data.zipWithIndex.map{case (Row(x:String,y:Array[String]), index) => (index, x, y )}
})
df.withColumn("newCol",addIndexInStruct($"questionAnswerList")).show(false)
Но у меня следующая ошибка:
Caused by: scala.MatchError: ([Q10,WrappedArray(R10.1, R10.2)],0) (of class scala.Tuple2)
Кто-нибудь знает, как это сделать в spark 2.X? В других сообщениях я видел, что в spark 3.X можно использовать функцию преобразования.
Я наконец решил это. Seq нужно было использовать вместо Array в части сопоставления с образцом.
val addIndexInStruct: UserDefinedFunction = udf((data: Seq[Row]) => {
data.zipWithIndex.map{case (Row(x: String,y: Seq[String]), index) => (index, x, y )}
})