Распределенная трассировка не работает с Spring WebFlux + Reactor Kafka

Я использую Spring WebFlux, версия загрузки Spring — 3.2.0. Я использовал родной Reactor Kafka.

Пружинный ботинок Sleuth перенесен в Отслеживание микрометра

Вы можете прочитать это здесь:

Последняя дополнительная версия Spring Cloud Sleuth — 3.1. Вы можете проверить ветку 3.1.x на наличие последних коммитов. Ядро этого проекта было перенесено в проект Micrometer Tracing, а инструменты будут перенесены в Micrometer и все соответствующие проекты (больше не все инструменты будут храниться в одном репозитории).

Я хочу реализовать распределенную трассировку. Мне нужны TraceId и SpaId.

Для трассировки я использовал следующие зависимости:

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core-micrometer</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

Конфигурация Кафки

@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaConfiguration {
    private final KafkaProperties kafkaProperties;
    private final KafkaTopicProperties topicProperties;

    @Bean
    public <K, V> ReactiveKafkaProducerTemplate<K, V> reactiveKafkaProducerTemplate(ObservationRegistry registry, PropagatingSenderTracingObservationHandler<?> handler) { // Generic Producer, If needed , create custom producer for specific object.
        registry.observationConfig().observationHandler(handler);

        var properties = kafkaProperties.buildProducerProperties(new DefaultSslBundleRegistry());
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getBootstrapServers()));

        var senderOptions = SenderOptions.<K, V>create(properties)
                .withObservation(registry, new KafkaSenderObservation.DefaultKafkaSenderObservationConvention());
        return new ReactiveKafkaProducerTemplate<>(senderOptions);
    }

    @Bean(name = "otpSendReactiveKafkaConsumer")
    public ReactiveKafkaConsumerTemplate<String, OtpSendRequestTransferObject> otpSendReactiveKafkaConsumer(ObservationRegistry registry, PropagatingReceiverTracingObservationHandler<?> handler) {
        registry.observationConfig()
                .observationHandler(handler);
        return new ReactiveKafkaConsumerTemplate<>(createReceiverOptions(List.of(topicProperties.getOtp().getTopic()), registry));
    }


    @Bean(name = "smsSendReactiveKafkaConsumer")
    public ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer(ObservationRegistry registry, PropagatingReceiverTracingObservationHandler<?> handler) {
        registry.observationConfig()
                .observationHandler(handler);
        return new ReactiveKafkaConsumerTemplate<>(createReceiverOptions(List.of(topicProperties.getSms().getTopic()), registry));
    }

    private <K, V> ReceiverOptions<K, V> createReceiverOptions(List<String> topics, ObservationRegistry registry) {

        var properties = kafkaProperties.buildConsumerProperties(new DefaultSslBundleRegistry());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.join(",", kafkaProperties.getBootstrapServers()));

        return ReceiverOptions.<K, V>create(properties)
                .withObservation(registry, new KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention())
                .subscription(topics);

    }

}

Кафка Продюсер

@Service
@Slf4j
@RequiredArgsConstructor
public class SendSMSReactiveKafkaProducer {
    private final KafkaTopicProperties kafkaTopicProperties;
    private final ReactiveKafkaProducerTemplate<String, SmsSendRequestTransferObject> kafkaProducerTemplate;

    public Mono<CommonResult> produce(SmsSendRequest request) {
        var topic = kafkaTopicProperties.getSms().getTopic();

        var kafkaHeaders = new RecordHeaders();
        var message = SmsSendRequestTransferObject.builder()
                .content(request.getContent())
                .receiver(request.getReceiver())
                .build();
        var record = new ProducerRecord<>(topic, null, UUID.randomUUID().toString(), message, kafkaHeaders);

        return kafkaProducerTemplate.send(record)
                .doOnError(ex -> log.error("Failed to send record = {} to topic = {} failed.", record, topic, ex))
                .doOnSuccess(result -> log.debug("Record is successfully sent to topic = {}, metadata = {}", topic, result.recordMetadata()))
                .map(result -> {
                    var hasResult = HasResult.SUCCESS;
                    if (result.exception() != null) {
                        hasResult = HasResult.UNKNOWN_ERROR;
                    }
                    return new CommonResult(hasResult);
                });
    }
}

Кафка Потребитель


