Динамическое выделение ресурсов для искровых приложений не работает

Я новичок в Spark и пытаюсь понять, как работает динамическое распределение ресурсов. У меня есть искровое структурированное потоковое приложение, которое пытается читать миллионы записей за раз из Кафки и обрабатывать их. Мое приложение всегда начинается с 3 исполнителей и никогда не увеличивает количество исполнителей.

Обработка занимает 5-10 минут. Я думал, что это увеличит количество исполнителей (до 10) и попытается закончить обработку раньше, чего не происходит. Что мне здесь не хватает? Как это должно работать?

Я установил ниже свойства в Ambari для Spark.

spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.initialExecutors = 3
spark.dynamicAllocation.maxExecutors = 10
spark.dynamicAllocation.minExecutors = 3
spark.shuffle.service.enabled = true

Ниже показано, как выглядит моя команда отправки

/usr/hdp/3.0.1.0-187/spark2/bin/spark-submit --class com.sb.spark.sparkTest.sparkTest --master yarn --deploy-mode cluster --queue default sparkTest-assembly-0.1.jar

Искровой код

//read stream
val dsrReadStream = spark.readStream.format("kafka")
   .option("kafka.bootstrap.servers", brokers) //kafka bokers
   .option("startingOffsets", startingOffsets) // start point to read
   .option("maxOffsetsPerTrigger", maxoffsetpertrigger) // no. of records per batch
   .option("failOnDataLoss", "true")

 /****
 Logic to validate format of loglines. Writing invalid log lines to kafka and store valid log lines in 'dsresult'

 ****/

//write stream
val dswWriteStream =dsresult.writeStream
    .outputMode(outputMode) // file write mode, default append
    .format(writeformat) // file format ,default orc
    .option("path",outPath) //hdfs file write path
    .option("checkpointLocation", checkpointdir) location
    .option("maxRecordsPerFile", 999999999) 
    .trigger(Trigger.ProcessingTime(triggerTimeInMins))

Сколько у вас разделов Kafka?

user10938362 08.04.2019 12:39

3 раздела кафки

hampi2017 08.04.2019 15:01

Таким образом, у вашего приложения нет причин запрашивать больше ресурсов. Три раздела Kafka -> 3 раздела Spark -> 3 исполнителя, даже на одном ядре каждого достаточно для достижения максимального параллелизма.

user10938362 08.04.2019 15:21

В настоящее время для обработки миллиона записей с 3 исполнителями требуется около 5-7 минут. Я протестировал то же самое с 6 исполнителями, и это заняло сравнительно меньше времени. Если я установлю максимальное количество исполнителей равным 10, не следует ли динамически использовать больше исполнителей (более 3, если доступно) для увеличения времени обработки? Мое идеальное ожидаемое время обработки составляет менее 2 минут. Должен ли я увеличить первоначальных исполнителей для достижения этого?Пожалуйста, сообщите.

hampi2017 08.04.2019 15:38
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
4
633
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Динамическое распределение ресурсов не работает с Spark Streaming

Обратитесь по этой ссылке

spark.streaming.dynamicAllocation.enabled=true ?

Beyhan Gul 15.05.2019 00:39

Просто чтобы уточнить,

spark.streaming.dynamicAllocation.enabled=true

работал только для Dstreams API. См. Джира

Кроме того, если вы установите

spark.dynamicAllocation.enabled=true

и запустить задание структурированной потоковой передачи, срабатывает алгоритм пакетного динамического распределения, который может быть не очень оптимальным. См. Джира

Другие вопросы по теме

Включить метрику Spark в LucidWorks Fusion
Как разобрать JSON, содержащий строковое свойство, представляющее JSON
Фильтрация кадров данных, обусловленных несколькими столбцами, с различными условиями в зависимости от значений столбца
Динамически зацикливать набор данных для всех имен столбцов
Как получить данные второго фрейма данных для всех значений определенных значений столбцов, совпадающих в первом фрейме данных?
Как сравнить две таблицы и заменить нули значениями из другой таблицы
Как построить график из фрейма данных? (ГрафикX)
Конфигурация для задания искры для записи файла 3000000 в качестве вывода
Если мы создадим несколько сеансов Spark с помощью метода newSession(), как будет распределяться память драйвера между несколькими сеансами Spark?
Как мне написать автономное приложение в Spark, чтобы найти 20 самых упоминаний в текстовом файле, заполненном извлеченными твитами

Похожие вопросы

Pyspark + анализ ассоциативных правил: как передать фрейм данных в формат, подходящий для частого анализа шаблонов?
Преобразование векторного столбца в столбец Double[Array] в scala Spark
Org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 98 на этапе 11.0 не удалась 4 раза
Данные, считанные из кафки в искру, исчезают после регистрации в виде таблицы?
Как обеспечить, чтобы все данные, принадлежащие пользователю, попадали в один и тот же файл при использовании spark?
Как можно преобразовать линейный список PySpark RDD в DataFrame?
Как добавить совершенно нерелевантный столбец во фрейм данных при использовании pyspark, spark + databricks
Как отправить slurm job, используя много воркеров, а не просто работая в локальном режиме?
Создать уникальный идентификатор для комбинации пары значений из двух столбцов в фрейме данных искры
Как использовать aggregateBykey для получения списка значений для каждого ключа?