Потребитель Spring Cloud Stream Kafka в пакетном режиме не повторяет попытку

Я использую Spring Cloud Stream (версия 4.0.3) и Kafka Binder в приложении Spring Boot для пакетного потребления сообщений из темы Kafka. При возникновении исключения весь пакет отправляется в тему DLQ без повторной попытки. Пожалуйста, помогите мне найти проблему.

Ниже приведена моя повторная попытка и конфигурация dlq.

@Configuration
public class KafkaRetryConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(KafkaOperations<Object, Object> bytesTemplate) {
        return (container, destinationName, group) -> {
            container.setCommonErrorHandler(new DefaultErrorHandler(new DeadLetterPublishingRecoverer(bytesTemplate), new FixedBackOff(5000L, 5L)));
        };
    }
}

Ниже приведен потребительский код Kafka.

@Bean
    public Consumer<Message<List<records>>> recordsConsumer() {
        return message -> {
            List<records> records= message.getPayload();
            int index = IntStream.range(0, records.size())
                    .filter(streamIndex -> records.get(streamIndex).getId().equals("abc123"))
                    .findFirst()
                    .orElse(-1);
            Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            assert acknowledgment != null;
            try {
                if (index > -1) {
                    throw new RuntimeException("runtime exception");
                }
                //message processing logic
                acknowledgment.acknowledge();
            } catch (Exception e) {
                throw new BatchListenerFailedException(records.get(index).toString(),index);
            }
        };
    }

Ниже приведены свойства моего приложения

spring:
  cloud:
    stream:
      default-binder: kafka
      default:
        contentType: application/*+avro
        consumer:
          useNativeDecoding: true
          autoStartup: false
        producer:
          useNativeEncoding: true
      kafka:
        binder:
          autoCreateTopics: false
          brokers: broker
          configuration:
            enable:
              auto.commit: false
              idempotence: true
            max.in.flight.requests.per.connection: 1
            request.timeout.ms: 5000
            security.protocol: SASL_SSL
            sasl:
              kerberos:
                service:
                  name: service-name
              jaas:
                config: com.sun.security.auth.module.Krb5LoginModule required
                  doNotPrompt=true
                  useKeyTab=true
                  useTicketCache=false
                  storeKey=true
                  keyTab = "xyz.keytab"
                  principal = "[email protected]";
            ssl:
              endpoint.identification.algorithm:
              truststore:
                type: JKS
                location: /config/global/payx-cacerts/cacerts
                password: changeit
          consumer-properties:
            client.id: hrs-productsubscription-consumer-test-9
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            schema.registry.url: schema-registry-url
            #fetch.max.wait.ms: 60000
            max.poll.records: 200
          requiredAcks: -1
          producer-properties:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        bindings:
          recordsConsumer-in-0:
            consumer:
              #ackMode: MANUAL
              startOffset: earliest
              resetOffsets: false
              autoCommitOffset: false
              enableDlq: true
              dlqName: dlq-topic-name
              dlqPartitions: 1
              dlqProducerProperties:
                configuration:
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  schema.registry.url: schema-registry-url
              configuration:
                group.id: group-id
                schema.registry.url: schema-registry-url
                autoStartup: true
                key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring:
                  deserializer:
                    key:
                      delegate.class: org.apache.kafka.common.serialization.StringDeserializer
                    value:
                      delegate.class: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
      bindings:
        recordsConsumer-in-0:
          consumer:
            batch-mode: true
            max-attempts: 2
          destination: topic-name
          group: group-name
          partitioned: true
          concurrency: 8

Конфигурация повтора в моем приложении, похоже, не работает.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
57
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Пожалуйста, просмотрите ответ в этой теме SO и посмотрите, связано ли это. Spring Cloud Stream Kafka Binder — повторные попытки не работают при использовании DLQ в пакетном режиме

Похоже, это связано с похожими проблемами. Пожалуйста, обновите подшивку до последней версии (4.1.2), так как в этой области в строке 4.1.x были внесены некоторые исправления.

Другие вопросы по теме