Обработка и игнорирование ошибки UNKNOWN_TOPIC_OR_PARTITION в Kafka Streams

Я работаю с приложением Kafka Streams, где мы используем динамическое определение тем на основе заголовков сообщений. В нашей настройке удаление тем во время работы приложения является нормальным явлением. Сообщения по удаленной теме иногда могут приходить, но я хочу их просто игнорировать. Однако даже получив всего одно сообщение по несуществующей теме, я сталкиваюсь с бесконечным циклом ошибок:

[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 74 : {test1=UNKNOWN_TOPIC_OR_PARTITION}

org.apache.kafka.common.errors.TimeoutException: Topic test1 not present in metadata after 60000 ms.

[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 79 : {test1=UNKNOWN_TOPIC_OR_PARTITION}

Этот бесконечный цикл ошибок по сути приводит к тому, что приложение перестает работать. Как я могу настроить приложение Kafka Streams так, чтобы оно игнорировало сообщения об удаленных темах, не входя в бесконечный цикл ошибок? Есть ли способ справиться с этой ситуацией? Вот упрощенный пример кода моего приложения:

StreamsBuilder builder = new StreamsBuilder();
List<String> dynamicTopics = List.of("good_topic", "deleted_topic");
builder.stream("source_topic").to((k, v, c) -> dynamicTopics.get(new Random().nextInt(dynamicTopics.size()))); //in real application from header
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Automatic topic creation is disabled.

Я попробовал следующее, чтобы обработать и проигнорировать ошибку:

  1. Используйте KafkaAdmin. Однако между проверками существующих тем тема может быть удалена, что не решает проблему.

  2. Установите UncaughtExceptionHandler:

streams.setUncaughtExceptionHandler(new StreamsUncaughtExceptionHandler() {
    @Override
    public StreamThreadExceptionResponse handle(Throwable throwable) {
        return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
});

Но код даже не доходит до этого обработчика.

  1. Установите ProductionExceptionHandler:
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          CustomProductionExceptionHandler.class.getName());

Опять же, код не достигает этого обработчика.

  1. Установить перехватчик продюсера:
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), ErrorInterceptor.class.getName());

Код достигает этого перехватчика, но я не могу решить проблему отсюда.

  1. Настройте свойства производителя:
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, "5000"); 
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), "8000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "0");
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 0);

Я попытался настроить эти свойства производителя, но Kafka Streams все еще пытается обрабатывать ошибку в течение неопределенного времени.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
213
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В настоящее время нет возможности сделать то, что вы хотите. Я покопался в коде с коллегой (спасибо, Эндрю!), и производитель возвращает TimeoutException для этого случая, что такое RetriableException, и таким образом KafkaStreams не вызывает обработчик исключений производства (единственное место, где вы можете проглотить ошибку), но , ну, повторит попытку. В общем случае такое поведение имеет смысл (KafkaStreams пытается обрабатывать как можно больше ошибок внутри), но для вашего сценария у вас есть несколько «странный» угловой случай, и шаблон нарушается.

Это несколько странный случай, когда производитель возвращает здесь повторяемое исключение; отсутствующие метаданные в большинстве случаев подлежат повторному использованию, так что это не совсем неправильно, но с несуществующей темой это не всегда правильно (проблема в том, что производитель не может различить оба случая...)

Записано Issues.apache.org/jira/browse/KAFKA-16508

Matthias J. Sax 10.04.2024 17:50

Другие вопросы по теме