Spark java.lang.NullPointerException Ошибка при фильтрации кадра данных искры внутри итератора foreach

У меня есть два искровых dfs, я хочу сделать итератор foreach для одного df и получить определенные записи, связанные с someId, из следующего df.

когда я делаю это каждый раз, когда это происходит java.lang.NullPointerException,

Я разместил свой код с комментариями внутри цикла foreach. Я пробовал 3 способа сделать это, но каждый раз возникала одна и та же ошибка.

Пожалуйста, помогите мне решить эту проблему.

val schListDf = spark.read.format("csv")
.option("header", "true")
.load("/home/user/projects/scheduled.csv")

schListDf.createOrReplaceTempView («запланировано»)

 val trsListDf = spark.read.format("csv")
.option("header", "true")
.load("/home/user/projects/transaction.csv")

trsListDf.createOrReplaceTempView («транзакция»)

//THIS WORK FINE

val df3 = spark.sql ("выберите * из лимита транзакции 5"). show ()

schListDf.foreach(row => {
if (row(2) != null){

  // I HAVE TRIED THIS WAY FIRST, BUT OCCURRED SAME ERROR
  val df = spark.sql("select * from transaction where  someid = '"+row(2)+"'")

  // I HAVE TRIED THIS WAY SECOND(WITHOUT someID filter), BUT OCCURRED SAME ERROR
  val df2 = spark.sql("select * from transaction limit 5")

  // I HAVE TRIED THIS WAY ALSO(FILTER WITH DF), BUT OCCURRED SAME ERROR
  val filteredDataListDf = trsListDf.filter($"someid" === row(2))
}

})

18/12/02 10:36:34 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4) java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/12/02 10:36:34 ERROR Executor: Exception in task 3.0 in stage 4.0 (TID 7) java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/12/02 10:36:34 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 5)

java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/12/02 10:36:34 ERROR Executor: Exception in task 2.0 in stage 4.0 (TID 6) java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/12/02 10:36:34 WARN TaskSetManager: Lost task 2.0 in stage 4.0 (TID 6, localhost, executor driver): java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
1 254
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Некоторые аспекты Spark связаны с драйверами.

Доступ к DF невозможен изнутри foreach, что подразумевает сторону исполнителя.

Это парадигма. То же самое относится к RDD и Spark Session.

Другими словами, foreach подходит, но не с val DF или spark.sql. Например, вам понадобится цикл while.

Это распространенное заблуждение, когда кто-то начинает со Spark.

Другие вопросы по теме