Scala и Spark - логистическая регрессия - NullPointerException

Всем привет!

У меня проблема с моим scala и искровым кодом. Я пытаюсь реализовать модель логистической регрессии. Для этого мне пришлось реализовать две функции UDF для сбора моих функций. Проблема в том, что каждый раз, когда я пытаюсь вызвать функцию dataframe.show (), я получаю сообщение об ошибке:

Scala и Spark - логистическая регрессия - NullPointerException

Я подумал, что, возможно, у меня есть нулевые значения в моем фрейме данных, и я попытался вызвать dataframe.na.drop (), чтобы исключить вероятные нулевые значения.

Проблема по-прежнему существует, и в ней говорится, что не удалось выполнить определенную пользователем функцию (anonfun $ 3: (array, array) => int).

Вот код моей дыры:

val sc = spark.sparkContext

val data = sc.textFile("resources/data/training_set.txt").map(line =>{
  val fields = line.split(" ")
  (fields(0),fields(1), fields(2).toInt)
})
import spark.implicits._
val trainingDF = data.toDF("srcId","dstId", "label")
val infoRDD = spark.read.option("header","false").option("inferSchema","true").format("csv").load("resources/data/node_information.csv")

val infoDF = infoRDD.toDF("srcId","year","title","authors","jurnal","abstract")

println("Showing linksDF sample...")
trainingDF.show(5)
println("Rows of linksDF: ",trainingDF.count())

println("Showing infoDF sample...")
infoDF.show(2)
println("Rows of infoDF: ",infoDF.count())

println("Joining linksDF and infoDF...")
var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" === $"b.srcId")

println(joinedDF.count())

joinedDF = joinedDF.select($"a.srcId",$"a.dstId",$"a.label",$"b.year",$"b.title",$"b.authors",$"b.jurnal",$"b.abstract")

println("Renameming joinedDF...")
joinedDF = joinedDF
  .withColumnRenamed("srcId","id_from")
  .withColumnRenamed("dstId","id_to")
  .withColumnRenamed("year","year_from")
  .withColumnRenamed("title","title_from")
  .withColumnRenamed("authors","authors_from")
  .withColumnRenamed("jurnal","jurnal_from")
  .withColumnRenamed("abstract","abstract_from")

var infoDfRenamed = joinedDF
  .withColumnRenamed("id_from","id_from")
  .withColumnRenamed("id_to","id_to")
  .withColumnRenamed("year_from","year_to")
  .withColumnRenamed("title_from","title_to")
  .withColumnRenamed("authors_from","authors_to")
  .withColumnRenamed("jurnal_from","jurnal_to")
  .withColumnRenamed("abstract_from","abstract_to").select("id_to","year_to","title_to","authors_to","jurnal_to","jurnal_to")

var finalDF = joinedDF.as(("a")).join(infoDF.as("b"),$"a.id_to" === $"b.srcId")

finalDF = finalDF
  .withColumnRenamed("year","year_to")
  .withColumnRenamed("title","title_to")
  .withColumnRenamed("authors","authors_to")
  .withColumnRenamed("jurnal","jurnal_to")
  .withColumnRenamed("abstract","abstract_to")

println("Dropping unused columns from joinedDF...")
finalDF = finalDF.drop("srcId")


println("Spliting title_from column into words...")
finalDF = finalDF.withColumn("title_from_words", functions.split(col("title_from"), "\\s+"))
println("Spliting title_to column into words...")
finalDF = finalDF.withColumn("title_to_words", functions.split(col("title_to"), "\\s+"))

println("Spliting authors_from column into words...")
finalDF = finalDF.withColumn("authors_from_words", functions.split(col("authors_from"), "\\s+"))
println("Spliting authors_to column into words...")
finalDF = finalDF.withColumn("authors_to_words", functions.split(col("authors_to"), "\\s+"))

println("Removing stopwords from title_from column...")
val remover = new StopWordsRemover().setInputCol("title_from_words").setOutputCol("title_from_words_f")
finalDF = remover.transform(finalDF)

