Как отловить ошибку десериализации в Kafka-Spring?

Я запускаю приложение, использующее сообщения kafka.

Я следил за Spring-документы об обработке ошибок десериализации, чтобы поймать исключение десериализации. Я пробовал метод failedDeserializationFunction.

Это мой класс потребительской конфигурации

@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        
        /*  Error Handling */
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);

        return consumerProps;
    }

    @Bean
    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(NTCMessageBody.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

Это поставщик BiFunction

public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {

    @Override
    public NTCMessageBody apply(byte[] t, Headers u) {
        return new NTCBadMessageBody(t);
    }

}

public class NTCBadMessageBody extends NTCMessageBody{

    private final byte[] failedDecode;

    public NTCBadMessageBody(byte[] failedDecode) {
        this.failedDecode = failedDecode;
    }

    public byte[] getFailedDecode() {
        return this.failedDecode;
    }

}

Когда я отправляю только одно поврежденное сообщение по теме, я получаю эту ошибку (в цикле):

org.apache.kafka.common.errors.SerializationException: ошибка десериализации ключа/значения

Я так понял, что ErrorHandlingDeserializer2 должен делегировать тип NTCBadMessageBody и продолжать потребление. Я также видел (в режиме отладки), что он никогда не попадал в конструктор класса NTCBadMessageBody.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
6
0
22 642
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

ErrorHandlingДесериализатор

When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.

Итак, в соответствии с вашим кодом, который вы используете record-level MessageListener, просто добавьте ErrorHandler к Container

Обработка исключений

If your error handler implements this interface you can, for example, adjust the offsets accordingly. For example, to reset the offset to replay the failed message, you could do something like the following; note however, these are simplistic implementations and you would probably want more checking in the error handler.

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
    this.listen3Exception = e;
    MessageHeaders headers = m.getHeaders();
    c.seek(new org.apache.kafka.common.TopicPartition(
            headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
            headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
            headers.get(KafkaHeaders.OFFSET, Long.class));
    return null;
   };
}

Или вы можете сделать собственную реализацию, как в этом примере.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory()  {

    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");
        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {

        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
            String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");


        }
    });


    return factory;
}

Я пытаюсь ... но, похоже, во втором примере есть проблема, потому что у меня есть The method setErrorHandler(new ErrorHandler(){}) is undefined for the type ContainerProperties

Pietro Fragnito 02.05.2019 09:32

Вношу эту поправку factory.setErrorHandler(new ErrorHandler() отлично работает! Большое спасибо!

Pietro Fragnito 02.05.2019 09:54

Есть ли способ получить доступ к информации о разделе (на самом деле TopicPartition) в пользовательской реализации выше для любого заданного исключения? Мы хотим перехватывать исключения и регистрировать их в базе данных, а затем увеличивать смещение в разделе. Но, анализируя сообщение об исключении, как указано выше, мы можем сделать это только для SerializationException.

Marinos An 18.01.2022 16:00

В приведенном выше ответе могут возникнуть проблемы, если имя раздела имеет символ, например «-». Итак, я изменил ту же логику с регулярным выражением.

    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.SerializationException;
    import org.springframework.kafka.listener.ErrorHandler;
    import org.springframework.kafka.listener.MessageListenerContainer;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class KafkaErrHandler implements ErrorHandler {
    
        /**
         * Method prevents serialization error freeze
         * 
         * @param e
         * @param consumer
         */
        private void seekSerializeException(Exception e, Consumer<?, ?> consumer) {
            String p = ".*partition (.*) at offset ([0-9]*).*";
            Pattern r = Pattern.compile(p);
    
            Matcher m = r.matcher(e.getMessage());
    
            if (m.find()) {
                int idx = m.group(1).lastIndexOf("-");
                String topics = m.group(1).substring(0, idx);
                int partition = Integer.parseInt(m.group(1).substring(idx));
                int offset = Integer.parseInt(m.group(2));
    
                TopicPartition topicPartition = new TopicPartition(topics, partition);
    
                consumer.seek(topicPartition, (offset + 1));
    
                log.info("Skipped message with offset {} from partition {}", offset, partition);
            }
        }
    
        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
            log.error("Error in process with Exception {} and the record is {}", e, record);
    
            if (e instanceof SerializationException)
                seekSerializeException(e, consumer);
        }
    
        @Override
        public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {
            log.error("Error in process with Exception {} and the records are {}", e, records);
    
            if (e instanceof SerializationException)
                seekSerializeException(e, consumer);
    
        }
    
        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> record) {
            log.error("Error in process with Exception {} and the record is {}", e, record);
        }
    
    } 

наконец, используйте обработчик ошибок в config.

 @Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(macdStatusConsumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setErrorHandler(new KafkaErrHandler());

    return factory;
}

Однако анализ строки ошибки для получения раздела, темы и смещения не рекомендуется. Если у кого-то есть лучшее решение, пожалуйста, напишите здесь.

Ваше решение лучше утвержденного. Я изменил int partition = Integer.parseInt(m.group(1).substring(idx)); на int partition = Integer.parseInt(m.group(1).substring(idx+1));, чтобы избежать отрицательного числа

Guillaume 26.07.2021 15:39

Используйте ErrorHandlingDeserializer.

When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer configured with the proper delegates. Alternatively, you can use consumer configuration properties which are used by the ErrorHandlingDeserializer to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS; the property value can be a class or class name

package com.mypackage.app.config;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.mypacakage.app.model.kafka.message.KafkaEvent;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import lombok.extern.slf4j.Slf4j;

@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${listener.group-id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() {
    
        ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(((exception, data) -> {
            /*
             * here you can do you custom handling, I am just logging it same as default
             * Error handler does If you just want to log. you need not configure the error
             * handler here. The default handler does it for you. Generally, you will
             * persist the failed records to DB for tracking the failed records.
             */
            log.error("Error in process with Exception {} and the record is {}", exception, data);
        }));

        return factory;

    }

    @Bean
    public ConsumerFactory<String, KafkaEvent> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
                "com.mypackage.app.model.kafka.message.KafkaEvent");
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        /*
         * here retry policy is used to set the number of attempts to retry and what
         * exceptions you wanted to try and what you don't want to retry.
         */
        retryTemplate.setRetryPolicy(retryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

        // the boolean value in the map determines whether exception should be retried
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);
        exceptionMap.put(ListenerExecutionFailedException.class, true);

        return new SimpleRetryPolicy(3, exceptionMap, true);
    }
}

Другие вопросы по теме