Я использую Spring Cloud версии 2023.0.1 (весенний облачный поток версии 4.1.1) и написал простой потребитель Kafka в пакетном режиме для моделирования сценария ошибки.
@Bean
Consumer<Message<List<String>>> consumer1() {
return message -> {
final List<String> payload = message.getPayload();
final MessageHeaders messageHeaders = message.getHeaders();
payload.forEach(System.out::println);
payload.forEach(p -> {
if (p.startsWith("a")) {
throw new RuntimeException("Intentional Exception");
}
});
System.out.println(messageHeaders);
System.out.println("Done");
};
}
Мой файл application.yml выглядит так
spring:
cloud:
function:
definition: consumer1;
stream:
bindings:
consumer1-in-0:
destination: topic1
group: consumer1-in-0-v0.1
consumer:
batch-mode: true
use-native-decoding: true
max-attempts: 3
kafka:
binder:
brokers:
- localhost:9092
default:
consumer:
configuration:
max.poll.records: 1000
max.partition.fetch.bytes: 31457280
fetch.max.wait.ms: 200
bindings:
consumer1-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
key.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
Я также указал ListenerContainerWithDlqAndRetryCustomizer для настройки повторных попыток.
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return false;
}
};
}
Проблема При возникновении ошибки пакет сообщений направляется прямо в DLQ. И повторные попытки не предпринимаются.
Однако проблема в том, что могут возникнуть временные ошибки, из-за которых пакет не удалось обработать, и я хочу, чтобы пакет был повторен несколько раз, прежде чем отправить его в DLQ. Но я не могу заставить его работать.
Что я делаю не так?




На случай, если кто-нибудь наткнется на это в будущем, я понял, в чем дело.
Мне пришлось удалить enableDlq, dlqName и dlqProducerProperties из файла application.yml.
Тогда это сработало.
В Java-коде я также удалил ListenerContainerWithDlqAndRetryCustomizer и просто использовал ListenerContainerCustomizer.
Код выглядел примерно так:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> container.setCommonErrorHandler(errorHandler);
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 4));
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> stringTemplate,
KafkaOperations<?, ?> bytesTemplate,
KafkaOperations<?, ?> longTemplate) {
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
templates.put(Long.class, longTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf,
Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}
@Bean
public KafkaTemplate<String, String> bytesTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, Long> longTemplate(ProducerFactory<String, Long> pf) {
return new KafkaTemplate<>(pf,
Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}
Исходный вопрос выявил ошибку в подшивке, которая, как я полагаю, вызвала проблему, которую вы видели раньше. Подробности смотрите в этом выпуске: https://github.com/spring-cloud/spring-cloud-stream/issues/2951.
Если вы попробуете версию подшивки 4.1.2-SNAPSHOT, приведенный выше код у меня работает:
@SpringBootApplication
public class So78485425Application {
public static void main(String[] args) {
SpringApplication.run(So78485425Application.class, args);
}
@Bean
Consumer<Message<List<String>>> consumer1() {
return message -> {
final List<String> payload = message.getPayload();
final MessageHeaders messageHeaders = message.getHeaders();
payload.forEach(System.out::println);
payload.forEach(p -> {
if (p.startsWith("a")) {
throw new RuntimeException("Intentional Exception");
}
});
System.out.println(messageHeaders);
System.out.println("Done");
};
}
@Bean
public KafkaTemplate<String, String> stringStringKafkaTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf, Map.of(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> stringStringKafkaTemplate) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
BackOff backOff) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(stringStringKafkaTemplate,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return false;
}
};
}
}
и соответствующая конфигурация:
spring:
cloud:
function:
definition: consumer1
stream:
bindings:
consumer1-in-0:
destination: topic1
group: consumer1-in-0-v0.1
consumer:
batch-mode: true
use-native-decoding: true
max-attempts: 3
kafka:
binder:
brokers:
- localhost:9092
default:
consumer:
configuration:
max.poll.records: 1000
max.partition.fetch.bytes: 31457280
fetch.max.wait.ms: 200
bindings:
consumer1-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer