Невозможно записать DF в дельта-формате на hdfs

Я использую scala, чтобы просто написать фрейм данных в дельта-формате в hdfs, но получаю ошибку, которую я не могу понять, что ее вызывает, пожалуйста, помогите мне с этим

Ниже приведен код, с помощью которого я пишу дельта-таблицу в своих локальных hdfs.

val columns=Array("id", "first", "last", "year")

val test_df =sc.parallelize(Seq(
              (1, "John", "Doe", 1986),
              (2, "Ive", "Fish", 1990),
              (4, "John", "Wayne", 1995)
            )).toDF(columns: _*);

test_df.write.format("delta").mode("overwrite").save("hdfs://localhost:9000/user/test/");

Используемые зависимости:

  1. Искра v3.4.0
  2. Хадуп v3.3.5
  3. Дельта-пакет: io.delta:delta-core_2.12:1.2.1

Сообщение об ошибке:

java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormatWriter$Empty2Null
    at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:197)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:210)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:78)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:156)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at EventStream$.update_cta(EventStream.scala:25)
    at EventStream$.$anonfun$update_profiles_on_s3$1(EventStream.scala:68)
    at EventStream$.$anonfun$update_profiles_on_s3$1$adapted(EventStream.scala:68)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:34)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:726)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:726)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormatWriter$Empty2Null
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 61 more

Спарк-версия

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ / _ / _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/
         
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_362)

Чтобы воспроизвести эту проблему, просто запустите


 ./spark-shell --packages io.delta:delta-core_2.12:1.2.1
    
 val columns=Array("id", "first", "last", "year")
    
 val test_df=sc.parallelize(Seq(
      (1, "John", "Doe", 1986),
      (2, "Ive", "Fish", 1990),
      (4, "John", "Wayne", 1995)
    )).toDF(columns: _*)

  test_df.write.format("delta").mode("append").save("hdfs://localhost:9000/user/test/") 

OR give any path if hdfs is not configured

Вам действительно нужна HDFS? Можете указать путь к файлу://? Как вы запускаете этот код? Использование искровой отправки? И создать банку сборки? spark.apache.org/docs/latest/…

OneCricketeer 21.04.2023 14:30
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
1
235
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Не могу воспроизвести NoClassDefFoundError. Удалось воспроизвести после замены hdfs://... на file:///.... Следующий код, кажется, работает со следующим build.sbt (код выдает java.net.ConnectException: Call From .../127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException; For more details see:http://wiki.apache.org/hadoop/ConnectionRefused для меня, потому что я не настроил службу Hadoop). Пожалуйста, подготовьте пошаговое воспроизведение.

build.sbt

ThisBuild / scalaVersion := "2.12.17"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"             % "3.4.0",
  "io.delta"          %% "delta-core"            % "1.2.1",
  "org.apache.hadoop" %  "hadoop-client"         % "3.3.5",
  "org.apache.hadoop" %  "hadoop-client-api"     % "3.3.5",
  "org.apache.hadoop" %  "hadoop-client-runtime" % "3.3.5"
)
import org.apache.spark.sql.SparkSession

object App {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("Spark app")
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

    val columns = Array("id", "first", "last", "year")

    val test_df = sc.parallelize(Seq(
      (1, "John", "Doe", 1986),
      (2, "Ive", "Fish", 1990),
      (4, "John", "Wayne", 1995)
    )).toDF(columns: _*);

    test_df.write.format("delta")
      .mode("overwrite")
      .save("hdfs://localhost:9000/user/test/")
  }
}

Класс org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null существует в spark-sql 3.3.2-

https://github.com/apache/spark/blob/v3.3.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L59

В 3.4.0 его нет

https://github.com/apache/spark/blob/v3.4.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Вы должны посмотреть на свой путь к классам, чтобы выяснить, какая из зависимостей все еще использует Spark 3.3.2, хотя вы думаете, что используете Spark 3.4.0.

Как вы строите свой проект? С сбт?

  • Вы можете сделать sbt dependencyTree (это не показывает provided зависимости).

  • Или напечатайте System.getProperty("java.class.path") (это показывает путь к классам только при запуске JVM).

  • Или добавьте scalacOptions += "-Ylog-classpath" к build.sbt

  • Или запустите следующий скрипт в вашей фактической среде Spark, которую вы используете.

var cl = getClass.getClassLoader
while (cl != null) {
  println(s"classloader: ${cl.getClass.getName}")
  cl match {
    case cl: URLClassLoader =>
      println("classloader urls:")
      cl.getURLs.foreach(println)
    case _ =>
      println("not URLClassLoader")
  }
  cl = cl.getParent
}

