Я использую scala, чтобы просто написать фрейм данных в дельта-формате в hdfs, но получаю ошибку, которую я не могу понять, что ее вызывает, пожалуйста, помогите мне с этим
Ниже приведен код, с помощью которого я пишу дельта-таблицу в своих локальных hdfs.
val columns=Array("id", "first", "last", "year")
val test_df =sc.parallelize(Seq(
(1, "John", "Doe", 1986),
(2, "Ive", "Fish", 1990),
(4, "John", "Wayne", 1995)
)).toDF(columns: _*);
test_df.write.format("delta").mode("overwrite").save("hdfs://localhost:9000/user/test/");
Используемые зависимости:
Сообщение об ошибке:
java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormatWriter$Empty2Null
at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:197)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:210)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:78)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:156)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at EventStream$.update_cta(EventStream.scala:25)
at EventStream$.$anonfun$update_profiles_on_s3$1(EventStream.scala:68)
at EventStream$.$anonfun$update_profiles_on_s3$1$adapted(EventStream.scala:68)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:34)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:726)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:726)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormatWriter$Empty2Null
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 61 more
Спарк-версия
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ / _ / _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.4.0
/_/
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_362)
Чтобы воспроизвести эту проблему, просто запустите
./spark-shell --packages io.delta:delta-core_2.12:1.2.1
val columns=Array("id", "first", "last", "year")
val test_df=sc.parallelize(Seq(
(1, "John", "Doe", 1986),
(2, "Ive", "Fish", 1990),
(4, "John", "Wayne", 1995)
)).toDF(columns: _*)
test_df.write.format("delta").mode("append").save("hdfs://localhost:9000/user/test/")
OR give any path if hdfs is not configured
Не могу воспроизвести NoClassDefFoundError
. Удалось воспроизвести после замены hdfs://...
на file:///...
. Следующий код, кажется, работает со следующим build.sbt
(код выдает java.net.ConnectException: Call From .../127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException; For more details see:
http://wiki.apache.org/hadoop/ConnectionRefused для меня, потому что я не настроил службу Hadoop). Пожалуйста, подготовьте пошаговое воспроизведение.
build.sbt
ThisBuild / scalaVersion := "2.12.17"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "3.4.0",
"io.delta" %% "delta-core" % "1.2.1",
"org.apache.hadoop" % "hadoop-client" % "3.3.5",
"org.apache.hadoop" % "hadoop-client-api" % "3.3.5",
"org.apache.hadoop" % "hadoop-client-runtime" % "3.3.5"
)
import org.apache.spark.sql.SparkSession
object App {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.master("local")
.appName("Spark app")
.getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val columns = Array("id", "first", "last", "year")
val test_df = sc.parallelize(Seq(
(1, "John", "Doe", 1986),
(2, "Ive", "Fish", 1990),
(4, "John", "Wayne", 1995)
)).toDF(columns: _*);
test_df.write.format("delta")
.mode("overwrite")
.save("hdfs://localhost:9000/user/test/")
}
}
Класс org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
существует в spark-sql
3.3.2-
В 3.4.0 его нет
Вы должны посмотреть на свой путь к классам, чтобы выяснить, какая из зависимостей все еще использует Spark 3.3.2, хотя вы думаете, что используете Spark 3.4.0.
Как вы строите свой проект? С сбт?
Вы можете сделать sbt dependencyTree
(это не показывает provided
зависимости).
Или напечатайте System.getProperty("java.class.path")
(это показывает путь к классам только при запуске JVM).
Или добавьте scalacOptions += "-Ylog-classpath"
к build.sbt
Или запустите следующий скрипт в вашей фактической среде Spark, которую вы используете.
var cl = getClass.getClassLoader
while (cl != null) {
println(s"classloader: ${cl.getClass.getName}")
cl match {
case cl: URLClassLoader =>
println("classloader urls:")
cl.getURLs.foreach(println)
case _ =>
println("not URLClassLoader")
}
cl = cl.getParent
}
Запустите банку кода scala, появится NoSuchMethodError:scala.Predef$.refArrayOps
Почему я получаю ошибку NoClassDefFoundError в Java?
Какие причины и в чем разница между NoClassDefFoundError и ClassNotFoundException?
Дело похоже в том, что delta-core
1.2.1 компилировалась относительно spark-sql
3.2.0
https://repo1.maven.org/maven2/io/delta/delta-core_2.13/1.2.1/delta-core_2.13-1.2.1.pom
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.2.0</version>
<scope>provided</scope>
</dependency>
И даже самая новая delta-core
2.3.0 компилируется относительно spark-sql
3.3.2
https://repo1.maven.org/maven2/io/delta/delta-core_2.13/2.3.0/delta-core_2.13-2.3.0.pom
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.3.2</version>
<scope>provided</scope>
</dependency>
Но, как я уже сказал, Empty2Null
отсутствует в spark-sql
3.4.0. Таким образом, кажется, что все существующие версии delta-core
могут быть несовместимы со Spark 3.4.0.
Если в build.sbt
я обновлю delta-core
до 2.3.0, то NoClassDefFoundError
изменится на
com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: org.apache.spark.ErrorInfo.messageFormat()Ljava/lang/String;
Попробуйте понизить Spark до 3.3.2-
Тогда ваш код у меня работает правильно. Я заменил "hdfs://..."
на локальный путь "file:///..."
и добавил конфигурацию
val spark = SparkSession.builder
.master("local")
.appName("Spark app")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // !!!
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") // !!!
.getOrCreate()
как было запрошено в другом исключении. И теперь в моем локальном пути file:///...
появилось несколько файлов.
На самом деле это написано в документах Delta Lake.
https://docs.delta.io/latest/releases.html
Совместимость с Apache Spark
https://github.com/delta-io/delta/issues/1696 [Запрос функции] Поддержка Spark 3.4
планируется выпустить Delta 2.4 на Spark 3.4
Я добавил несколько шагов, чтобы просто воспроизвести это в оболочке, проблема здесь, похоже, связана с форматом «дельта», независимо от того, записываете ли вы его на hdfs или где-то еще, проблема остается прежней.
@YashTandon Используете ли вы spark-shell
from spark-3.4.0-bin-hadoop3.tgz
from spark.apache.org/downloads.html? Для меня последняя строка test_df.write...
в искровом снаряде выдает java.net.ConnectException
, потому что я этого не настроил (не NoClassDefFoundError
). Можете ли вы показать свой путь к классам? Например, вы можете запустить этот мой скрипт загрузчика классов или запустить spark-shell
как ./spark-shell -Ylog-classpath --packages ...
@YashTandon Также вы можете ввести System.getProperty("java.class.path")
, но это может показать не полный путь к классам stackoverflow.com/questions/18626396/… По умолчанию Scala repl (оболочка Spark) усекает вывод, поэтому это следует настроить с помощью :power
vals.isettings.maxPrintString = Int.MaxValue
блога .ssanj.net/posts/…
Да, я использую spark-shell из пакета tgz, также в настоящее время я не могу редактировать свой вопрос, так как в стеке слишком много ожидающих изменений, поэтому он не позволяет мне редактировать, вот ссылка на журнал пути к классам, который был напечатан docs.google.com/document/d/…
@YashTandon На самом деле мне удалось воспроизвести NoClassDefFoundError
. Я заменил hdfs на локальный путь file:///...
. Спасибо.
@YashTandon NoClassDefFoundError
также воспроизводится с App.scala
и build.sbt
, как в начале моего ответа, если я заменю "hdfs://..."
на "file:///..."
.
@YashTandon Думаю, я понял. Смотрите обновление.
да, я понизил версию до Spark v3.2.1, и теперь она заработала, спасибо за вашу помощь.
Вам действительно нужна HDFS? Можете указать путь к файлу://? Как вы запускаете этот код? Использование искровой отправки? И создать банку сборки? spark.apache.org/docs/latest/…