Spring Cloud Stream Kafka Binder - повторные попытки не работают при использовании DLQ в пакетном режиме

Я использую Spring Cloud версии 2023.0.1 (весенний облачный поток версии 4.1.1) и написал простой потребитель Kafka в пакетном режиме для моделирования сценария ошибки.

    @Bean
    Consumer<Message<List<String>>> consumer1() {
        return message -> {
            final List<String> payload = message.getPayload();
            final MessageHeaders messageHeaders = message.getHeaders();
            payload.forEach(System.out::println);
            payload.forEach(p -> {
                if (p.startsWith("a")) {
                    throw new RuntimeException("Intentional Exception");
                }
            });
            System.out.println(messageHeaders);
            System.out.println("Done");
        };
    }

Мой файл application.yml выглядит так

spring:
  cloud:
    function:
      definition: consumer1;
    stream:
      bindings:
        consumer1-in-0:
          destination: topic1
          group: consumer1-in-0-v0.1
          consumer:
            batch-mode: true
            use-native-decoding: true
            max-attempts: 3
      kafka:
        binder:
          brokers:
            - localhost:9092
        default:
          consumer:
            configuration:
              max.poll.records: 1000
              max.partition.fetch.bytes: 31457280
              fetch.max.wait.ms: 200
        bindings:
          consumer1-in-0:
            consumer:
              enableDlq: true
              dlqName: dlq-topic
              dlqProducerProperties:
                configuration:
                  value.serializer: org.apache.kafka.common.serialization.StringSerializer
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

Я также указал ListenerContainerWithDlqAndRetryCustomizer для настройки повторных попыток.

    @Bean
    ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
        return new ListenerContainerWithDlqAndRetryCustomizer() {

            @Override
            public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                                  String group,
                                  @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                                  @Nullable BackOff backOff) {

                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }

            @Override
            public boolean retryAndDlqInBinding(String destinationName, String group) {
                return false;
            }

        };
    }

Проблема При возникновении ошибки пакет сообщений направляется прямо в DLQ. И повторные попытки не предпринимаются.

Однако проблема в том, что могут возникнуть временные ошибки, из-за которых пакет не удалось обработать, и я хочу, чтобы пакет был повторен несколько раз, прежде чем отправить его в DLQ. Но я не могу заставить его работать.

Что я делаю не так?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
0
0
76
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

На случай, если кто-нибудь наткнется на это в будущем, я понял, в чем дело. Мне пришлось удалить enableDlq, dlqName и dlqProducerProperties из файла application.yml.

Тогда это сработало.

В Java-коде я также удалил ListenerContainerWithDlqAndRetryCustomizer и просто использовал ListenerContainerCustomizer. Код выглядел примерно так:

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
        return (container, dest, group) -> container.setCommonErrorHandler(errorHandler);
    }

    @Bean
    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 4));
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> stringTemplate,
                                                   KafkaOperations<?, ?> bytesTemplate,
                                                   KafkaOperations<?, ?> longTemplate) {
        Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
        templates.put(String.class, stringTemplate);
        templates.put(byte[].class, bytesTemplate);
        templates.put(Long.class, longTemplate);
        return new DeadLetterPublishingRecoverer(templates);
    }

    @Bean
    public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
    }

    @Bean
    public KafkaTemplate<String, String> bytesTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public KafkaTemplate<String, Long> longTemplate(ProducerFactory<String, Long> pf) {
        return new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
    }

Исходный вопрос выявил ошибку в подшивке, которая, как я полагаю, вызвала проблему, которую вы видели раньше. Подробности смотрите в этом выпуске: https://github.com/spring-cloud/spring-cloud-stream/issues/2951.

Если вы попробуете версию подшивки 4.1.2-SNAPSHOT, приведенный выше код у меня работает:

@SpringBootApplication
public class So78485425Application {

    public static void main(String[] args) {
        SpringApplication.run(So78485425Application.class, args);
    }

    @Bean
    Consumer<Message<List<String>>> consumer1() {
        return message -> {
            final List<String> payload = message.getPayload();
            final MessageHeaders messageHeaders = message.getHeaders();
            payload.forEach(System.out::println);
            payload.forEach(p -> {
                if (p.startsWith("a")) {
                    throw new RuntimeException("Intentional Exception");
                }
            });
            System.out.println(messageHeaders);
            System.out.println("Done");
        };
    }

    @Bean
    public KafkaTemplate<String, String> stringStringKafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf, Map.of(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
    }

    @Bean
    ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> stringStringKafkaTemplate) {
        return new ListenerContainerWithDlqAndRetryCustomizer() {

            @Override
            public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                                  String group,
                                  BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                                  BackOff backOff) {
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(stringStringKafkaTemplate,
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }

            @Override
            public boolean retryAndDlqInBinding(String destinationName, String group) {
                return false;
            }

        };
    }

}

и соответствующая конфигурация:

spring:
  cloud:
    function:
      definition: consumer1
    stream:
      bindings:
        consumer1-in-0:
          destination: topic1
          group: consumer1-in-0-v0.1
          consumer:
            batch-mode: true
            use-native-decoding: true
            max-attempts: 3
      kafka:
        binder:
          brokers:
            - localhost:9092
        default:
          consumer:
            configuration:
              max.poll.records: 1000
              max.partition.fetch.bytes: 31457280
              fetch.max.wait.ms: 200
        bindings:
          consumer1-in-0:
            consumer:
              enableDlq: true
              dlqName: dlq-topic
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

Другие вопросы по теме

Похожие вопросы