есть таблица, в которой поле "A" содержит sql запрос. Необходимо добавить дополнительное поле «Б», которое содержало бы время, затрачиваемое на выполнение запроса из поля «А». Я написал UDF, и все работает хорошо, но при кэшировании результирующей таблицы или попытке записать окончательный фрейм данных в физическую таблицу я получил ошибку:
"Не удалось выполнить пользовательскую функцию ($anonfun$1: (string) => нить)"
. В чем может быть проблема? Пример:
val set_time = udf((query: String) => {
val start = new Timestamp(new Date().getDate)
val count = spark.sql(s"${query}").count
val time_query = (new Timestamp(new Date().getTime)).getTime() - start.getTime()
time_query.toString
})
Исходная таблица "источник":
+--------------------+
| A |
+--------------------+
|"Select * From ..." |
|"Select * From ..." |
|"Select * From ..." |
|"Select * From ..." |
|"Select * From ..." |
+--------------------+
val result = spark.sql("from source").
withColumn("B", set_time(col("A")))
result.show
+--------------------+------+
| A | B |
+--------------------+------+
|"Select * From ..." | 356 |
|"Select * From ..." | 642 |
|"Select * From ..." | 2745 |
|"Select * From ..." | 1324 |
|"Select * From ..." | 635 |
+--------------------+------+
Но:
//ERROR
result.write.mode("overwrite").saveAsTable("dbName.result")
//ERROR
val result_cache = result.persist
result_cache.show
Проблема здесь в том, что UDF работает с исполнителями, для которых искровой сеанс недоступен. Итак, я думаю, вы получаете исключение NullPointer в строке "val count = spark.sql..."
.
Вы должны сделать это на драйвере, используя не UDF, а просто function1. Также с помощью collect() я предполагаю, что основная таблица невелика и поместится в память драйвера:
Пример:
import java.util.Date
import java.time.LocalDateTime
val set_time = (query: String) => {
val start = new Timestamp(new Date().getTime)
val count = spark.sql(s"${query}").count
val time_query = (new Timestamp(new Date().getTime)).getTime() - start.getTime()
time_query.toString
}
val result = spark.sql("select 'select 1' as A union all select 'select 2' as A")
val s = result.collect().map(x =>(x(0).asInstanceOf[String],set_time(x(0).asInstanceOf[String]))).toList.toDF("A","B")
s.show
s.cache().show
+--------+---+
| A| B|
+--------+---+
|select 1|171|
|select 2|135|
+--------+---+
PS: также val start = new Timestamp(new Date().getDate)
в вашем примере должно быть val start = new Timestamp(new Date().getTime)