Я использую Spring Cloud Stream (версия 4.0.3) и Kafka Binder в приложении Spring Boot для пакетного потребления сообщений из темы Kafka. При возникновении исключения весь пакет отправляется в тему DLQ без повторной попытки. Пожалуйста, помогите мне найти проблему.
Ниже приведена моя повторная попытка и конфигурация dlq.
@Configuration
public class KafkaRetryConfig {
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(KafkaOperations<Object, Object> bytesTemplate) {
return (container, destinationName, group) -> {
container.setCommonErrorHandler(new DefaultErrorHandler(new DeadLetterPublishingRecoverer(bytesTemplate), new FixedBackOff(5000L, 5L)));
};
}
}
Ниже приведен потребительский код Kafka.
@Bean
public Consumer<Message<List<records>>> recordsConsumer() {
return message -> {
List<records> records= message.getPayload();
int index = IntStream.range(0, records.size())
.filter(streamIndex -> records.get(streamIndex).getId().equals("abc123"))
.findFirst()
.orElse(-1);
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
assert acknowledgment != null;
try {
if (index > -1) {
throw new RuntimeException("runtime exception");
}
//message processing logic
acknowledgment.acknowledge();
} catch (Exception e) {
throw new BatchListenerFailedException(records.get(index).toString(),index);
}
};
}
Ниже приведены свойства моего приложения
spring:
cloud:
stream:
default-binder: kafka
default:
contentType: application/*+avro
consumer:
useNativeDecoding: true
autoStartup: false
producer:
useNativeEncoding: true
kafka:
binder:
autoCreateTopics: false
brokers: broker
configuration:
enable:
auto.commit: false
idempotence: true
max.in.flight.requests.per.connection: 1
request.timeout.ms: 5000
security.protocol: SASL_SSL
sasl:
kerberos:
service:
name: service-name
jaas:
config: com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useKeyTab=true
useTicketCache=false
storeKey=true
keyTab = "xyz.keytab"
principal = "[email protected]";
ssl:
endpoint.identification.algorithm:
truststore:
type: JKS
location: /config/global/payx-cacerts/cacerts
password: changeit
consumer-properties:
client.id: hrs-productsubscription-consumer-test-9
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
specific.avro.reader: true
schema.registry.url: schema-registry-url
#fetch.max.wait.ms: 60000
max.poll.records: 200
requiredAcks: -1
producer-properties:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
bindings:
recordsConsumer-in-0:
consumer:
#ackMode: MANUAL
startOffset: earliest
resetOffsets: false
autoCommitOffset: false
enableDlq: true
dlqName: dlq-topic-name
dlqPartitions: 1
dlqProducerProperties:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: schema-registry-url
configuration:
group.id: group-id
schema.registry.url: schema-registry-url
autoStartup: true
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring:
deserializer:
key:
delegate.class: org.apache.kafka.common.serialization.StringDeserializer
value:
delegate.class: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
bindings:
recordsConsumer-in-0:
consumer:
batch-mode: true
max-attempts: 2
destination: topic-name
group: group-name
partitioned: true
concurrency: 8
Конфигурация повтора в моем приложении, похоже, не работает.
Пожалуйста, просмотрите ответ в этой теме SO и посмотрите, связано ли это. Spring Cloud Stream Kafka Binder — повторные попытки не работают при использовании DLQ в пакетном режиме
Похоже, это связано с похожими проблемами. Пожалуйста, обновите подшивку до последней версии (4.1.2
), так как в этой области в строке 4.1.x
были внесены некоторые исправления.