Stateful-Retry с DeadLetterPublishingRecoverer вызывает RetryCacheCapacityExceededException

В моей фабрике контейнеров есть SeekToCurrentErrorHandler, который использует DeadLetterPublishingRecoverer для публикации в DLT определенных исключений типа «NotRetryableException» и продолжает искать одно и то же смещение для других типов исключений бесконечное количество раз. При такой настройке после определенного количества полезных нагрузок, которые приводят к исключениям без повторной попытки, карта, в которой хранится контекст повторной попытки — MapRetryContextCache (spring-retry), переполняется, вызывая исключение RetryCacheCapacityExceededException. На первый взгляд, контексты повторных попыток сообщений, которые должны обрабатываться восстановителем DLT, не удаляются из MapRetryContextCache. Либо так, либо моя конфигурация неверна.

SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate),-1);
eh.addNotRetryableException(SomeNonRetryableException.class);
        eh.setCommitRecovered(true);
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = getContainerFactory();
        factory.setErrorHandler(eh);
        factory.setRetryTemplate(retryTemplate);
        factory.setStatefulRetry(true);
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
798
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Чтобы очистить кеш, необходимо выполнить восстановление в шаблоне повтора, а не в обработчике ошибок.

@SpringBootApplication
public class So56846940Application {

    public static void main(String[] args) {
        SpringApplication.run(So56846940Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so56846940").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicDLT() {
        return TopicBuilder.name("so56846940.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            DeadLetterPublishingRecoverer recoverer) {

        factory.setRetryTemplate(new RetryTemplate());
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback(context -> {
            recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
                    (Exception) context.getLastThrowable());
            return null;
        });

        return args -> IntStream.range(0, 5000).forEach(i -> template.send("so56846940", "foo"));
    }

    @KafkaListener(id = "so56846940", topics = "so56846940")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException();
    }

    @Bean
    public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, String> template) {
        return new DeadLetterPublishingRecoverer(template);
    }

    @Bean
    public SeekToCurrentErrorHandler eh() {
        return new SeekToCurrentErrorHandler(4);
    }

}

Обработчик ошибок должен повторить как минимум столько же попыток, сколько и шаблон повтора, чтобы количество попыток было исчерпано и мы очистили кеш.

Вы также должны настроить RetryTemplate с теми же исключениями, которые нельзя повторять, что и обработчик ошибок.

Уточним в справочнике.

Я обновил свой ответ; вы должны выполнить публикацию DLT в восстановителе шаблона повторной попытки, чтобы кеш был очищен.

Gary Russell 02.07.2019 15:49

Спасибо за быстрый ответ и отличный фреймворк

Pradeep Balasundaram 02.07.2019 17:51

Другие вопросы по теме