Я новичок в Spark и пытаюсь понять, как работает динамическое распределение ресурсов. У меня есть искровое структурированное потоковое приложение, которое пытается читать миллионы записей за раз из Кафки и обрабатывать их. Мое приложение всегда начинается с 3 исполнителей и никогда не увеличивает количество исполнителей.
Обработка занимает 5-10 минут. Я думал, что это увеличит количество исполнителей (до 10) и попытается закончить обработку раньше, чего не происходит. Что мне здесь не хватает? Как это должно работать?
Я установил ниже свойства в Ambari для Spark.
spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.initialExecutors = 3
spark.dynamicAllocation.maxExecutors = 10
spark.dynamicAllocation.minExecutors = 3
spark.shuffle.service.enabled = true
Ниже показано, как выглядит моя команда отправки
/usr/hdp/3.0.1.0-187/spark2/bin/spark-submit --class com.sb.spark.sparkTest.sparkTest --master yarn --deploy-mode cluster --queue default sparkTest-assembly-0.1.jar
Искровой код
//read stream
val dsrReadStream = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", brokers) //kafka bokers
.option("startingOffsets", startingOffsets) // start point to read
.option("maxOffsetsPerTrigger", maxoffsetpertrigger) // no. of records per batch
.option("failOnDataLoss", "true")
/****
Logic to validate format of loglines. Writing invalid log lines to kafka and store valid log lines in 'dsresult'
****/
//write stream
val dswWriteStream =dsresult.writeStream
.outputMode(outputMode) // file write mode, default append
.format(writeformat) // file format ,default orc
.option("path",outPath) //hdfs file write path
.option("checkpointLocation", checkpointdir) location
.option("maxRecordsPerFile", 999999999)
.trigger(Trigger.ProcessingTime(triggerTimeInMins))
3 раздела кафки
Таким образом, у вашего приложения нет причин запрашивать больше ресурсов. Три раздела Kafka -> 3 раздела Spark -> 3 исполнителя, даже на одном ядре каждого достаточно для достижения максимального параллелизма.
В настоящее время для обработки миллиона записей с 3 исполнителями требуется около 5-7 минут. Я протестировал то же самое с 6 исполнителями, и это заняло сравнительно меньше времени. Если я установлю максимальное количество исполнителей равным 10, не следует ли динамически использовать больше исполнителей (более 3, если доступно) для увеличения времени обработки? Мое идеальное ожидаемое время обработки составляет менее 2 минут. Должен ли я увеличить первоначальных исполнителей для достижения этого?Пожалуйста, сообщите.
Динамическое распределение ресурсов не работает с Spark Streaming
spark.streaming.dynamicAllocation.enabled=true ?
Просто чтобы уточнить,
spark.streaming.dynamicAllocation.enabled=true
работал только для Dstreams API. См. Джира
Кроме того, если вы установите
spark.dynamicAllocation.enabled=true
и запустить задание структурированной потоковой передачи, срабатывает алгоритм пакетного динамического распределения, который может быть не очень оптимальным. См. Джира
Сколько у вас разделов Kafka?