Я работаю с приложением Kafka Streams, где мы используем динамическое определение тем на основе заголовков сообщений. В нашей настройке удаление тем во время работы приложения является нормальным явлением. Сообщения по удаленной теме иногда могут приходить, но я хочу их просто игнорировать. Однако даже получив всего одно сообщение по несуществующей теме, я сталкиваюсь с бесконечным циклом ошибок:
[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 74 : {test1=UNKNOWN_TOPIC_OR_PARTITION}
org.apache.kafka.common.errors.TimeoutException: Topic test1 not present in metadata after 60000 ms.
[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 79 : {test1=UNKNOWN_TOPIC_OR_PARTITION}
Этот бесконечный цикл ошибок по сути приводит к тому, что приложение перестает работать. Как я могу настроить приложение Kafka Streams так, чтобы оно игнорировало сообщения об удаленных темах, не входя в бесконечный цикл ошибок? Есть ли способ справиться с этой ситуацией? Вот упрощенный пример кода моего приложения:
StreamsBuilder builder = new StreamsBuilder();
List<String> dynamicTopics = List.of("good_topic", "deleted_topic");
builder.stream("source_topic").to((k, v, c) -> dynamicTopics.get(new Random().nextInt(dynamicTopics.size()))); //in real application from header
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Automatic topic creation is disabled.
Я попробовал следующее, чтобы обработать и проигнорировать ошибку:
Используйте KafkaAdmin. Однако между проверками существующих тем тема может быть удалена, что не решает проблему.
Установите UncaughtExceptionHandler:
streams.setUncaughtExceptionHandler(new StreamsUncaughtExceptionHandler() {
@Override
public StreamThreadExceptionResponse handle(Throwable throwable) {
return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
}
});
Но код даже не доходит до этого обработчика.
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class.getName());
Опять же, код не достигает этого обработчика.
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), ErrorInterceptor.class.getName());
Код достигает этого перехватчика, но я не могу решить проблему отсюда.
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, "5000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), "8000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "0");
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 0);
Я попытался настроить эти свойства производителя, но Kafka Streams все еще пытается обрабатывать ошибку в течение неопределенного времени.




В настоящее время нет возможности сделать то, что вы хотите. Я покопался в коде с коллегой (спасибо, Эндрю!), и производитель возвращает TimeoutException для этого случая, что такое RetriableException, и таким образом KafkaStreams не вызывает обработчик исключений производства (единственное место, где вы можете проглотить ошибку), но , ну, повторит попытку. В общем случае такое поведение имеет смысл (KafkaStreams пытается обрабатывать как можно больше ошибок внутри), но для вашего сценария у вас есть несколько «странный» угловой случай, и шаблон нарушается.
Это несколько странный случай, когда производитель возвращает здесь повторяемое исключение; отсутствующие метаданные в большинстве случаев подлежат повторному использованию, так что это не совсем неправильно, но с несуществующей темой это не всегда правильно (проблема в том, что производитель не может различить оба случая...)
Записано Issues.apache.org/jira/browse/KAFKA-16508