Я пытаюсь разработать структуру конвейера поверх Reactor.
На каждом этапе (не считая первого и последнего) у нас есть задачи, которые преобразуют объект (то есть строку в ее длину или URL-адрес в ее HTML-контент и т. д.). Вот пример:
Вы можете видеть, что средний слой имеет 3 задачи, и каждая задача преобразует объект X в объект Y (кстати, это всегда полностью связанные слои).
Мой вопрос/дилемма:
Моей первой мыслью было, что все, что мне нужно, это Flux.merge(), а затем подключить его к каждому подписчику. Например:
Flux<X> source = Flux.merge(x1Flux, x2Flux)
source.subscribe(y1Subscriber)
source.subscribe(y2Subscriber)
Другой вариант — поставить процессор (TopicProcessor?), который будет действовать как промежуточное ПО (как в шаблоне pub-sub).
Мне не хватает понимания того, какое решение лучше всего подходит для моей проблемы. Логически это то же самое, но каковы практические последствия каждой архитектуры?
Спасибо!




Мой общий подход здесь состоит в том, чтобы использовать ConnectableFlux, чтобы отложить публикацию до тех пор, пока у вас не будет настроен весь конвейер, а затем вызывать connect() для каждого потока после того, как вы настроили конвейер.
Вы, мог, используете процессор, но я бы посоветовал по возможности избегать этого.
Общий смысл (не проверенный на синтаксис) будет примерно таким:
ConnectableFlux<String> x1 = Flux.just("x1").publish();
ConnectableFlux<String> x2 = Flux.just("x2").publish();
ConnectableFlux<String> y1 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y2 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y3 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> z3 = Flux.<String>from(Flux.merge(y1, y2, y3)).publish();
x1.connect();
x2.connect();
y1.connect();
//...etc.
Также обратите внимание, что вы можете использовать concat() или mergeSequential(), а не merge(), в зависимости от вашего варианта использования (merge() будет охотно подписываться на издателей, concat() — нет, а mergeSequential() будет объединяться в порядке получения, потенциально чередуя значения).
@yaseco Верно. concat() будет ждать завершения первого потока, прежде чем перейти к следующему, поэтому в случае бесконечных потоков он никогда не завершится.
эта часть Flux.from(...::subscribe) кажется подозрительной (обычно это запах кода, чтобы подписаться в середине такой реактивной цепочки). Почему вы включили его вместо простого Flux.merge(x1, x2).publish())?
Спасибо большое Михаил! На самом деле, это должен быть неопределенный процесс, поэтому я думаю, что нужно
mergeилиmergeSequential, верно?