Проблема со следующим кодом заключается в том, что подписчик видит только элементы первого потока (т.е. только печать 1). Интересно, если я добавлю delayElements, он отлично работает.
Это игрушечный пример, но я намерен заменить его на Flux, которые делают запросы HTTP GET и выдают их результаты (также их может быть больше двух).
Итак, переформулируя мой вопрос, у меня есть отношение много к одному, которое необходимо реализовать. Как это реализовать, учитывая мой случай? Не могли бы вы использовать какой-то процессор?
public static void main(String[] args) throws Exception {
Flux<Integer> flux1 = Flux.generate(emitter -> {
emitter.next(1);
});
Flux<Integer> flux2 = Flux.generate(emitter -> {
emitter.next(2);
});
Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
Попытка достичь той же идеи с помощью TopicProcessor, но она страдает от той же проблемы:
public static void main(String[] args) throws Exception {
Flux<Integer> flux1 = Flux.generate(emitter -> {
emitter.next(1);
try {
Thread.sleep(100);
} catch (Exception e) {}
});
Flux<Integer> flux2 = Flux.generate(emitter -> {
emitter.next(2);
try {
Thread.sleep(100);
} catch (Exception e) {}
});
TopicProcessor<Integer> processor = TopicProcessor.create();
flux1.subscribe(processor);
flux2.subscribe(processor);
processor.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}




Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
Вы создаете бесконечный источник здесь без специального планировщика, поэтому он пытается полностью опустошить этот источник перед слиянием - и поэтому у вас есть проблема.
Это может не быть проблемой в вашем реальном случае использования, поскольку результат запроса GET, по-видимому, не будет бесконечным. Однако, если вы хотите убедиться, что результаты чередуются независимо, вам просто нужно убедиться, что вы настроили каждый поток с собственным планировщиком (вызвав subscribeOn(Schedulers.elastic()); для каждого потока).
Итак, ваш пример становится:
Flux<Integer> flux1 = Flux.<Integer>generate(emitter -> emitter.next(1))
.subscribeOn(Schedulers.elastic());
Flux<Integer> flux2 = Flux.<Integer>generate(emitter -> emitter.next(2))
.subscribeOn(Schedulers.elastic());
Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
"Поток.currentThread().join();" предполагает, что какой-то другой поток прервет текущий поток. Что это еще за нить?