У меня есть потоковое приложение kafka, которое сопоставляет/преобразует сообщение json и передает вывод в тему.
KStream<String, String> logMessageStream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
logMessageStream.map((k, v) -> { //Map record
try { // Map record to (requestId, message)
// readValue throws IOException, JsonParseException, JsonMappingException
LogMessage logMessage = objectMapper.readValue(v, LogMessage.class);
return new KeyValue<>(logMessage.requestId(), logMessage);
} catch (IOException e) {
e.printStackTrace();
}
return null; // <== RETURNS null due to caught exception
}).toStream().to(outoutTopic)
теперь я получу ошибку синтаксического анализа, если входная запись json содержит недопустимый синтаксис, потоковое приложение вылетает с ошибкой:
java.lang.NullPointerException
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
....
Я хочу использовать эту ошибку при отображении и продолжить обработку другого сообщения. Есть ли какой-либо обработчик, который я могу установить для потребителя исключения. Ищу предложения.
Спасибо..




Просто взгляните на метод setUncaughtExceptionHandler:
KafkaStreams streams = new KafkaStreams(topology, props);
streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
// your logic here
});
Вы также можете воспользоваться свойством StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, как описано в https://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-records.
Properties streamsSettings = new Properties();
streamsSettings.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class.getName()
);
Спасибо .. Это дало некоторый намек на решение моей проблемы. Я создал JsonSerde и использовал его при потреблении потока. Поэтому всякий раз, когда возникает исключение разбора JSON, DESERIALIZATION_EXCEPTION_HANDLER_CLASS будет потреблять исключение и продолжать. Принимая это как ответ..
Вместо использования map() вы можете использовать flatMap(), что позволяет вам возвращать нулевые элементы. Возврат null из map() не допускается, как указано в JavaDocs:
The provided {@link KeyValueMapper} must return a {@link KeyValue} type and must not return {@code null}.
Обратите внимание, что flatMap() также не позволяет вернуть null. Но он принимает все, что вы можете повторить (например, Iterable). Например, вы можете вернуть Collections.singleton() в случае успеха и Collection.emptySet() в случае неудачи.
Пробовал это. Обработчик вызывается при возникновении исключения. Но это не мешает приложению быть убитым.