Я хочу выполнить поиск по myMap
. Когда значение col2
равно «0000», я хочу обновить его значением, связанным с ключом col1
. В противном случае я хочу сохранить существующее значение col2
.
val myDF :
+-----+-----+
|col1 |col2 |
+-----+-----+
|1 |a |
|2 |0000 |
|3 |c |
|4 |0000 |
+-----+-----+
val myMap : Map[String, String] ("2" -> "b", "4" -> "d")
val broadcastMyMap = spark.sparkContext.broadcast(myMap)
def lookup = udf((key:String) => broadcastMyMap.value.get(key))
myDF.withColumn("col2", when ($"col2" === "0000", lookup($"col1")).otherwise($"col2"))
Я использовал приведенный выше код в spark-shell, и он отлично работает, но когда я создаю jar-файл приложения и отправляю его в Spark с помощью spark-submit, он выдает ошибку:
org.apache.spark.SparkException: Failed to execute user defined function(anonfun$5: (string) => string)
Caused by: java.lang.NullPointerException
Есть ли способ выполнить поиск без использования UDF, которые не являются лучшим вариантом с точки зрения производительности, или исправить ошибку? Я думаю, что не могу просто использовать соединение, потому что некоторые значения myDF.col2, которые необходимо сохранить, могут быть заменены в ходе операции.
ваш NullPointerException
НЕдействителен. Я доказал это с помощью примера программы, как показано ниже.
ОТЛИЧНО РАБОТАЕТ. вы выполняете приведенную ниже программу.
package com.example
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
object MapLookupDF {
Logger.getLogger("org").setLevel(Level.OFF)
def main(args: Array[String]) {
import org.apache.spark.sql.functions._
val spark = SparkSession.builder.
master("local[*]")
.appName("MapLookupDF")
.getOrCreate()
import spark.implicits._
val mydf = Seq((1, "a"), (2, "0000"), (3, "c"), (4, "0000")).toDF("col1", "col2")
mydf.show
val myMap: Map[String, String] = Map("2" -> "b", "4" -> "d")
println(myMap.toString)
val broadcastMyMap = spark.sparkContext.broadcast(myMap)
def lookup: UserDefinedFunction = udf((key: String) => {
println("getting the value for the key " + key)
broadcastMyMap.value.get(key)
}
)
val finaldf = mydf.withColumn("col2", when($"col2" === "0000", lookup($"col1")).otherwise($"col2"))
finaldf.show
}
}
Результат :
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+----+----+
|col1|col2|
+----+----+
| 1| a|
| 2|0000|
| 3| c|
| 4|0000|
+----+----+
Map(2 -> b, 4 -> d)
getting the value for the key 2
getting the value for the key 4
+----+----+
|col1|col2|
+----+----+
| 1| a|
| 2| b|
| 3| c|
| 4| d|
+----+----+
примечание: для транслируемой небольшой карты не будет значительного ухудшения качества.
если вы хотите использовать фреймворк данных, вы можете преобразовать карту в фрейм данных
val df = myMap.toSeq.toDF("key", "val")
Map(2 -> b, 4 -> d) in dataframe format will be like
+----+----+
|key|val |
+----+----+
| 2| b|
| 4| d|
+----+----+
а затем присоединяйтесь как это
Сделай сам...
Спасибо за ответ! Отсутствующий .value
был опечаткой, к сожалению, я уже использую этот код, но он все еще не работает.
я вижу отсутствие домашнего задания при публикации вопроса. например, сам синтаксис неправильный
val myMap : Map[String, String] ("2" -> "b", "4" -> "d")
что означает это без=
во-вторых, если вы хотите публиковать подготовленные переменные, подобные этому `val mydf = Seq((1, "a"), (2, "0000" ), (3, "c"), (4, "0000")).toDF("col1", "col2") ` В ПРОТИВНОМ СЛУЧАЕ ответчики должны выполнить двойную работу по подготовке ваших выборочных данных из таблицы. в следующий раз позаботьтесь обо всех этих вещах.