У меня есть два типа приложений Webflux: на основе аннотаций и на основе маршрутов. Эти приложения вызываются с набором заголовков, некоторые из которых (Open Tracing) мне нужно распространять в нисходящих вызовах с помощью WebClient
.
Если бы это были обычные приложения Spring WebMvc, я бы использовал фильтр, чтобы сохранить выбранные заголовки в ThreadLocal
, получить к ним доступ в перехватчике RestTemplate
, чтобы отправить их последующим службам, и очистить ThreadLocal
.
Как правильно воспроизвести это поведение в приложениях WebFlux?
Я решил это, используя контекст Project Reactor для хранения заголовков в WebFilter. Затем они получаются в ExchangeFilterFunction WebClient. Вот полное решение:
Веб-фильтр
class OpenTracingFilter(private val openTracingHeaders: Set<String>) : WebFilter {
private val logger = LoggerFactory.getLogger(javaClass)
override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
return chain.filter(exchange)
.subscriberContext { ctx ->
var updatedContext = ctx
exchange.request.headers.forEach {
if (openTracingHeaders.contains(it.key.toLowerCase())) {
logger.debug("Found OpenTracing Header - key {} - value {}", it.key, it.value[0])
updatedContext = updatedContext.put(it.key, it.value[0])
}
}
updatedContext
}
}
}
OpenTracingExchangeFilterFunction
class OpenTracingExchangeFilterFunction(private val headers: Set<String>) : ExchangeFilterFunction {
private val logger = LoggerFactory.getLogger(javaClass)
override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
logger.debug("OpenTracingExchangeFilterFunction - filter()")
return OpenTracingClientResponseMono(request, next, headers)
}
}
Опентракингклиентреспонсемоно
class OpenTracingClientResponseMono(private val request: ClientRequest,
private val next: ExchangeFunction,
private val headersToPropagate: Set<String>) : Mono<ClientResponse>() {
private val logger = LoggerFactory.getLogger(javaClass)
override fun subscribe(subscriber: CoreSubscriber<in ClientResponse>) {
val context = subscriber.currentContext()
val requestBuilder = ClientRequest.from(request)
requestBuilder.headers { httpHeaders ->
headersToPropagate.forEach {
if (context.hasKey(it)) {
logger.debug("Propagating header key {} - value{}", it, context.get<String>(it))
httpHeaders[it] = context.get<String>(it)
}
}
}
val mutatedRequest = requestBuilder.build()
next.exchange(mutatedRequest).subscribe(subscriber)
}
}
Конфигурация OpenTracing
@Configuration
class OpenTracingConfiguration(private val openTracingConfigurationProperties: OpenTracingConfigurationProperties) {
@Bean
fun webClient(): WebClient {
return WebClient.builder().filter(openTracingExchangeFilterFunction()).build()
}
@Bean
fun openTracingFilter(): WebFilter {
return OpenTracingFilter(openTracingConfigurationProperties.headers)
}
@Bean
fun openTracingExchangeFilterFunction(): OpenTracingExchangeFilterFunction {
return OpenTracingExchangeFilterFunction(openTracingConfigurationProperties.headers)
}
}
OpenTracingConfigurationProperties
@Configuration
@ConfigurationProperties("opentracing")
class OpenTracingConfigurationProperties {
lateinit var headers: Set<String>
}
приложение.yml
opentracing:
headers:
- x-request-id
- x-b3-traceid
- x-b3-spanid
- x-b3-parentspanid
- x-b3-sampled
- x-b3-flags
- x-ot-span-context
Какой класс XxxMono?
Извините, я имею в виду OpenTracingClientResponseMono
. Какие методы он должен реализовать, чтобы получить доступ к контексту?
Целью этого класса является доступ к текущему контексту, чтобы найти заголовки из исходного запроса и распространить их, изменяя запрос WebClient.
Он должен расширять Mono<ClientResponse>()
и реализовывать subscribe(subscriber: CoreSubscriber<in ClientResponse>)
-> projectreactor.io/docs/core/release/api/reactor/core/publisher/…
Спасибо! Теперь я понимаю.
Привет. Спасибо за ответ, но я не понимаю класс
XxxMono
, что он реализует?