Project Reactor: нужен ли мне процессор?

Я пытаюсь разработать структуру конвейера поверх Reactor.

На каждом этапе (не считая первого и последнего) у нас есть задачи, которые преобразуют объект (то есть строку в ее длину или URL-адрес в ее HTML-контент и т. д.). Вот пример:

Project Reactor: нужен ли мне процессор?

Вы можете видеть, что средний слой имеет 3 задачи, и каждая задача преобразует объект X в объект Y (кстати, это всегда полностью связанные слои).

Мой вопрос/дилемма: Моей первой мыслью было, что все, что мне нужно, это Flux.merge(), а затем подключить его к каждому подписчику. Например:

Flux<X> source = Flux.merge(x1Flux, x2Flux)  
source.subscribe(y1Subscriber)
source.subscribe(y2Subscriber)

Другой вариант — поставить процессор (TopicProcessor?), который будет действовать как промежуточное ПО (как в шаблоне pub-sub).

Мне не хватает понимания того, какое решение лучше всего подходит для моей проблемы. Логически это то же самое, но каковы практические последствия каждой архитектуры?

Спасибо!

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
870
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Мой общий подход здесь состоит в том, чтобы использовать ConnectableFlux, чтобы отложить публикацию до тех пор, пока у вас не будет настроен весь конвейер, а затем вызывать connect() для каждого потока после того, как вы настроили конвейер.

Вы, мог, используете процессор, но я бы посоветовал по возможности избегать этого.

Общий смысл (не проверенный на синтаксис) будет примерно таким:

ConnectableFlux<String> x1 = Flux.just("x1").publish();
ConnectableFlux<String> x2 = Flux.just("x2").publish();

ConnectableFlux<String> y1 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y2 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y3 = Flux.<String>from(Flux.merge(x1, x2)).publish();

ConnectableFlux<String> z3 = Flux.<String>from(Flux.merge(y1, y2, y3)).publish();

x1.connect();
x2.connect();
y1.connect();
//...etc.

Также обратите внимание, что вы можете использовать concat() или mergeSequential(), а не merge(), в зависимости от вашего варианта использования (merge() будет охотно подписываться на издателей, concat() — нет, а mergeSequential() будет объединяться в порядке получения, потенциально чередуя значения).

Спасибо большое Михаил! На самом деле, это должен быть неопределенный процесс, поэтому я думаю, что нужно merge или mergeSequential, верно?

yaseco 24.07.2019 15:37

@yaseco Верно. concat() будет ждать завершения первого потока, прежде чем перейти к следующему, поэтому в случае бесконечных потоков он никогда не завершится.

Michael Berry 24.07.2019 16:06

эта часть Flux.from(...::subscribe) кажется подозрительной (обычно это запах кода, чтобы подписаться в середине такой реактивной цепочки). Почему вы включили его вместо простого Flux.merge(x1, x2).publish())?

Simon Baslé 24.07.2019 18:32

Другие вопросы по теме