Ошибка сериализации Kryo: переполнение буфера. В наличии: 0, необязательно: 110581

Я пытаюсь записывать данные в Elasticsearch с помощью Spark. Моя папка в HDFS около 1 ГБ данных, имеет много файлов txt (8000 файлов). Но когда я отправил задание, произошла следующая ошибка:

21/04/05 01:13:38 ERROR TaskSchedulerImpl: Lost executor 2 on mynode1: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/05 01:13:53 ERROR TaskSchedulerImpl: Lost executor 0 on mynode1: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/05 01:16:09 ERROR TaskSchedulerImpl: Lost executor 3 on mynode2: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/05 01:16:09 ERROR TaskSchedulerImpl: Lost executor 4 on mynode1: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/05 01:18:54 ERROR TaskSchedulerImpl: Lost executor 5 on mynode1: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/05 01:20:00 ERROR TaskSetManager: Task 1 in stage 1.2 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.2 failed 4 times, most recent failure: Lost task 1.3 in stage 1.2 (TID 28, 192.168.5.106, executor 6): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 110581. To avoid this, increase spark.kryoserializer.buffer.max value

Мой код:

import org.apache.spark.sql._
    import org.elasticsearch.spark.rdd.EsSpark
    import org.apache.spark.{ SparkConf, SparkContext }
    
    object index_to_es {
      def main(args: Array[String]): Unit = {
        println("Start app ...")
    
        // Create spark session
        val spark = SparkSession.builder()
                      .master("spark://master:7077")
                      .appName("Spark - Index to ES")
                      .config("spark.es.nodes","node1")
                      .config("spark.es.port","9200")
                      .config("es.batch.size.entries", "1000")
    
    //                  .config("spark.es.nodes.wan.only","true") //Needed for ES on AWS
                      .getOrCreate()
    
        spark.sparkContext.setLogLevel("ERROR")
    
        // Write dataframe to ES
        println("Staring indexing to ES ...")
    
        val startTimeMillis = System.currentTimeMillis()
    
        // Create spark context
        val sc = spark.sparkContext
        sc.setLogLevel("ERROR")
        val rddFromFile = spark.sparkContext.wholeTextFiles("hdfs://master:9000/bigdata/bigger").repartition(2)
        var listDoc = List[IndexDocument]()
        rddFromFile.collect().foreach(f=>{
          val indexDoc = IndexDocument(f._1, f._2)
          listDoc = listDoc :+ indexDoc
    
    //      val rdd = sc.makeRDD(Seq(indexDoc))
    //      EsSpark.saveToEs(rdd, "bigger")
        })
        val rdd = sc.makeRDD(listDoc)
        EsSpark.saveToEs(rdd, "bigdata")
    
        val endTimeMillis = System.currentTimeMillis()
        val durationSeconds = (endTimeMillis - startTimeMillis) / 1000
    
        println("Indexing successfully! Time indexing: " + durationSeconds + " seconds")
      }
    
    }
    
    case class IndexDocument (filePath:String, content:String)

В моем искровом кластере 3 узла, 2 ГБ оперативной памяти на каждый узел. И mynode1 запускает ElasticSearch, 3 узла запускает hdfs

искра-default.conf

spark.master                     spark://master:7077
spark.eventLog.enabled           false
spark.eventLog.dir               hdfs://namenode:8021/directory
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              6g
spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers = "one two three"
#spark.storage.memoryFraction     0.2
spark.executor.memory            800m
spark.kryoserializer.buffer.max  256m

Я поставил 100 МБ данных в порядке, но 1 ГБ - нет. Должен ли я уменьшить свои данные?

Truong-Giang Nguyen 04.04.2021 22:07
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
1
28
0

Другие вопросы по теме