I am trying to write data to Elasticsearch with Spark. My folder in HDFS holds about 1 GB of data spread across many txt files (8000 files). But when I submitted the job, the following error occurred:
21/04/05 01:13:38 ERROR TaskSchedulerImpl: Lost executor 2 on mynode1: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/05 01:13:53 ERROR TaskSchedulerImpl: Lost executor 0 on mynode1: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/05 01:16:09 ERROR TaskSchedulerImpl: Lost executor 3 on mynode2: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/05 01:16:09 ERROR TaskSchedulerImpl: Lost executor 4 on mynode1: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/05 01:18:54 ERROR TaskSchedulerImpl: Lost executor 5 on mynode1: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
21/04/05 01:20:00 ERROR TaskSetManager: Task 1 in stage 1.2 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.2 failed 4 times, most recent failure: Lost task 1.3 in stage 1.2 (TID 28, 192.168.5.106, executor 6): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 110581. To avoid this, increase spark.kryoserializer.buffer.max value
My code:
import org.apache.spark.sql._
import org.elasticsearch.spark.rdd.EsSpark
import org.apache.spark.{SparkConf, SparkContext}

object index_to_es {
  def main(args: Array[String]): Unit = {
    println("Start app ...")

    // Create spark session
    val spark = SparkSession.builder()
      .master("spark://master:7077")
      .appName("Spark - Index to ES")
      .config("spark.es.nodes", "node1")
      .config("spark.es.port", "9200")
      .config("es.batch.size.entries", "1000")
      // .config("spark.es.nodes.wan.only","true") //Needed for ES on AWS
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // Write documents to ES
    println("Starting indexing to ES ...")
    val startTimeMillis = System.currentTimeMillis()

    // Create spark context
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    val rddFromFile = spark.sparkContext.wholeTextFiles("hdfs://master:9000/bigdata/bigger").repartition(2)

    var listDoc = List[IndexDocument]()
    rddFromFile.collect().foreach(f => {
      val indexDoc = IndexDocument(f._1, f._2)
      listDoc = listDoc :+ indexDoc
      // val rdd = sc.makeRDD(Seq(indexDoc))
      // EsSpark.saveToEs(rdd, "bigger")
    })

    val rdd = sc.makeRDD(listDoc)
    EsSpark.saveToEs(rdd, "bigdata")

    val endTimeMillis = System.currentTimeMillis()
    val durationSeconds = (endTimeMillis - startTimeMillis) / 1000
    println("Indexing finished successfully! Time indexing: " + durationSeconds + " seconds")
  }
}

case class IndexDocument(filePath: String, content: String)
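The commented-out lines inside the loop were an earlier attempt to write each file straight from a small RDD. A variant that skips collect() entirely and maps the (path, content) pairs to IndexDocument on the executors would presumably look like the sketch below (reusing rddFromFile, IndexDocument and the "bigdata" index from the code above), but I have not verified that it avoids the error:

    // Sketch only: write straight from the distributed RDD instead of collecting
    // everything to the driver first; each (path, content) pair becomes one document.
    val docsRdd = rddFromFile.map { case (path, content) => IndexDocument(path, content) }
    EsSpark.saveToEs(docsRdd, "bigdata")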
My Spark cluster has 3 nodes with 2 GB of RAM each. mynode1 also runs Elasticsearch, and all 3 nodes run HDFS.
spark-defaults.conf:
spark.master spark://master:7077
spark.eventLog.enabled false
spark.eventLog.dir hdfs://namenode:8021/directory
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 6g
spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers = "one two three"
#spark.storage.memoryFraction 0.2
spark.executor.memory 800m
spark.kryoserializer.buffer.max 256m
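The error message itself suggests increasing spark.kryoserializer.buffer.max. If that is the right direction, it could presumably also be raised per application via the same SparkSession builder as in the code above; 512m here is only an illustrative value above the current 256m, not a recommendation:

    import org.apache.spark.sql.SparkSession

    // Sketch only: raise the Kryo buffer ceiling for this application,
    // as the error message suggests. 512m is an arbitrary illustrative value.
    val spark = SparkSession.builder()
      .master("spark://master:7077")
      .appName("Spark - Index to ES")
      .config("spark.kryoserializer.buffer.max", "512m")
      .getOrCreate()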
With 100 MB of data the job runs fine, but with 1 GB it does not. Should I reduce my data?