Я запускаю приложение, использующее сообщения kafka.
Я следил за Spring-документы об обработке ошибок десериализации, чтобы поймать исключение десериализации. Я пробовал метод failedDeserializationFunction.
Это мой класс потребительской конфигурации
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
/* Error Handling */
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);
return consumerProps;
}
@Bean
public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(NTCMessageBody.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Это поставщик BiFunction
public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {
@Override
public NTCMessageBody apply(byte[] t, Headers u) {
return new NTCBadMessageBody(t);
}
}
public class NTCBadMessageBody extends NTCMessageBody{
private final byte[] failedDecode;
public NTCBadMessageBody(byte[] failedDecode) {
this.failedDecode = failedDecode;
}
public byte[] getFailedDecode() {
return this.failedDecode;
}
}
Когда я отправляю только одно поврежденное сообщение по теме, я получаю эту ошибку (в цикле):
org.apache.kafka.common.errors.SerializationException: ошибка десериализации ключа/значения
Я так понял, что ErrorHandlingDeserializer2 должен делегировать тип NTCBadMessageBody и продолжать потребление. Я также видел (в режиме отладки), что он никогда не попадал в конструктор класса NTCBadMessageBody.




When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.
Итак, в соответствии с вашим кодом, который вы используете record-level MessageListener, просто добавьте ErrorHandler к Container
If your error handler implements this interface you can, for example, adjust the offsets accordingly. For example, to reset the offset to replay the failed message, you could do something like the following; note however, these are simplistic implementations and you would probably want more checking in the error handler.
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
this.listen3Exception = e;
MessageHeaders headers = m.getHeaders();
c.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
Или вы можете сделать собственную реализацию, как в этом примере.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
//log.info("Skipping " + topic + "-" + partition + " offset " + offset);
consumer.seek(topicPartition, offset + 1);
System.out.println("OKKKKK");
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
//log.info("Skipping " + topic + "-" + partition + " offset " + offset);
consumer.seek(topicPartition, offset + 1);
System.out.println("OKKKKK");
}
});
return factory;
}
Вношу эту поправку factory.setErrorHandler(new ErrorHandler() отлично работает! Большое спасибо!
Есть ли способ получить доступ к информации о разделе (на самом деле TopicPartition) в пользовательской реализации выше для любого заданного исключения? Мы хотим перехватывать исключения и регистрировать их в базе данных, а затем увеличивать смещение в разделе. Но, анализируя сообщение об исключении, как указано выше, мы можем сделать это только для SerializationException.
В приведенном выше ответе могут возникнуть проблемы, если имя раздела имеет символ, например «-». Итак, я изменил ту же логику с регулярным выражением.
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class KafkaErrHandler implements ErrorHandler {
/**
* Method prevents serialization error freeze
*
* @param e
* @param consumer
*/
private void seekSerializeException(Exception e, Consumer<?, ?> consumer) {
String p = ".*partition (.*) at offset ([0-9]*).*";
Pattern r = Pattern.compile(p);
Matcher m = r.matcher(e.getMessage());
if (m.find()) {
int idx = m.group(1).lastIndexOf("-");
String topics = m.group(1).substring(0, idx);
int partition = Integer.parseInt(m.group(1).substring(idx));
int offset = Integer.parseInt(m.group(2));
TopicPartition topicPartition = new TopicPartition(topics, partition);
consumer.seek(topicPartition, (offset + 1));
log.info("Skipped message with offset {} from partition {}", offset, partition);
}
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
log.error("Error in process with Exception {} and the record is {}", e, record);
if (e instanceof SerializationException)
seekSerializeException(e, consumer);
}
@Override
public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
log.error("Error in process with Exception {} and the records are {}", e, records);
if (e instanceof SerializationException)
seekSerializeException(e, consumer);
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> record) {
log.error("Error in process with Exception {} and the record is {}", e, record);
}
}
наконец, используйте обработчик ошибок в config.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(macdStatusConsumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setErrorHandler(new KafkaErrHandler());
return factory;
}
Однако анализ строки ошибки для получения раздела, темы и смещения не рекомендуется. Если у кого-то есть лучшее решение, пожалуйста, напишите здесь.
Ваше решение лучше утвержденного. Я изменил int partition = Integer.parseInt(m.group(1).substring(idx)); на int partition = Integer.parseInt(m.group(1).substring(idx+1));, чтобы избежать отрицательного числа
Используйте ErrorHandlingDeserializer.
When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.
You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer configured with the proper delegates. Alternatively, you can use consumer configuration properties which are used by the ErrorHandlingDeserializer to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS; the property value can be a class or class name
package com.mypackage.app.config;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.mypacakage.app.model.kafka.message.KafkaEvent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import lombok.extern.slf4j.Slf4j;
@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String servers;
@Value("${listener.group-id}")
private String groupId;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setErrorHandler(((exception, data) -> {
/*
* here you can do you custom handling, I am just logging it same as default
* Error handler does If you just want to log. you need not configure the error
* handler here. The default handler does it for you. Generally, you will
* persist the failed records to DB for tracking the failed records.
*/
log.error("Error in process with Exception {} and the record is {}", exception, data);
}));
return factory;
}
@Bean
public ConsumerFactory<String, KafkaEvent> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
"com.mypackage.app.model.kafka.message.KafkaEvent");
config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app");
return new DefaultKafkaConsumerFactory<>(config);
}
private RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
/*
* here retry policy is used to set the number of attempts to retry and what
* exceptions you wanted to try and what you don't want to retry.
*/
retryTemplate.setRetryPolicy(retryPolicy());
return retryTemplate;
}
private SimpleRetryPolicy retryPolicy() {
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
// the boolean value in the map determines whether exception should be retried
exceptionMap.put(IllegalArgumentException.class, false);
exceptionMap.put(TimeoutException.class, true);
exceptionMap.put(ListenerExecutionFailedException.class, true);
return new SimpleRetryPolicy(3, exceptionMap, true);
}
}
Я пытаюсь ... но, похоже, во втором примере есть проблема, потому что у меня есть
The method setErrorHandler(new ErrorHandler(){}) is undefined for the type ContainerProperties