Запустите банку кода scala, появится NoSuchMethodError:scala.Predef$.refArrayOps

Почему я получаю ошибку NoClassDefFoundError в Java?

Какие причины и в чем разница между NoClassDefFoundError и ClassNotFoundException?


Дело похоже в том, что delta-core 1.2.1 компилировалась относительно spark-sql 3.2.0

https://repo1.maven.org/maven2/io/delta/delta-core_2.13/1.2.1/delta-core_2.13-1.2.1.pom

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.13</artifactId>
  <version>3.2.0</version>
  <scope>provided</scope>
</dependency>

И даже самая новая delta-core 2.3.0 компилируется относительно spark-sql 3.3.2

https://repo1.maven.org/maven2/io/delta/delta-core_2.13/2.3.0/delta-core_2.13-2.3.0.pom

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.13</artifactId>
  <version>3.3.2</version>
  <scope>provided</scope>
</dependency>

Но, как я уже сказал, Empty2Null отсутствует в spark-sql 3.4.0. Таким образом, кажется, что все существующие версии delta-core могут быть несовместимы со Spark 3.4.0.

Если в build.sbt я обновлю delta-core до 2.3.0, то NoClassDefFoundError изменится на

com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: org.apache.spark.ErrorInfo.messageFormat()Ljava/lang/String;

Попробуйте понизить Spark до 3.3.2-

Тогда ваш код у меня работает правильно. Я заменил "hdfs://..." на локальный путь "file:///..." и добавил конфигурацию

val spark = SparkSession.builder
  .master("local")
  .appName("Spark app")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // !!!
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") // !!!
  .getOrCreate()

как было запрошено в другом исключении. И теперь в моем локальном пути file:///... появилось несколько файлов.

На самом деле это написано в документах Delta Lake.

https://docs.delta.io/latest/releases.html

Совместимость с Apache Spark

Версия Дельта-Лейк Версия Apache Spark 2.3.х 3.3.х 2.2.х 3.3.х 2.1.х 3.3.х 2.0.х 3.2.х 1.2.х 3.2.х 1.1.х 3.2.х 1.0.х 3.1.х 0.7.х и 0.8.х 3.0.х Ниже 0.7.0 2.4.2 - 2.4.

https://github.com/delta-io/delta/issues/1696 [Запрос функции] Поддержка Spark 3.4

планируется выпустить Delta 2.4 на Spark 3.4

Я добавил несколько шагов, чтобы просто воспроизвести это в оболочке, проблема здесь, похоже, связана с форматом «дельта», независимо от того, записываете ли вы его на hdfs или где-то еще, проблема остается прежней.

Yash Tandon 24.04.2023 05:36

@YashTandon Используете ли вы spark-shell from spark-3.4.0-bin-hadoop3.tgz from spark.apache.org/downloads.html? Для меня последняя строка test_df.write... в искровом снаряде выдает java.net.ConnectException, потому что я этого не настроил (не NoClassDefFoundError). Можете ли вы показать свой путь к классам? Например, вы можете запустить этот мой скрипт загрузчика классов или запустить spark-shell как ./spark-shell -Ylog-classpath --packages ...

Dmytro Mitin 24.04.2023 06:09

@YashTandon Также вы можете ввести System.getProperty("java.class.path"), но это может показать не полный путь к классам stackoverflow.com/questions/18626396/… По умолчанию Scala repl (оболочка Spark) усекает вывод, поэтому это следует настроить с помощью :powervals.isettings.maxPrintString = Int.MaxValue блога .ssanj.net/posts/…

Dmytro Mitin 24.04.2023 06:22

Да, я использую spark-shell из пакета tgz, также в настоящее время я не могу редактировать свой вопрос, так как в стеке слишком много ожидающих изменений, поэтому он не позволяет мне редактировать, вот ссылка на журнал пути к классам, который был напечатан docs.google.com/document/d/…

Yash Tandon 24.04.2023 06:27

@YashTandon На самом деле мне удалось воспроизвести NoClassDefFoundError. Я заменил hdfs на локальный путь file:///.... Спасибо.

Dmytro Mitin 24.04.2023 06:28

@YashTandon NoClassDefFoundError также воспроизводится с App.scala и build.sbt, как в начале моего ответа, если я заменю "hdfs://..." на "file:///...".

Dmytro Mitin 24.04.2023 06:33

@YashTandon Думаю, я понял. Смотрите обновление.

Dmytro Mitin 24.04.2023 07:07

да, я понизил версию до Spark v3.2.1, и теперь она заработала, спасибо за вашу помощь.

Yash Tandon 24.04.2023 07:24

Другие вопросы по теме