Использование потокового моста для отправки сообщения от одного потребителя другому потребителю

Я видел проблему: StreamBridge не отправляет заголовок подтверждения и соответствующий код, как показано ниже для GH--2563.

if (ObjectUtils.containsElement(consumerBindingNames, destinationName)) { //GH-2563
                    logger.warn("You seem to be sending data to the input binding.  It is not "
                            + "recommended, since you are bypassing the binder and this the messaging system exposed by the binder.");
                }

Мой вариант использования относится к той же категории, но я не уверен, как это сделать правильно. У меня есть свойства ниже в applicationiton.properties

spring.cloud.function.definition=startTransformation;auditLog
spring.cloud.stream.function.bindings.startTransformation-in-0=transformer
spring.cloud.stream.bindings.transformer.group=transformer-group
spring.cloud.stream.bindings.transformer.consumer.concurrency=3
spring.cloud.stream.bindings.transformer.consumer.maxAttempts = 1

spring.cloud.stream.function.bindings.auditLog-in-0=audit
spring.cloud.stream.bindings.audit.group=audit-group
spring.cloud.stream.bindings.audit.consumer.concurrency=1
spring.cloud.stream.bindings.audit.consumer.maxAttempts=1

Объясняю с помощью диаграммы

псевдокод

public void startTransformation(Message etlMessage) {
        String message = "Request received for transformation " + etlMessage;
        streamBridge.send("audit",messageReceivedEvent);
        //complex logic.
        streamBridge.send("audit",messageCompletedEvent);
    }

public void rss(Message ingestion) {
            String message = "ingesting message";
            streamBridge.send("audit",messageReceivedEvent);
            //complex logic.
            streamBridge.send("audit",messageCompletedEvent);
           streamBridge.send("transformer",messageCompletedEvent);

        }

При отправке сообщения я получаю предупреждение и понимаю, почему, потому что имя назначения и имя привязки потребителя совпадают.

Для решения я увидел комментарий «Вам нужна выходная привязка с входящим потоком в качестве пункта назначения». Я не могу этого понять.

Приведенный выше сценарий работает, если потребители находятся в разных JVM. Я могу отправить сообщение без предупреждений для ограничения ввода потребителя, работающего как отдельное приложение JVM (я думаю, поскольку оно не имеет никакой информации об имени привязки другого JVM).

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
71
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Основываясь на обновлениях и разъяснениях, которые вы предоставили в комментариях, я предполагаю, что вы не предоставляете эту audit конфигурацию в приложении RSS. Возможно, именно поэтому вы не видите предупреждение в приложении RSS, поскольку это приложение ничего не знает о своем статусе привязки (аудита) и только во втором загрузочном приложении.

Основываясь на приведенной выше диаграмме, вы можете попробовать вот что:

Вместо отправки в пункт назначения (audit), который совпадает с именем привязки, можете ли вы использовать для этой привязки другой пункт назначения? Например:


public void startTransformation(Message etlMessage) {
        String message = "Request received for transformation " + etlMessage;
        streamBridge.send("audit-destination",messageReceivedEvent);
        //complex logic.
        streamBridge.send("audit-destination",messageCompletedEvent);
    }

public void rss(Message ingestion) {
            String message = "ingesting message";
            streamBridge.send("audit-destination",messageReceivedEvent);
            //complex logic.
            streamBridge.send("audit-destination",messageCompletedEvent);
           streamBridge.send("transformer",messageCompletedEvent);

        }

Затем добавьте это в конфигурацию второго приложения:

spring.cloud.stream.function.bindings.auditLog-in-0=audit
spring.cloud.stream.bindings.audit.destination=audit-destination
spring.cloud.stream.bindings.audit.group=audit-group
spring.cloud.stream.bindings.audit.consumer.concurrency=1
spring.cloud.stream.bindings.audit.consumer.maxAttempts=1

audit-destination — произвольное имя и может быть чем угодно.

Я обновил вопрос, чтобы дать больше ясности с диаграммой.

MAY 21.05.2024 19:54

Вопрос: есть ли какая-то конкретная причина, по которой вы явно предоставляете эту потребительскую конфигурацию? ---> Я хочу, чтобы привязка обмена и очереди сохранялась для потребителя аудита, и если я удалю предложенные вами свойства, то будут созданы анонимные привязки для аудита, которые исчезнут при остановке процесса Spring-Boot-2. Это приведет к потере сообщений от процесса Spring-Boot-1 для аудита. Именно по этой причине я предоставляю привязку аудита.

MAY 21.05.2024 19:55

На вопрос: «потребители находятся в разных JVM» -> Как показано на диаграмме, я не получаю никаких предупреждений, даже если я использую потоковый мост для отправки сообщений потребителям аудита и startTransfromation непосредственно от потребителя rss. Я получаю предупреждение только тогда, когда отправляю сообщение из startTransfromation в аудит.

MAY 21.05.2024 19:56

Спасибо за обновление ответа. «Возможно, поэтому вы не видите предупреждение в приложении RSS, поскольку это приложение ничего не знает о своем статусе привязки (аудита) и только во втором загрузочном приложении». - Да, это то, что я имел в виду, поскольку они находятся в отдельной JVM. Я применил ваши изменения, все работает нормально, предупреждение пропало. Спасибо за ответ.

MAY 22.05.2024 12:26

Другие вопросы по теме