У меня есть блокирующая рабочая нагрузка, которую я хочу выполнить в ограниченном эластичном планировщике. После того, как эта работа будет выполнена, следует много работы, которую можно было бы выполнить в параллельном планировщике, но она будет автоматически продолжать выполняться в потоке из планировщика с ограниченной эластичностью.
Когда «правильно» отказаться от предыдущего планировщика, который вы установили ранее в цепочке? Есть ли когда-либо причина делать это, если это не является строго необходимым, например, из-за голодания потока?
Я могу переключить планировщик цепочки, «разорвав» существующую цепочку с помощью flatMap
, then
, switchIfEmpty
и, возможно, еще нескольких методов. Пример:
public Mono<Void> mulpleSchedulers() {
return blockingWorkload()
.doOnSuccess(result -> log.info("Thread {}", Thread.currentThread().getName())) // Thread boundedElastic-1
.subscribeOn(Schedulers.boundedElastic())
.flatMap(result -> Mono.just(result)
.subscribeOn(Schedulers.parallel()))
.doOnSuccess(result -> log.info("Thread {}", Thread.currentThread().getName())); // Thread parallel-1
}
Я не знал, что это альтернатива. Ясно, что это гораздо более эргономичный подход, чем мой пример. Мой вопрос был отчасти вызван тем, как странно было «разорвать» цепочку только для того, чтобы изменить рабочий поток, но поскольку publishOn явно предназначен для достижения того же самого, я думаю, у меня нет причин не просто переключиться на самый соответствующий планировщик.
конечно. Неплохая практика переключать расписания по всей вашей реактивной цепочке, но вы должны четко знать причины этого.
Как правило, неплохо переключить выполнение на другой планировщик по всей вашей реактивной цепочке.
Чтобы переключить выполнение на другой планировщик в середине вашей цепочки, вы можете использовать оператор publishOn()
. Затем любой последующий вызов оператора будет выполняться на поставленном планировщике для такого publishOn()
.
Однако вы должны четко знать, зачем вы это делаете. Есть ли для этого причина?
Если вы хотите запустить какой-то длительный вычислительный процесс (некоторая работа, связанная с процессором), то рекомендуется выполнить его на Schedulers.parallel()
Для блокировки звонков рекомендуется использовать Schedulers.boundedElastic()
Однако обычно для блокировки вызовов мы используем subscribeOn(Schedulers.boundedElastic())
непосредственно для «блокирующего издателя».
Оборачиваем блокирующие звонки в реактор
почему бы просто не использовать publishOn()?