Реактор Java «подписаться» иногда блокируется, иногда нет

Я некоторое время играл с реактором, но мне все еще нужно что-то получить.

Этот фрагмент кода

    Flux.range(1, 1000)
        .delayElements(Duration.ofNanos(1))
        .map(integer -> integer + 1)
        .subscribe(System.out::println);
    System.out.println("after");

Возвращает:

after
2
3
4

что ожидается, поскольку документация по подписке гласит: this will immediately return control to the calling thread.

Зачем тогда этот кусок кода:

    Flux.range(1, 1000)
        .map(integer -> integer + 1)
        .subscribe(System.out::println);

возвращается

1
2
...
1000
1001
after

Я никогда не могу понять, когда subscribe заблокируется или нет, и это очень раздражает при написании пакетов.

Если у кого-то есть ответ, это было бы потрясающе

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
183
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В вашем фрагменте нет кода блокировки.

В первом примере вы используете .delayElements(), и он переключает выполнение на другой поток и освобождает ваш основной поток. Таким образом, вы можете сразу увидеть, как ваш System.out.println("after"); выполняется в основном потоке, в то время как реактивная цепочка выполняется в parallel-n потоках.

Ваш первый пример:

18:49:29.195 [main] INFO com.example.demo.FluxTest - AFTER
18:49:29.199 [parallel-1] INFO com.example.demo.FluxTest - v: 2
18:49:29.201 [parallel-2] INFO com.example.demo.FluxTest - v: 3
18:49:29.202 [parallel-3] INFO com.example.demo.FluxTest - v: 4
18:49:29.203 [parallel-4] INFO com.example.demo.FluxTest - v: 5
18:49:29.205 [parallel-5] INFO com.example.demo.FluxTest - v: 6

Но ваш второй пример не переключает исполняемый поток, поэтому ваша реактивная цепочка выполняется в основном потоке. И после его завершения он продолжает выполнять ваш System.out.println("after");

18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 995
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 996
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 997
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 998
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 999
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1000
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1001
18:51:28.491 [main] INFO com.example.demo.FluxTest - AFTER

Обновлено: Если вы хотите переключить поток во втором фрагменте, у вас есть два варианта:

  1. Добавьте subscribeOn(<Scheduler>) в любом месте вашей реактивной цепочки. Затем весь процесс подписки будет происходить в потоке из предоставленного вами планировщика.

  2. Добавьте publishOn(<Scheduler>), например, после Flux.range(), тогда само излучение будет происходить в вашем вызывающем потоке, но нисходящий поток будет выполняться в потоке из предоставленного вами планировщика

Спасибо за ответ! А знаете ли вы, как я могу сделать так, чтобы первый сниппет выполнялся в основном потоке? с suscribeOn() может быть?

vandaele mathias 10.01.2023 16:09

вы не можете вернуться к основному потоку в первом фрагменте, именно так работает delayElements. Обновлен ответ о переключении потоков во втором фрагменте.

kerbermeister 10.01.2023 16:21

Другие вопросы по теме