Project Reactor: несколько издателей, выполняющих HTTP-запросы, и один подписчик для обработки всех результатов

Проблема со следующим кодом заключается в том, что подписчик видит только элементы первого потока (т.е. только печать 1). Интересно, если я добавлю delayElements, он отлично работает.

Это игрушечный пример, но я намерен заменить его на Flux, которые делают запросы HTTP GET и выдают их результаты (также их может быть больше двух).

Итак, переформулируя мой вопрос, у меня есть отношение много к одному, которое необходимо реализовать. Как это реализовать, учитывая мой случай? Не могли бы вы использовать какой-то процессор?

 public static void main(String[] args) throws Exception {
    Flux<Integer> flux1 = Flux.generate(emitter -> {
        emitter.next(1);
    });

    Flux<Integer> flux2 = Flux.generate(emitter -> {
        emitter.next(2);

    });

    Flux<Integer> merged = flux1.mergeWith(flux2);
    merged.subscribe(s -> System.out.println(s));

    Thread.currentThread().join();
}

Попытка достичь той же идеи с помощью TopicProcessor, но она страдает от той же проблемы:

public static void main(String[] args) throws Exception {
    Flux<Integer> flux1 = Flux.generate(emitter -> {
        emitter.next(1);
        try {
            Thread.sleep(100);
        } catch (Exception e) {}
    });

    Flux<Integer> flux2 = Flux.generate(emitter -> {
        emitter.next(2);
        try {
            Thread.sleep(100);
        } catch (Exception e) {}
    });

    TopicProcessor<Integer> processor = TopicProcessor.create();
    flux1.subscribe(processor);
    flux2.subscribe(processor);

    processor.subscribe(s -> System.out.println(s));


    Thread.currentThread().join();
}

"Поток.currentThread().join();" предполагает, что какой-то другой поток прервет текущий поток. Что это еще за нить?

Alexei Kaigorodov 23.07.2019 13:11
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
1
549
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Из документов:

Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

Вы создаете бесконечный источник здесь без специального планировщика, поэтому он пытается полностью опустошить этот источник перед слиянием - и поэтому у вас есть проблема.

Это может не быть проблемой в вашем реальном случае использования, поскольку результат запроса GET, по-видимому, не будет бесконечным. Однако, если вы хотите убедиться, что результаты чередуются независимо, вам просто нужно убедиться, что вы настроили каждый поток с собственным планировщиком (вызвав subscribeOn(Schedulers.elastic()); для каждого потока).

Итак, ваш пример становится:

Flux<Integer> flux1 = Flux.<Integer>generate(emitter -> emitter.next(1))
        .subscribeOn(Schedulers.elastic());

Flux<Integer> flux2 = Flux.<Integer>generate(emitter -> emitter.next(2))
        .subscribeOn(Schedulers.elastic());

Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));

Thread.currentThread().join();

Другие вопросы по теме