Обработка ошибки синтаксического анализа json без сбоя приложения потокового процессора Kafka

У меня есть потоковое приложение kafka, которое сопоставляет/преобразует сообщение json и передает вывод в тему.

KStream<String, String> logMessageStream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
logMessageStream.map((k, v) -> { //Map record 
                try { // Map record to (requestId, message)
                    // readValue throws IOException, JsonParseException, JsonMappingException
                    LogMessage logMessage = objectMapper.readValue(v, LogMessage.class);
                    return new KeyValue<>(logMessage.requestId(), logMessage);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return null; // <== RETURNS null due to caught exception
}).toStream().to(outoutTopic)

теперь я получу ошибку синтаксического анализа, если входная запись json содержит недопустимый синтаксис, потоковое приложение вылетает с ошибкой:

java.lang.NullPointerException
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    ....

Я хочу использовать эту ошибку при отображении и продолжить обработку другого сообщения. Есть ли какой-либо обработчик, который я могу установить для потребителя исключения. Ищу предложения.

Спасибо..

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
1 298
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Просто взгляните на метод setUncaughtExceptionHandler:

KafkaStreams streams = new KafkaStreams(topology, props);    
streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
        // your logic here
    });

Пробовал это. Обработчик вызывается при возникновении исключения. Но это не мешает приложению быть убитым.

Pavan 21.05.2019 11:51
Ответ принят как подходящий

Вы также можете воспользоваться свойством StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, как описано в https://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-records.

Properties streamsSettings = new Properties();
streamsSettings.put(
  StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
  LogAndContinueExceptionHandler.class.getName()
);

Спасибо .. Это дало некоторый намек на решение моей проблемы. Я создал JsonSerde и использовал его при потреблении потока. Поэтому всякий раз, когда возникает исключение разбора JSON, DESERIALIZATION_EXCEPTION_HANDLER_CLASS будет потреблять исключение и продолжать. Принимая это как ответ..

Pavan 21.05.2019 11:55

Вместо использования map() вы можете использовать flatMap(), что позволяет вам возвращать нулевые элементы. Возврат null из map() не допускается, как указано в JavaDocs:

The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.

Обратите внимание, что flatMap() также не позволяет вернуть null. Но он принимает все, что вы можете повторить (например, Iterable). Например, вы можете вернуть Collections.singleton() в случае успеха и Collection.emptySet() в случае неудачи.

Другие вопросы по теме