В моей фабрике контейнеров есть SeekToCurrentErrorHandler, который использует DeadLetterPublishingRecoverer для публикации в DLT определенных исключений типа «NotRetryableException» и продолжает искать одно и то же смещение для других типов исключений бесконечное количество раз. При такой настройке после определенного количества полезных нагрузок, которые приводят к исключениям без повторной попытки, карта, в которой хранится контекст повторной попытки — MapRetryContextCache (spring-retry), переполняется, вызывая исключение RetryCacheCapacityExceededException. На первый взгляд, контексты повторных попыток сообщений, которые должны обрабатываться восстановителем DLT, не удаляются из MapRetryContextCache. Либо так, либо моя конфигурация неверна.
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),-1);
eh.addNotRetryableException(SomeNonRetryableException.class);
eh.setCommitRecovered(true);
ConcurrentKafkaListenerContainerFactory<String, String> factory
= getContainerFactory();
factory.setErrorHandler(eh);
factory.setRetryTemplate(retryTemplate);
factory.setStatefulRetry(true);





Чтобы очистить кеш, необходимо выполнить восстановление в шаблоне повтора, а не в обработчике ошибок.
@SpringBootApplication
public class So56846940Application {
public static void main(String[] args) {
SpringApplication.run(So56846940Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so56846940").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topicDLT() {
return TopicBuilder.name("so56846940.DLT").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
DeadLetterPublishingRecoverer recoverer) {
factory.setRetryTemplate(new RetryTemplate());
factory.setStatefulRetry(true);
factory.setRecoveryCallback(context -> {
recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
(Exception) context.getLastThrowable());
return null;
});
return args -> IntStream.range(0, 5000).forEach(i -> template.send("so56846940", "foo"));
}
@KafkaListener(id = "so56846940", topics = "so56846940")
public void listen(String in) {
System.out.println(in);
throw new RuntimeException();
}
@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, String> template) {
return new DeadLetterPublishingRecoverer(template);
}
@Bean
public SeekToCurrentErrorHandler eh() {
return new SeekToCurrentErrorHandler(4);
}
}
Обработчик ошибок должен повторить как минимум столько же попыток, сколько и шаблон повтора, чтобы количество попыток было исчерпано и мы очистили кеш.
Вы также должны настроить RetryTemplate с теми же исключениями, которые нельзя повторять, что и обработчик ошибок.
Уточним в справочнике.
Спасибо за быстрый ответ и отличный фреймворк
Я обновил свой ответ; вы должны выполнить публикацию DLT в восстановителе шаблона повторной попытки, чтобы кеш был очищен.