Потребитель Kafka зависает в опросе, когда Kafka не работает

Я экспериментировал с базовой настройкой Zookeeper и Kafka, чтобы узнать, как их использовать, но у меня проблемы с потребителем. Когда Kafka недоступен, вызов метода poll() зависает до тех пор, пока он не вернется в оперативный режим.

Версия Kafka: 0.10.1.0

Мой код выглядит так:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);

while (!stopped) {
    // If by any reason Kafka is not available this call will hang
    // until Kafka is back online.
    records = consumer.poll(timeout);

    for (ConsumerRecord<String, byte[]> record : records) {
        process(record);
    }

    Thread.sleep(sleepTime);
}

Я читал, что когда я звоню на poll(), потребитель будет пытаться подключиться к Kafka бесконечно, пока он не вернется в сеть или пока не будет вызван consumer.wakeup().

Я хочу, чтобы код действовал иначе, когда Kafka не в сети. Есть ли способ ограничить повторные попытки потребителя или сделать его неудачным при опросе от несуществующей кафки?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
4
0
1 358
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

К сожалению, это все еще проблема. Многие потребительские методы могут зависеть от различных сценариев.

В настоящее время разрабатывается предложение по улучшению Kafka, КИП-266, для добавления тайм-аутов в методы Consumer во избежание зависаний.

Насколько я знаю, вызов wakeup() из другого потока - лучший обходной путь.


Обновлено: Начиная с Kafka 2.0.0, все потребительские вызовы могут принимать тайм-аут. Это позволяет восстановить контроль в случае падения брокеров.

Другие вопросы по теме