Выполнение поиска на широковещательной карте, обусловленной значением столбца в Spark, с использованием Scala

Я хочу выполнить поиск по myMap. Когда значение col2 равно «0000», я хочу обновить его значением, связанным с ключом col1. В противном случае я хочу сохранить существующее значение col2.

val myDF :

+-----+-----+
|col1 |col2 |
+-----+-----+
|1    |a    | 
|2    |0000 |
|3    |c    |
|4    |0000 |
+-----+-----+

val myMap : Map[String, String] ("2" -> "b", "4" -> "d")
val broadcastMyMap = spark.sparkContext.broadcast(myMap)

def lookup = udf((key:String) => broadcastMyMap.value.get(key))

myDF.withColumn("col2", when ($"col2" === "0000", lookup($"col1")).otherwise($"col2"))

Я использовал приведенный выше код в spark-shell, и он отлично работает, но когда я создаю jar-файл приложения и отправляю его в Spark с помощью spark-submit, он выдает ошибку:

org.apache.spark.SparkException: Failed to execute user defined  function(anonfun$5: (string) => string)

Caused by: java.lang.NullPointerException

Есть ли способ выполнить поиск без использования UDF, которые не являются лучшим вариантом с точки зрения производительности, или исправить ошибку? Я думаю, что не могу просто использовать соединение, потому что некоторые значения myDF.col2, которые необходимо сохранить, могут быть заменены в ходе операции.

я вижу отсутствие домашнего задания при публикации вопроса. например, сам синтаксис неправильный val myMap : Map[String, String] ("2" -> "b", "4" -> "d") что означает это без = во-вторых, если вы хотите публиковать подготовленные переменные, подобные этому `val mydf = Seq((1, "a"), (2, "0000" ), (3, "c"), (4, "0000")).toDF("col1", "col2") ` В ПРОТИВНОМ СЛУЧАЕ ответчики должны выполнить двойную работу по подготовке ваших выборочных данных из таблицы. в следующий раз позаботьтесь обо всех этих вещах.

Ram Ghadiyaram 13.06.2019 20:48
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
1
321
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

ваш NullPointerException НЕдействителен. Я доказал это с помощью примера программы, как показано ниже.
ОТЛИЧНО РАБОТАЕТ. вы выполняете приведенную ниже программу.

package com.example

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction


object MapLookupDF {
  Logger.getLogger("org").setLevel(Level.OFF)

  def main(args: Array[String]) {
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder.
      master("local[*]")
      .appName("MapLookupDF")
      .getOrCreate()
    import spark.implicits._
    val mydf = Seq((1, "a"), (2, "0000"), (3, "c"), (4, "0000")).toDF("col1", "col2")
    mydf.show
    val myMap: Map[String, String] = Map("2" -> "b", "4" -> "d")
    println(myMap.toString)
    val broadcastMyMap = spark.sparkContext.broadcast(myMap)

    def lookup: UserDefinedFunction = udf((key: String) => {
      println("getting the value for the key " + key)
      broadcastMyMap.value.get(key)
    }
    )

    val finaldf = mydf.withColumn("col2", when($"col2" === "0000", lookup($"col1")).otherwise($"col2"))
    finaldf.show
  }
}

Результат :

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+----+----+
|col1|col2|
+----+----+
|   1|   a|
|   2|0000|
|   3|   c|
|   4|0000|
+----+----+

Map(2 -> b, 4 -> d)
getting the value for the key 2
getting the value for the key 4
+----+----+
|col1|col2|
+----+----+
|   1|   a|
|   2|   b|
|   3|   c|
|   4|   d|
+----+----+

примечание: для транслируемой небольшой карты не будет значительного ухудшения качества.

если вы хотите использовать фреймворк данных, вы можете преобразовать карту в фрейм данных

val df = myMap.toSeq.toDF("key", "val")

Map(2 -> b, 4 -> d) in dataframe format will be like
+----+----+
|key|val  |
+----+----+
|   2|   b|
|   4|   d|
+----+----+

а затем присоединяйтесь как это

Сделай сам...

Спасибо за ответ! Отсутствующий .value был опечаткой, к сожалению, я уже использую этот код, но он все еще не работает.

santo94 12.06.2019 09:20

Другие вопросы по теме