@Service
@Slf4j
public class SendSMSReactiveKafkaConsumer extends AbstractBaseReactiveKafkaConsumer<String, SmsSendRequestTransferObject> {
    private final RetryBackoffSpec retryBackoffSpec;
    private final SmsMessageSender smsMessageSender;
    private final ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer;

    public SendSMSReactiveKafkaConsumer(ReactiveKafkaConsumerTemplate<String, SmsSendRequestTransferObject> smsSendReactiveKafkaConsumer,
                                        ReactiveKafkaProducerTemplate<String, ErrorRecord<SmsSendRequestTransferObject>> dlqProducerTemplate,
                                        SmsMessageSender smsMessageSender, KafkaTopicProperties topicProperties,
                                        SmsSettingProperties smsSettingProperties) {
        super(topicProperties.getSms().getDlq(), dlqProducerTemplate);
        var params = smsSettingProperties.getRetryBackOffSpec().getKafka();
        this.smsMessageSender = smsMessageSender;
        this.smsSendReactiveKafkaConsumer = smsSendReactiveKafkaConsumer;
        this.retryBackoffSpec = RetryBackoffSpecUtil.createRetryBackoffSpec(params.getMaxAttempts(), params.getBackOffPeriod());
    }

    @PostConstruct
    public void init() {
        consume()
                .onErrorContinue((throwable, o) -> log.error("Error while initializing Kafka consumer.", throwable))
                .subscribe();
    }

    @Override
    public Flux<Void> consume() {
        return smsSendReactiveKafkaConsumer.receiveAtMostOnce() // Be careful to use receive types. receiveAtMostOnce - receives message and commits immediately , if failure occurs message will not be redelivered, but it guarantees message commit.
                .onErrorResume(Throwable.class, ex -> {
                    log.error("Exception on receiving message from Kafka.", ex);
                    return Mono.empty();  // Continue processing the next message
                })
                .flatMap(message -> {
                            return Mono.just(message)
                                    .doOnNext(this::log)
                                    .flatMap(record -> smsMessageSender.send(record.value(), retryBackoffSpec))
                                    .then()
                                    .onErrorResume(Throwable.class, ex -> {
                                        log.error("Error during message processing.", ex);
                                        return sendToDLQ(message, ex); // Send message to DLQ topic
                                    });
                        }
                )
                .doOnTerminate(() -> log.error("The subscription was terminated. Either it was cancelled or completed successfully."))
                .subscribeOn(Schedulers.boundedElastic());
    }


}

Настроил файл application.yaml следующим образом:

logging:
  pattern:
    level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]"

Он работает хорошо, но когда я публикую запрос в теме Kafka, я теряю текущие TraceId и SpaId.

Я также настроил WebFlux, например Hooks.enableAutomaticContextPropagation(); При получении запроса от конечной точки трассировка работает и генерируются TraceId и SpaId. В разных контекстах он также сохраняет текущий след.

Проблема в том, что я потерял свои TraceId и SpaId после публикации сообщения в Kafka. Как настроить реактивное распространение контекста для Kafka.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
1
0
249
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вам необходимо явно настроить наблюдение для SenderOptions. Spring Boot не настраивается автоматически, поскольку для Reactor Kafka просто нет автоматической настройки.

Дополнительную информацию смотрите в документации: https://projectreactor.io/docs/kafka/release/reference/

И найдите 5.5. Micrometer Observation.

И вы также должны использовать зависимость io.micrometer:micrometer-observation.

Я отредактировал тело вопроса. Я уже настроил SenderOptions и ReceiverOptions. Где я делаю ошибку?

Nasibulloh Yandashev 19.04.2024 14:32

Вы, вероятно, говорите об отслеживании на стороне потребителя, поэтому вам необходимо следовать рекомендациям этого документа: KafkaReceiverObservation.RECEIVER_OBSERVATION API must be used manually in the record processing operator:. Это .withObservation(registry, new KafkaReceiverObservation.DefaultKafkaReceiverObservationConv‌​ention()) очень ограничено только для потребительских внутренних устройств. Пользовательский код не участвует в этом наблюдении из-за характера распространения контекста Reactor.

Artem Bilan 19.04.2024 15:31

Дополнительную информацию смотрите в моем примере: github.com/artembilan/sandbox/tree/master/…

Artem Bilan 19.04.2024 15:33
github.com/artembilan/sandbox/tree/master/… Действительно помогло, видел решение в документации, но не пробовал, не обращал внимания.
Nasibulloh Yandashev 20.04.2024 11:35

Другие вопросы по теме