Я пытаюсь использовать интеграцию Spring: получаю сообщение через JMS и отправляю его в Кафку:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(mqInQueue))
.log(LoggingHandler.Level.DEBUG, message -> "Received JMS message: " + message.getPayload())
.channel(channels -> MessageChannelFactory.create(channels, "request-channel-1"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(kafkaOutTopic));
}
Для проверок я отправляю сообщения в MQ, используя запрос POST + JmsTemplate:
@PostMapping("/mq")
public String sentToMq(@RequestBody final String body) {
jmsTemplate.convertAndSend(mqRequestQueue, body, m -> {
final var span = tracer.startScopedSpan("jms-send");
try {
final var context = span.context();
m.setStringProperty("b3", "%s-%s-%s".formatted(context.traceId(), context.spanId(), Boolean.TRUE.equals(context.sampled()) ? "1" : "0"));
} finally {
span.end();
}
return m;
});
return "done";
}
Все работает нормально, кроме трассировки. Я устанавливаю заголовок b3
вручную перед отправкой, но после получения Spring Integration отменяет его. Что мне следует сделать, чтобы сохранить входящий идентификатор трассировки?
Также у меня есть следующая конфигурация:
@EnableIntegrationManagement(observationPatterns = "*")
@Bean
@GlobalChannelInterceptor(order = Ordered.HIGHEST_PRECEDENCE)
public ChannelInterceptor observationPropagationChannelInterceptor(final ObservationRegistry observationRegistry) {
return new ObservationPropagationChannelInterceptor(observationRegistry);
}
ХОРОШО. Я знаю, в чем проблема.
JmsMessageDrivenEndpoint
не передает свой registerObservationRegistry(ObservationRegistry)
до ChannelPublishingJmsMessageListener
, поэтому IntegrationObservation.HANDLER
не восстанавливает след из этого b3
заголовка.
Наблюдение за MessageChannel
консультируется только с текущим контекстом и действительно заполняет новый заголовок b3
в соответствии с этим текущим контекстом.
Вместо этого рассмотрите возможность использования Jms.messageDrivenChannelAdapter(AbstractMessageListenerContainer)
и AbstractMessageListenerContainer.setObservationRegistry()
, чтобы включить JmsObservationDocumentation.JMS_MESSAGE_PROCESS
отслеживание потребителей.
Для JmsMessageDrivenEndpoint
, пожалуйста, поднимите проблему GH в Spring Integration.
Вот исправление, о котором я упоминал ранее, которое необходимо было сделать на стороне Spring Integration: github.com/spring-projects/spring-integration/pull/8740
Есть ли у вас шанс получить от вас простой проект, который можно воспроизвести и поиграть? Спасибо