Обработчик ошибок Spring Kafka, как избежать бесконечного цикла

Я надеюсь, что кто-то может дать мне подсказку, что я делаю неправильно здесь. Я написал собственный обработчик ошибок для пакетного прослушивателя, который должен искать за полученными записями, отправлять их в dlq. Я сильно устал, но не могу заставить его работать. Моя текущая реализация будет зависать в бесконечном цикле, получая записи снова и снова. Вот код обработчика ошибок:

@Service("consumerAwareListenerErrorHandlerImpl")
public class ConsumerAwareListenerErrorHandlerImpl implements ConsumerAwareListenerErrorHandler {


    private final Executor executor;

    private final KafkaListenerEndpointRegistry registry;

    private final TaskScheduler scheduler;


    @Autowired
    public ConsumerAwareListenerErrorHandlerImpl(KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {
        this.scheduler = scheduler;
        this.executor = new SimpleAsyncTaskExecutor();
        this.registry = registry;
    }


    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {

        MessageHeaders headers = message.getHeaders();
        List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
        List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
        List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
        Acknowledgment acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);


        Map<TopicPartition, Long> offsetsToReset = new HashMap<>();

        for (int i = 0; i < topics.size(); i++) {
            int index = i;
            offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                    (k, v) -> (v == null) ? offsets.get(index) : Math.max(v, offsets.get(index)));
        }
        offsetsToReset.forEach((k, v) -> consumer.seek(k, v));

        if (!(exception.getCause() instanceof DeserializationException)) {
            //pauseAndRestartContainer();
        }

        acknowledgment.acknowledge();
        consumer.commitSync();

        return null;
    }
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
1 271
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы должны стремиться к смещению + 1, чтобы пройти «прошлое». Поиск offset приведет к повторному воспроизведению.

Спасибо за подсказку! У меня также есть проблема, что ошибка hanlde не вызывается, когда возникает исключение десериализации. Конечно, я могу использовать ErrorHandlingDeserializer и вручную фильтровать ошибочные записи, но это мне не помогает. Я пытаюсь: Исключение сериализации: log; искать прошлое; продолжать идти; Другое исключение: пауза; перезапуск по расписанию; попробуйте еще раз У вас есть идеи, как решить эту проблему для пакетного прослушивателя?

Gerrit 23.04.2019 16:49

Мы добавили ErrorHandlingDeserializer2 (...2 лучше, чем первый, потому что он безопасен для типов) именно по этой причине. Проблема в том, что исключения десериализации возникают до того, как Spring получит данные, поэтому мы больше ничего не можем сделать.

Gary Russell 23.04.2019 16:58

Но я все еще должен фильтровать их в контейнере, как кажется?!

Gerrit 23.04.2019 17:11

Да, при использовании пакетного прослушивателя; однако теперь это более удобно, потому что значение является типобезопасным (либо null, либо фиктивный десериализованный объект, если вы предоставляете failedDeserializationFunction). При использовании прослушивателя на основе записей неудачная запись отправляется непосредственно в обработчик ошибок контейнера.

Gary Russell 23.04.2019 18:18

Еще один вопрос, Гэри Рассел: если я использую ErrorHandlingDeserializer2, отправитель должен использовать тот же класс для отправки json моему прослушивателю контейнера. В противном случае я получаю «org.springframework.messaging.converter.MessageConversionEx‌​ception: не удалось разрешить имя класса». Если я не использую ErrorHandlingDeserializer2, отправитель может отправить любой json, если он совместим с целевым d, чтобы он был десериализован без каких-либо исключений. У вас есть подсказка для меня, как решить эту проблему?

Gerrit 24.04.2019 09:54

Хорошо, я узнал, что могу сделать это, используя; props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, ложь);

Gerrit 24.04.2019 14:39

Другие вопросы по теме