Я использую Spring WebFlux, версия загрузки Spring — 3.2.0. Я использовал родной Reactor Kafka.
Пружинный ботинок Sleuth перенесен в Отслеживание микрометра
Вы можете прочитать это здесь:
Последняя дополнительная версия Spring Cloud Sleuth — 3.1. Вы можете проверить ветку 3.1.x на наличие последних коммитов. Ядро этого проекта было перенесено в проект Micrometer Tracing, а инструменты будут перенесены в Micrometer и все соответствующие проекты (больше не все инструменты будут храниться в одном репозитории).
Я хочу реализовать распределенную трассировку. Мне нужны TraceId и SpaId.
Для трассировки я использовал следующие зависимости:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core-micrometer</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Конфигурация Кафки
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaConfiguration {
private final KafkaProperties kafkaProperties;
private final KafkaTopicProperties topicProperties;
@Bean
public <K, V> ReactiveKafkaProducerTemplate<K, V> reactiveKafkaProducerTemplate(ObservationRegistry registry, PropagatingSenderTracingObservationHandler<?> handler) { // Generic Producer, If needed , create custom producer for specific object.
registry.observationConfig().observationHandler(handler);
var properties = kafkaProperties.buildProducerProperties(new DefaultSslBundleRegistry());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getBootstrapServers()));
var senderOptions = SenderOptions.<K, V>create(properties)
.withObservation(registry, new KafkaSenderObservation.DefaultKafkaSenderObservationConvention());
return new ReactiveKafkaProducerTemplate<>(senderOptions);
}
@Bean(name = "otpSendReactiveKafkaConsumer")
public ReactiveKafkaConsumerTemplate<String, OtpSendRequestTransferObject> otpSendReactiveKafkaConsumer(ObservationRegistry registry, PropagatingReceiverTracingObservationHandler<?> handler) {
registry.observationConfig()
.observationHandler(handler);
return new ReactiveKafkaConsumerTemplate<>(createReceiverOptions(List.of(topicProperties.getOtp().getTopic()), registry));
}
@Bean(name = "smsSendReactiveKafkaConsumer")
public ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer(ObservationRegistry registry, PropagatingReceiverTracingObservationHandler<?> handler) {
registry.observationConfig()
.observationHandler(handler);
return new ReactiveKafkaConsumerTemplate<>(createReceiverOptions(List.of(topicProperties.getSms().getTopic()), registry));
}
private <K, V> ReceiverOptions<K, V> createReceiverOptions(List<String> topics, ObservationRegistry registry) {
var properties = kafkaProperties.buildConsumerProperties(new DefaultSslBundleRegistry());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getBootstrapServers()));
return ReceiverOptions.<K, V>create(properties)
.withObservation(registry, new KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention())
.subscription(topics);
}
}
Кафка Продюсер
@Service
@Slf4j
@RequiredArgsConstructor
public class SendSMSReactiveKafkaProducer {
private final KafkaTopicProperties kafkaTopicProperties;
private final ReactiveKafkaProducerTemplate<String, SmsSendRequestTransferObject> kafkaProducerTemplate;
public Mono<CommonResult> produce(SmsSendRequest request) {
var topic = kafkaTopicProperties.getSms().getTopic();
var kafkaHeaders = new RecordHeaders();
var message = SmsSendRequestTransferObject.builder()
.content(request.getContent())
.receiver(request.getReceiver())
.build();
var record = new ProducerRecord<>(topic, null, UUID.randomUUID().toString(), message, kafkaHeaders);
return kafkaProducerTemplate.send(record)
.doOnError(ex -> log.error("Failed to send record = {} to topic = {} failed.", record, topic, ex))
.doOnSuccess(result -> log.debug("Record is successfully sent to topic = {}, metadata = {}", topic, result.recordMetadata()))
.map(result -> {
var hasResult = HasResult.SUCCESS;
if (result.exception() != null) {
hasResult = HasResult.UNKNOWN_ERROR;
}
return new CommonResult(hasResult);
});
}
}
Кафка Потребитель
@Service
@Slf4j
public class SendSMSReactiveKafkaConsumer extends AbstractBaseReactiveKafkaConsumer<String, SmsSendRequestTransferObject> {
private final RetryBackoffSpec retryBackoffSpec;
private final SmsMessageSender smsMessageSender;
private final ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer;
public SendSMSReactiveKafkaConsumer(ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer,
ReactiveKafkaProducerTemplate<String, ErrorRecord<SmsSendRequestTransferObject>> dlqProducerTemplate,
SmsMessageSender smsMessageSender, KafkaTopicProperties topicProperties,
SmsSettingProperties smsSettingProperties) {
super(topicProperties.getSms().getDlq(), dlqProducerTemplate);
var params = smsSettingProperties.getRetryBackOffSpec().getKafka();
this.smsMessageSender = smsMessageSender;
this.smsSendReactiveKafkaConsumer = smsSendReactiveKafkaConsumer;
this.retryBackoffSpec = RetryBackoffSpecUtil.createRetryBackoffSpec(params.getMaxAttempts(), params.getBackOffPeriod());
}
@PostConstruct
public void init() {
consume()
.onErrorContinue((throwable, o) -> log.error("Error while initializing Kafka consumer.", throwable))
.subscribe();
}
@Override
public Flux<Void> consume() {
return smsSendReactiveKafkaConsumer.receiveAtMostOnce() // Be careful to use receive types. receiveAtMostOnce - receives message and commits immediately , if failure occurs message will not be redelivered, but it guarantees message commit.
.onErrorResume(Throwable.class, ex -> {
log.error("Exception on receiving message from Kafka.", ex);
return Mono.empty(); // Continue processing the next message
})
.flatMap(message -> {
return Mono.just(message)
.doOnNext(this::log)
.flatMap(record -> smsMessageSender.send(record.value(), retryBackoffSpec))
.then()
.onErrorResume(Throwable.class, ex -> {
log.error("Error during message processing.", ex);
return sendToDLQ(message, ex); // Send message to DLQ topic
});
}
)
.doOnTerminate(() -> log.error("The subscription was terminated. Either it was cancelled or completed successfully."))
.subscribeOn(Schedulers.boundedElastic());
}
}
Настроил файл application.yaml следующим образом:
logging:
pattern:
level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"
Он работает хорошо, но когда я публикую запрос в теме Kafka, я теряю текущие TraceId и SpaId.
Я также настроил WebFlux, например Hooks.enableAutomaticContextPropagation();
При получении запроса от конечной точки трассировка работает и генерируются TraceId и SpaId. В разных контекстах он также сохраняет текущий след.
Проблема в том, что я потерял свои TraceId и SpaId после публикации сообщения в Kafka. Как настроить реактивное распространение контекста для Kafka.
Вам необходимо явно настроить наблюдение для SenderOptions
. Spring Boot не настраивается автоматически, поскольку для Reactor Kafka просто нет автоматической настройки.
Дополнительную информацию смотрите в документации: https://projectreactor.io/docs/kafka/release/reference/
И найдите 5.5. Micrometer Observation
.
И вы также должны использовать зависимость io.micrometer:micrometer-observation
.
Вы, вероятно, говорите об отслеживании на стороне потребителя, поэтому вам необходимо следовать рекомендациям этого документа: KafkaReceiverObservation.RECEIVER_OBSERVATION API must be used manually in the record processing operator:
. Это .withObservation(registry, new KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention())
очень ограничено только для потребительских внутренних устройств. Пользовательский код не участвует в этом наблюдении из-за характера распространения контекста Reactor.
Дополнительную информацию смотрите в моем примере: github.com/artembilan/sandbox/tree/master/…
Я отредактировал тело вопроса. Я уже настроил SenderOptions и ReceiverOptions. Где я делаю ошибку?