Использование WebClient для распространения заголовков запросов, полученных в приложениях Spring Webflux, на нижестоящие службы

У меня есть два типа приложений Webflux: на основе аннотаций и на основе маршрутов. Эти приложения вызываются с набором заголовков, некоторые из которых (Open Tracing) мне нужно распространять в нисходящих вызовах с помощью WebClient.

Если бы это были обычные приложения Spring WebMvc, я бы использовал фильтр, чтобы сохранить выбранные заголовки в ThreadLocal, получить к ним доступ в перехватчике RestTemplate, чтобы отправить их последующим службам, и очистить ThreadLocal.

Как правильно воспроизвести это поведение в приложениях WebFlux?

6
0
5 373
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я решил это, используя контекст Project Reactor для хранения заголовков в WebFilter. Затем они получаются в ExchangeFilterFunction WebClient. Вот полное решение:

Веб-фильтр

class OpenTracingFilter(private val openTracingHeaders: Set<String>) : WebFilter {

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {

        return chain.filter(exchange)
                .subscriberContext { ctx ->
                    var updatedContext = ctx
                    exchange.request.headers.forEach {
                        if (openTracingHeaders.contains(it.key.toLowerCase())) {
                            logger.debug("Found OpenTracing Header - key {} - value {}", it.key, it.value[0])
                            updatedContext = updatedContext.put(it.key, it.value[0])
                        }
                    }
                    updatedContext
                }
    }
}

OpenTracingExchangeFilterFunction

class OpenTracingExchangeFilterFunction(private val headers: Set<String>) : ExchangeFilterFunction {

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {

        logger.debug("OpenTracingExchangeFilterFunction - filter()")
        return OpenTracingClientResponseMono(request, next, headers)
    }
}

Опентракингклиентреспонсемоно

class OpenTracingClientResponseMono(private val request: ClientRequest,
                                    private val next: ExchangeFunction,
                                    private val headersToPropagate: Set<String>) : Mono<ClientResponse>() {

    private val logger = LoggerFactory.getLogger(javaClass)

    override fun subscribe(subscriber: CoreSubscriber<in ClientResponse>) {
        val context = subscriber.currentContext()

        val requestBuilder = ClientRequest.from(request)
        requestBuilder.headers { httpHeaders ->
            headersToPropagate.forEach {
                if (context.hasKey(it)) {
                    logger.debug("Propagating header key {} - value{}", it, context.get<String>(it))
                    httpHeaders[it] = context.get<String>(it)
                }
            }
        }
        val mutatedRequest = requestBuilder.build()
        next.exchange(mutatedRequest).subscribe(subscriber)
    }


}

Конфигурация OpenTracing

@Configuration
class OpenTracingConfiguration(private val openTracingConfigurationProperties: OpenTracingConfigurationProperties) {

    @Bean
    fun webClient(): WebClient {
        return WebClient.builder().filter(openTracingExchangeFilterFunction()).build()
    }

    @Bean
    fun openTracingFilter(): WebFilter {
        return OpenTracingFilter(openTracingConfigurationProperties.headers)
    }

    @Bean
    fun openTracingExchangeFilterFunction(): OpenTracingExchangeFilterFunction {
        return OpenTracingExchangeFilterFunction(openTracingConfigurationProperties.headers)
    }
}

OpenTracingConfigurationProperties

@Configuration
@ConfigurationProperties("opentracing")
class OpenTracingConfigurationProperties {

    lateinit var headers: Set<String>

}

приложение.yml

opentracing:
  headers:
    - x-request-id
    - x-b3-traceid
    - x-b3-spanid
    - x-b3-parentspanid
    - x-b3-sampled
    - x-b3-flags
    - x-ot-span-context

Привет. Спасибо за ответ, но я не понимаю класс XxxMono, что он реализует?

WesternGun 23.09.2020 18:14

Какой класс XxxMono?

codependent 23.09.2020 18:17

Извините, я имею в виду OpenTracingClientResponseMono. Какие методы он должен реализовать, чтобы получить доступ к контексту?

WesternGun 23.09.2020 18:18

Целью этого класса является доступ к текущему контексту, чтобы найти заголовки из исходного запроса и распространить их, изменяя запрос WebClient.

codependent 23.09.2020 18:22

Он должен расширять Mono<ClientResponse>() и реализовывать subscribe(subscriber: CoreSubscriber<in ClientResponse>) -> projectreactor.io/docs/core/release/api/reactor/core/publish‌​er/…

codependent 23.09.2020 18:23

Спасибо! Теперь я понимаю.

WesternGun 24.09.2020 12:24

Другие вопросы по теме