println("Removing stopwords from title_to column...")
val remover2 = new StopWordsRemover().setInputCol("title_to_words").setOutputCol("title_to_words_f")
finalDF = remover2.transform(finalDF)

println("Removing stopwords from authors_from column...")
val remover3 = new StopWordsRemover().setInputCol("authors_from_words").setOutputCol("authors_from_words_f")
finalDF = remover3.transform(finalDF)

println("Removing stopwords from authors_to column...")
val remover4 = new StopWordsRemover().setInputCol("authors_to_words").setOutputCol("authors_to_words_f")
finalDF = remover4.transform(finalDF)

finalDF.count() 

val udf_title_overlap=udf(findNumberCommonWordsTitle(_:Seq[String],_:Seq[String]))
val udf_authors_overlap = udf(findNumberCommonAuthors(_:Seq[String], _:Seq[String]))

println("Getting the number of common words between title_from and title_to columns using UDF function...")
finalDF = finalDF.withColumn("titles_intersection",udf_title_overlap(finalDF("title_from_words"),finalDF("title_to_words")))

println("Getting the number of common words between authors_from and authors_to columns using UDF function...")
finalDF = finalDF.withColumn("authors_intersection",udf_authors_overlap(finalDF("aut 
hors_from_words"),finalDF("authors_to_words")))

finalDF.count()

finalDF = finalDF.withColumn("time_dist",abs($"year_from" - 
$"year_to"))

println("Show schema of finalDF:\n")
finalDF.printSchema()

println("Dropping unused columns from finalDF...\n")
val finalCollsDF = finalDF.select("label","titles_intersection", 
"authors_intersection", "time_dist")

println("Printing schema for finalDF...\n")
finalCollsDF.printSchema()

println("Creating features coll from finalDF using 
VectorAssembler...\n")
val assembler = new VectorAssembler()
  .setInputCols(Array("titles_intersection", "authors_intersection", 
"time_dist"))
  .setOutputCol("features")

val output = assembler.transform(finalCollsDF)

println("Printing final schema before trainning...\n")
output.printSchema()
output.na.drop()

println("Splitting dataset into trainingData and testData...\n")
val Array(trainingData, testData) = output.randomSplit(Array(0.6, 
0.4))

val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setRawPredictionCol("prediction_raw")
  .setMaxIter(10)

val lr_model = lr.fit(trainingData)

val lr_results = lr_model.transform(testData)

val evaluator = new BinaryClassificationEvaluator()
  .setRawPredictionCol("prediction_raw")
  .setLabelCol("label")
  .setMetricName("areaUnderPR")

println("RESULTS FOR LOGISTIC REGRESSION:")
println(evaluator.evaluate(lr_results))

}

Я использую две функции UDF:

 def findNumberCommonWordsTitle(title_from:Seq[String], 
   title_to:Seq[String])  = {
   val intersection = title_from.intersect(title_to)
   intersection.length
    }

  def findNumberCommonAuthors(author_from:Seq[String], 
   author_to:Seq[String]) = {
  val intersection = author_from.intersect(author_to)
   intersection.length
  }

Я вручную искал нулевые значения с помощью оператора foreach, но это тоже не сработало. Как я могу исправить эту проблему. Может у меня другая проблема, которую я не могу найти.

Спасибо

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
161
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Spark позволяет значениям быть нулевыми в фреймах данных, поэтому вызов UDF для столбцов этого фрейма данных может привести к NullPointerExceptions, если, например, один или оба массива являются null.

Вы должны проверить свой UDF на наличие значений null и обработать их соответствующим образом или убедиться, что столбец в фрейме данных не может быть нулевым, и установить нулевые значения для пустых массивов.

 def findNumberCommonAuthors(author_from:Seq[String], author_to:Seq[String]) = {
   if (author_from == null || author_to == null) 0
   else author_from.intersect(author_to).length
  }

Вы абсолютно правы! Работает нормально. Большое тебе спасибо!

atheodos 20.12.2018 20:34

Другие вопросы по теме