Я экспериментировал с базовой настройкой Zookeeper и Kafka, чтобы узнать, как их использовать, но у меня проблемы с потребителем. Когда Kafka недоступен, вызов метода poll() зависает до тех пор, пока он не вернется в оперативный режим.
Версия Kafka: 0.10.1.0
Мой код выглядит так:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);
while (!stopped) {
// If by any reason Kafka is not available this call will hang
// until Kafka is back online.
records = consumer.poll(timeout);
for (ConsumerRecord<String, byte[]> record : records) {
process(record);
}
Thread.sleep(sleepTime);
}
Я читал, что когда я звоню на poll(), потребитель будет пытаться подключиться к Kafka бесконечно, пока он не вернется в сеть или пока не будет вызван consumer.wakeup().
Я хочу, чтобы код действовал иначе, когда Kafka не в сети. Есть ли способ ограничить повторные попытки потребителя или сделать его неудачным при опросе от несуществующей кафки?




К сожалению, это все еще проблема. Многие потребительские методы могут зависеть от различных сценариев.
В настоящее время разрабатывается предложение по улучшению Kafka, КИП-266, для добавления тайм-аутов в методы Consumer во избежание зависаний.
Насколько я знаю, вызов wakeup() из другого потока - лучший обходной путь.
Обновлено: Начиная с Kafka 2.0.0, все потребительские вызовы могут принимать тайм-аут. Это позволяет восстановить контроль в случае падения брокеров.