Используйте PriorityBlockingQueue для создания шаблона производитель-потребитель в Java Reactor

В моем проекте есть планировщик Spring, который периодически сканирует задачи «TO BE DONE» из БД, а затем распределяет их по потребителю задач для последующей обработки. Итак, текущая реализация заключается в создании Reactor Sinks между производителем и потребителем.

Sinks.Many<Task> taskSink = Sinks.many().multicast().onBackpressureBuffer(1000, false);

Режиссер:

Flux<Date> dates = loadDates();
dates.filterWhen(...)
     .concatMap(date -> taskManager.getTaskByDate(date))
     .doOnNext(taskSink::tryEmitNext)
     .subscribe();

Потребитель:

taskProcessor.process(taskSink.asFlux())
             .subscribeOn(Schedulers.boundedElastic())
             .subscribe();

Используя Sink, он отлично работает в большинстве случаев. Но когда система находится под большой нагрузкой, специалист по обслуживанию системы захочет знать:

  1. Сколько задач все еще находится в раковине?
  2. Если возможно очистить все задачи в Sink.
  3. Если можно расставить приоритеты задач в Sink.

К сожалению, Sink не может удовлетворить все потребности, упомянутые выше. Итак, я создал класс-оболочку, который включает в себя Map и PriorityBlockingQueue. Я повторил реализацию по этой ссылке https://stackoverflow.com/a/71009712/19278017.

После этого исходный код производителя-потребителя был изменен следующим образом:

Очередь задач:

MergingQueue<Task> taskQueue = new PriorityMergingQueue();

Режиссер:

Flux<Date> dates = loadDates();
dates.filterWhen(...)
     .concatMap(date -> taskManager.getTaskByDate(date))
     .doOnNext(taskQueue::enqueue)
     .subscribe();

Потребитель:

taskProcessor.process(Flux.create((sink) -> {
     sink.onRequest(n -> {
          Task task;
          try {
                while(!sink.isCancel() && n > 0) {
                    if (task = taskQueue.poll(1, TimeUnit.SECOND)  != null) {
                    sink.next(task);
                    n--;
                }
          } catch() {
                ....
          })
          .subscribeOn(Schedulers.boundedElastic())
          .subscribe();

У меня есть несколько вопросов, как показано ниже:

  • Будет ли это проблемой для кода, выполняющего .poll()? Так как я столкнулся с проблемой зависания потока во время тестирования долговечности. Просто не уверен, что это связано с вызовом poll().
  • Есть ли альтернативное решение в Reactor, которое работает как PriorityBlockingQueue?
LeetCode запись решения 2536. Увеличение подматриц на единицу
LeetCode запись решения 2536. Увеличение подматриц на единицу
Увеличение подматриц на единицу - LeetCode
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
Как включить TLS в gRPC-клиенте и сервере : 2
Как включить TLS в gRPC-клиенте и сервере : 2
Здравствуйте! 🙏🏻 Надеюсь, у вас все хорошо и добро пожаловать в мой блог.
Сортировка hashmap по значениям
Сортировка hashmap по значениям
На Leetcode я решал задачу с хэшмапой и подумал, что мне нужно отсортировать хэшмапу по значениям.
1
0
69
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Цель реактивного программирования — избежать блокирующих операций. PriorityBlockingQueue.poll() вызовет проблемы, так как заблокирует поток в ожидании следующего элемента.

Однако в Reactor есть альтернативное решение: одноадресная версия Sinks.Many позволяет использовать произвольную очередь для буферизации с помощью Sinks.many().unicast().onBackPressureBuffer(Queue<T>). Используя экземпляр PriorityQueue вне Sink, вы можете выполнить все три требования.

Вот короткая демонстрация, в которой я запускаю задачу каждые 100 мс:

public record Task(int prio) {}

private static void log(Object message) {
    System.out.println(LocalTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.MILLIS) + ": " + message);
}

public void externalBufferDemo() throws InterruptedException {
    Queue<Task> taskQueue = new PriorityQueue<>(Comparator.comparingInt(Task::prio).reversed());
    Sinks.Many<Task> taskSink = Sinks.many().unicast().onBackpressureBuffer(taskQueue);

    taskSink.asFlux()
            .delayElements(Duration.ofMillis(100))
            .subscribe(task -> log(task));

    for (int i = 0; i < 10; i++) {
        taskSink.tryEmitNext(new Task(i));
    }
    // Show amount of tasks sitting in the Sink:
    log("Nr of tasks in sink: " + taskQueue.size());

    // Clear all tasks in the sink after 350ms:
    Thread.sleep(350);
    taskQueue.clear();
    log("Nr of tasks after clear: " + taskQueue.size());

    Thread.sleep(1500);
}

Вывод:

09:41:11.347: Nr of tasks in sink: 9
09:41:11.450: Task[prio=0]
09:41:11.577: Task[prio=9]
09:41:11.687: Task[prio=8]
09:41:11.705: Nr of tasks after clear: 0
09:41:11.799: Task[prio=7]

Обратите внимание, что delayElements имеет внутреннюю очередь размером 1, поэтому задача 0 была выбрана до того, как была запущена задача 1, и почему задача 7 была выбрана после очистки.

Если требуется многоадресная рассылка, вы можете преобразовать свой поток, используя один из многих операторов, обеспечивающих многоадресную рассылку.

Другие вопросы по теме