Spring boot + webflux: контекст теряется при параллельном выполнении некоторых шагов

Весенняя загрузка: 2.1.3.RELEASE

Привет,

Я пытаюсь использовать функцию контекста Spring WebFlux для переноса простой переменной. У меня есть WebFilter, устанавливающий контекст с такой переменной, и я пытаюсь использовать его в своем контроллере на разных этапах моего потока/потока. В какой-то момент я теряю его после вызова метода «parallel()» класса Flux.

  • Фильтр:
public class TestFilter implements WebFilter {

    private Logger LOG = LoggerFactory.getLogger(TestFilter.class);

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain.filter(exchange)
            .doOnEach(voidSignal -> System.out.println("filter:"+voidSignal.getContext().getOrEmpty("blob"))).subscriberContext(Context.of("blob", "kapoue"));
    }

}
  • Контроллер:
@RestController
@RequestMapping(TestControllerWebFlux.ROOT)
public class TestControllerWebFlux {

    static final String ROOT = "/flux";
    static final String TEST = "/test";

    private WebClient webClient = WebClient.create();

    @GetMapping(
            value = TEST,
            produces = {MediaType.APPLICATION_JSON_VALUE})
    public Mono<String> test() {
        System.out.println("controller1:"+Thread.currentThread());

        Flux<String> call = webClient.get().uri("http://localhost:" + 8080 + ROOT + "/test2").retrieve().bodyToFlux(Result.class).map(Result::getValue);

        return call.map(s -> s+"0")
            .doOnEach(stringSignal -> System.out.println("controller2:"+stringSignal.getContext().getOrEmpty("blob")))
            .parallel()
            .doOnEach(stringSignal -> System.out.println("controller3:"+stringSignal.getContext().getOrEmpty("blob")))
            .map(s -> s+"0")
            .doOnEach(stringSignal -> System.out.println("controller4:"+stringSignal.getContext().getOrEmpty("blob")))
            .reduce((s, s2) -> s+s2)
            .doOnEach(stringSignal -> System.out.println("controller5:"+stringSignal.getContext().getOrEmpty("blob")))
            .map(s -> {
                System.out.println("controller6:"+Thread.currentThread());
                return s;
            });
    }

    @GetMapping(
        value = "test2",
        produces = {MediaType.APPLICATION_JSON_VALUE})
    public Flux<Result> test2() {
        return Flux.just(new Result("0"), new Result("0"), new Result("0"));
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Result {
        private String value;
    }
}

Все, что я делаю, это вызываю конечную точку http://локальный:8080/флюс/тест/, и я получаю это:

controller1:Thread[reactor-http-nio-2,5,main] controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller2:Optional[kapoue] controller3:Optional.empty controller4:Optional.empty controller3:Optional.empty controller4:Optional.empty controller3:Optional.empty controller4:Optional.empty controller3:Optional.empty controller4:Optional.empty controller5:Optional[kapoue] controller6:Thread[reactor-http-nio-2,5,main] filter:Optional[kapoue]

Как видите, контекст теряется сразу после «параллельного» метода и каким-то образом возвращается после редукции.

Это ошибка или я не должен пытаться запускать вещи параллельно после вызова, такого как этот?

Заранее спасибо за помощь.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
4
0
1 129
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Это похоже на ошибку в Reactor. Я сообщил об этом: https://github.com/reactor/reactor-core/issues/1656

... и исправлено в github.com/reactor/reactor-core/pull/1657. Контекст не был потерян, просто .doOnEach не устанавливался на Signal объекты, которые вы получаете.

bsideup 11.04.2019 11:03

Другие вопросы по теме