В моем проекте есть планировщик Spring, который периодически сканирует задачи «TO BE DONE» из БД, а затем распределяет их по потребителю задач для последующей обработки. Итак, текущая реализация заключается в создании Reactor Sinks между производителем и потребителем.
Sinks.Many<Task> taskSink = Sinks.many().multicast().onBackpressureBuffer(1000, false);
Режиссер:
Flux<Date> dates = loadDates();
dates.filterWhen(...)
.concatMap(date -> taskManager.getTaskByDate(date))
.doOnNext(taskSink::tryEmitNext)
.subscribe();
Потребитель:
taskProcessor.process(taskSink.asFlux())
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
Используя Sink, он отлично работает в большинстве случаев. Но когда система находится под большой нагрузкой, специалист по обслуживанию системы захочет знать:
К сожалению, Sink не может удовлетворить все потребности, упомянутые выше. Итак, я создал класс-оболочку, который включает в себя Map и PriorityBlockingQueue. Я повторил реализацию по этой ссылке https://stackoverflow.com/a/71009712/19278017.
После этого исходный код производителя-потребителя был изменен следующим образом:
Очередь задач:
MergingQueue<Task> taskQueue = new PriorityMergingQueue();
Режиссер:
Flux<Date> dates = loadDates();
dates.filterWhen(...)
.concatMap(date -> taskManager.getTaskByDate(date))
.doOnNext(taskQueue::enqueue)
.subscribe();
Потребитель:
taskProcessor.process(Flux.create((sink) -> {
sink.onRequest(n -> {
Task task;
try {
while(!sink.isCancel() && n > 0) {
if (task = taskQueue.poll(1, TimeUnit.SECOND) != null) {
sink.next(task);
n--;
}
} catch() {
....
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
У меня есть несколько вопросов, как показано ниже:
Цель реактивного программирования — избежать блокирующих операций. PriorityBlockingQueue.poll() вызовет проблемы, так как заблокирует поток в ожидании следующего элемента.
Однако в Reactor есть альтернативное решение: одноадресная версия Sinks.Many позволяет использовать произвольную очередь для буферизации с помощью Sinks.many().unicast().onBackPressureBuffer(Queue<T>). Используя экземпляр PriorityQueue вне Sink, вы можете выполнить все три требования.
Вот короткая демонстрация, в которой я запускаю задачу каждые 100 мс:
public record Task(int prio) {}
private static void log(Object message) {
System.out.println(LocalTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.MILLIS) + ": " + message);
}
public void externalBufferDemo() throws InterruptedException {
Queue<Task> taskQueue = new PriorityQueue<>(Comparator.comparingInt(Task::prio).reversed());
Sinks.Many<Task> taskSink = Sinks.many().unicast().onBackpressureBuffer(taskQueue);
taskSink.asFlux()
.delayElements(Duration.ofMillis(100))
.subscribe(task -> log(task));
for (int i = 0; i < 10; i++) {
taskSink.tryEmitNext(new Task(i));
}
// Show amount of tasks sitting in the Sink:
log("Nr of tasks in sink: " + taskQueue.size());
// Clear all tasks in the sink after 350ms:
Thread.sleep(350);
taskQueue.clear();
log("Nr of tasks after clear: " + taskQueue.size());
Thread.sleep(1500);
}
Вывод:
09:41:11.347: Nr of tasks in sink: 9
09:41:11.450: Task[prio=0]
09:41:11.577: Task[prio=9]
09:41:11.687: Task[prio=8]
09:41:11.705: Nr of tasks after clear: 0
09:41:11.799: Task[prio=7]
Обратите внимание, что delayElements имеет внутреннюю очередь размером 1, поэтому задача 0 была выбрана до того, как была запущена задача 1, и почему задача 7 была выбрана после очистки.
Если требуется многоадресная рассылка, вы можете преобразовать свой поток, используя один из многих операторов, обеспечивающих многоадресную рассылку.