Трассировка в Spring Integration (Spring boot 3.0.9)

Я пытаюсь использовать интеграцию Spring: получаю сообщение через JMS и отправляю его в Кафку:

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(mqInQueue))
                .log(LoggingHandler.Level.DEBUG, message -> "Received JMS message: " + message.getPayload())
                .channel(channels -> MessageChannelFactory.create(channels, "request-channel-1"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(kafkaOutTopic));
    }

Для проверок я отправляю сообщения в MQ, используя запрос POST + JmsTemplate:

    @PostMapping("/mq")
    public String sentToMq(@RequestBody final String body) {
        jmsTemplate.convertAndSend(mqRequestQueue, body, m -> {
            final var span = tracer.startScopedSpan("jms-send");
            try {
                final var context = span.context();
                m.setStringProperty("b3", "%s-%s-%s".formatted(context.traceId(), context.spanId(), Boolean.TRUE.equals(context.sampled()) ? "1" : "0"));
            } finally {
                span.end();
            }

            return m;
        });
        return "done";
    }

Все работает нормально, кроме трассировки. Я устанавливаю заголовок b3 вручную перед отправкой, но после получения Spring Integration отменяет его. Что мне следует сделать, чтобы сохранить входящий идентификатор трассировки?

Также у меня есть следующая конфигурация:

  • @EnableIntegrationManagement(observationPatterns = "*")
  •   @Bean
      @GlobalChannelInterceptor(order = Ordered.HIGHEST_PRECEDENCE)
      public ChannelInterceptor observationPropagationChannelInterceptor(final ObservationRegistry observationRegistry) {
          return new ObservationPropagationChannelInterceptor(observationRegistry);
      }
    

Есть ли у вас шанс получить от вас простой проект, который можно воспроизвести и поиграть? Спасибо

Artem Bilan 14.09.2023 21:02
1
1
59
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

ХОРОШО. Я знаю, в чем проблема. JmsMessageDrivenEndpoint не передает свой registerObservationRegistry(ObservationRegistry) до ChannelPublishingJmsMessageListener, поэтому IntegrationObservation.HANDLER не восстанавливает след из этого b3 заголовка.

Наблюдение за MessageChannel консультируется только с текущим контекстом и действительно заполняет новый заголовок b3 в соответствии с этим текущим контекстом.

Вместо этого рассмотрите возможность использования Jms.messageDrivenChannelAdapter(AbstractMessageListenerContainer) и AbstractMessageListenerContainer.setObservationRegistry(), чтобы включить JmsObservationDocumentation.JMS_MESSAGE_PROCESS отслеживание потребителей.

Для JmsMessageDrivenEndpoint, пожалуйста, поднимите проблему GH в Spring Integration.

Вот исправление, о котором я упоминал ранее, которое необходимо было сделать на стороне Spring Integration: github.com/spring-projects/spring-integration/pull/8740

Artem Bilan 18.09.2023 22:16

Другие вопросы по теме

Переконфигурируйте компоненты Spring Integration во время выполнения
Как настроить TransactionInterceptor, эквивалентный аннотации @Transactional, для использования с советом PollerMetadata
Как FileLock работает в сети? Будет ли он блокировать только локальный файл в монтировании или блокировка «распространится» на файл на другом компьютере?
Spring-Integration: как проверить ответ http в AbstractRequestHandlerAdvice с помощью webflux?
Spring Integration: определите последовательную обработку сообщений с помощью Java DSL
Как исправить «весенняя интеграция - это односторонний« MessageHandler », и не следует настраивать исключение «outputChannel»»?
Как я могу правильно управлять потоком в subFlowMapping
Как приостановить исходный класс потока облачных данных Spring от отправки данных в kafka?
Как вызвать операции исходящего шлюза SFTP в конфигурации из компонента в Spring
Запуск потока интеграции Spring одновременно для каждого файла Ftp