Я некоторое время играл с реактором, но мне все еще нужно что-то получить.
Этот фрагмент кода
Flux.range(1, 1000)
.delayElements(Duration.ofNanos(1))
.map(integer -> integer + 1)
.subscribe(System.out::println);
System.out.println("after");
Возвращает:
after
2
3
4
что ожидается, поскольку документация по подписке гласит: this will immediately return control to the calling thread.
Зачем тогда этот кусок кода:
Flux.range(1, 1000)
.map(integer -> integer + 1)
.subscribe(System.out::println);
возвращается
1
2
...
1000
1001
after
Я никогда не могу понять, когда subscribe
заблокируется или нет, и это очень раздражает при написании пакетов.
Если у кого-то есть ответ, это было бы потрясающе
В вашем фрагменте нет кода блокировки.
В первом примере вы используете .delayElements()
, и он переключает выполнение на другой поток и освобождает ваш основной поток. Таким образом, вы можете сразу увидеть, как ваш System.out.println("after");
выполняется в основном потоке, в то время как реактивная цепочка выполняется в parallel-n
потоках.
Ваш первый пример:
18:49:29.195 [main] INFO com.example.demo.FluxTest - AFTER
18:49:29.199 [parallel-1] INFO com.example.demo.FluxTest - v: 2
18:49:29.201 [parallel-2] INFO com.example.demo.FluxTest - v: 3
18:49:29.202 [parallel-3] INFO com.example.demo.FluxTest - v: 4
18:49:29.203 [parallel-4] INFO com.example.demo.FluxTest - v: 5
18:49:29.205 [parallel-5] INFO com.example.demo.FluxTest - v: 6
Но ваш второй пример не переключает исполняемый поток, поэтому ваша реактивная цепочка выполняется в основном потоке. И после его завершения он продолжает выполнять ваш System.out.println("after");
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 995
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 996
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 997
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 998
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 999
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1000
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1001
18:51:28.491 [main] INFO com.example.demo.FluxTest - AFTER
Обновлено: Если вы хотите переключить поток во втором фрагменте, у вас есть два варианта:
Добавьте subscribeOn(<Scheduler>)
в любом месте вашей реактивной цепочки. Затем весь процесс подписки будет происходить в потоке из предоставленного вами планировщика.
Добавьте publishOn(<Scheduler>)
, например, после Flux.range()
, тогда само излучение будет происходить в вашем вызывающем потоке, но нисходящий поток будет выполняться в потоке из предоставленного вами планировщика
вы не можете вернуться к основному потоку в первом фрагменте, именно так работает delayElements. Обновлен ответ о переключении потоков во втором фрагменте.
Спасибо за ответ! А знаете ли вы, как я могу сделать так, чтобы первый сниппет выполнялся в основном потоке? с
suscribeOn()
может быть?