У меня есть два DataFrame в Spark. Я хочу пройти по одному из них с помощью foreach и для каждой строки получить из второго DataFrame записи, связанные с её someId.
Каждый раз, когда я это делаю, возникает java.lang.NullPointerException.
Ниже приведён мой код с комментариями внутри цикла foreach. Я попробовал три способа, но каждый раз получал одну и ту же ошибку.
Пожалуйста, помогите мне решить эту проблему.
// Load the schedule CSV (first row is treated as the header) and expose it to Spark SQL
// under the view name "scheduled".
val schListDf = spark.read.format("csv")
  .option("header", "true")
  .load("/home/user/projects/scheduled.csv")
schListDf.createOrReplaceTempView("scheduled")
// Load the transaction CSV the same way. The view must be named "transaction" because
// the SQL queries that follow select from that table name.
val trsListDf = spark.read.format("csv")
  .option("header", "true")
  .load("/home/user/projects/transaction.csv")
trsListDf.createOrReplaceTempView("transaction")
//THIS WORK FINE
// Sanity check issued directly from this script: prints the first 5 transaction rows.
// Note that show() returns Unit, so df3 holds Unit rather than a DataFrame.
val df3 = spark.sql("select * from transaction limit 5").show()
// Visits every row of schListDf and, for rows whose third column (index 2) is non-null,
// tries three alternative ways of fetching the matching transaction rows.
// NOTE(review): according to the stack trace that follows, the NullPointerException is raised
// from SparkSession.sql -> SparkSession.sessionState while this closure is run by an
// Executor task (RDD.foreach). The `spark` session and the DataFrames created above are
// therefore not usable from inside this closure.
// A likely fix is to build one join of schListDf and trsListDf on the id column outside of
// foreach — TODO confirm the name of the id column in scheduled.csv.
schListDf.foreach(row => {
if (row(2) != null){
// Attempt 1: SQL query filtered by this row's id (reported to fail with the NPE).
// The query text is built by string concatenation from the row value.
val df = spark.sql("select * from transaction where someid = '"+row(2)+"'")
// Attempt 2: the same kind of query without the someid filter (reported to fail identically).
val df2 = spark.sql("select * from transaction limit 5")
// Attempt 3: DataFrame API filter on trsListDf instead of SQL (reported to fail identically).
val filteredDataListDf = trsListDf.filter($"someid" === row(2))
}
})
18/12/02 10:36:34 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4) java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/12/02 10:36:34 ERROR Executor: Exception in task 3.0 in stage 4.0 (TID 7) java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/12/02 10:36:34 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 5)
java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/12/02 10:36:34 ERROR Executor: Exception in task 2.0 in stage 4.0 (TID 6) java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 18/12/02 10:36:34 WARN TaskSetManager: Lost task 2.0 in stage 4.0 (TID 6, localhost, executor driver): java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:142) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:140) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:52) at controllers.FileProcess$$anonfun$hnbFile$1.apply(FileProcess.scala:48) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at 
java.lang.Thread.run(Thread.java:748)




Некоторые объекты Spark доступны только на стороне драйвера.
Обращаться к DataFrame изнутри foreach нельзя: код внутри foreach выполняется на стороне исполнителей (executors).
Такова парадигма Spark. То же самое относится к RDD и SparkSession.
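Другими словами, сам по себе foreach допустим, но внутри него нельзя использовать DataFrame или spark.sql. Вместо этого понадобится, например, цикл while на стороне драйвера (скажем, по результату collect()); ещё лучше — один раз соединить оба DataFrame через join по someid.
Это распространённое заблуждение у тех, кто только начинает работать со Spark.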