Невозможно использовать свойство max.poll.interval.ms в потоковой передаче искр с помощью kafka

Я читаю около 1 млн записей на пакет из Kafka с использованием потоковой передачи искр (подход Direct Stream) и делаю некоторый анализ данных, который занимает около 13-15 минут для их обработки.

Итак, чтобы стабилизировать систему, я решил изменить свойство kafka 'max.poll.interval.ms' в параметрах kafka, чтобы опрос мог произойти через 15 минут.

var kafkaParams = Map(
     ..
     ..
     "auto.offset.reset" -> "latest",
     "enable.auto.commit" -> (false: java.lang.Boolean),
     "max.poll.interval.ms" -> (900000: java.lang.Integer)
)

Но когда я проверил логи, там написано:

WARN ConsumerConfig: конфигурация max.poll.interval.ms = 900000 была предоставлена, но не является известной конфигурацией.

Это что-то связано с версией kafka, что я не могу использовать это свойство. Я использую версию Kafka (0.10.1.0).

Любая помощь будет оценена.

Спасибо!

2
0
1 183
2

Ответы 2

Он не является частью Потребительской собственности. Вы можете попробовать spark.streaming.kafka.consumer.poll.ms в Spark conf.

спасибо за предложение, попробую. Однако свойство "max.poll.interval.ms" является частью конфигурации пользователя, как указано в документации kafka kafka.apache.org/0101/documentation.html.

Ashutosh Sharma 30.10.2018 09:53

Tl; доктор

Просто добавьте артефакт в свой pom.xml, и это поможет.

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0</version>
</dependency>

Полный ответ

Параметр max.poll.interval.ms был добавлен с Kafka 0.10.1 (КИП-62).

Начиная с Kafka 0.10.1, биения отправляются отдельным потоком, что значительно сокращает ненужную перебалансировку. Настоятельно рекомендуется использовать клиент kafka с версии 0.10.1.

Согласно Sathvik Vutukuri's Spark 3.0.0 Пример, опубликованному в DZone, явное добавление определенной версии клиентского артефакта kafka (в списке зависимостей, например, pom.xml / build.sbt) заставит экземпляр создать экземпляр надлежащей версии клиента kafka.

Более

Со временем кафка развивается. Всегда рекомендуется использовать согласованную / совместимую клиентскую версию развернутого / исходного / целевого кластера kafka.

Ознакомьтесь с полным списком артефактов по адресу:

https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

Другие вопросы по теме