Я видел проблему: StreamBridge не отправляет заголовок подтверждения и соответствующий код, как показано ниже для GH--2563.
if (ObjectUtils.containsElement(consumerBindingNames, destinationName)) { //GH-2563
logger.warn("You seem to be sending data to the input binding. It is not "
+ "recommended, since you are bypassing the binder and this the messaging system exposed by the binder.");
}
Мой вариант использования относится к той же категории, но я не уверен, как это сделать правильно. У меня есть свойства ниже в applicationiton.properties
spring.cloud.function.definition=startTransformation;auditLog
spring.cloud.stream.function.bindings.startTransformation-in-0=transformer
spring.cloud.stream.bindings.transformer.group=transformer-group
spring.cloud.stream.bindings.transformer.consumer.concurrency=3
spring.cloud.stream.bindings.transformer.consumer.maxAttempts = 1
spring.cloud.stream.function.bindings.auditLog-in-0=audit
spring.cloud.stream.bindings.audit.group=audit-group
spring.cloud.stream.bindings.audit.consumer.concurrency=1
spring.cloud.stream.bindings.audit.consumer.maxAttempts=1
Объясняю с помощью диаграммы
псевдокод
public void startTransformation(Message etlMessage) {
String message = "Request received for transformation " + etlMessage;
streamBridge.send("audit",messageReceivedEvent);
//complex logic.
streamBridge.send("audit",messageCompletedEvent);
}
public void rss(Message ingestion) {
String message = "ingesting message";
streamBridge.send("audit",messageReceivedEvent);
//complex logic.
streamBridge.send("audit",messageCompletedEvent);
streamBridge.send("transformer",messageCompletedEvent);
}
При отправке сообщения я получаю предупреждение и понимаю, почему, потому что имя назначения и имя привязки потребителя совпадают.
Для решения я увидел комментарий «Вам нужна выходная привязка с входящим потоком в качестве пункта назначения». Я не могу этого понять.
Приведенный выше сценарий работает, если потребители находятся в разных JVM. Я могу отправить сообщение без предупреждений для ограничения ввода потребителя, работающего как отдельное приложение JVM (я думаю, поскольку оно не имеет никакой информации об имени привязки другого JVM).





Основываясь на обновлениях и разъяснениях, которые вы предоставили в комментариях, я предполагаю, что вы не предоставляете эту audit конфигурацию в приложении RSS. Возможно, именно поэтому вы не видите предупреждение в приложении RSS, поскольку это приложение ничего не знает о своем статусе привязки (аудита) и только во втором загрузочном приложении.
Основываясь на приведенной выше диаграмме, вы можете попробовать вот что:
Вместо отправки в пункт назначения (audit), который совпадает с именем привязки, можете ли вы использовать для этой привязки другой пункт назначения? Например:
public void startTransformation(Message etlMessage) {
String message = "Request received for transformation " + etlMessage;
streamBridge.send("audit-destination",messageReceivedEvent);
//complex logic.
streamBridge.send("audit-destination",messageCompletedEvent);
}
public void rss(Message ingestion) {
String message = "ingesting message";
streamBridge.send("audit-destination",messageReceivedEvent);
//complex logic.
streamBridge.send("audit-destination",messageCompletedEvent);
streamBridge.send("transformer",messageCompletedEvent);
}
Затем добавьте это в конфигурацию второго приложения:
spring.cloud.stream.function.bindings.auditLog-in-0=audit
spring.cloud.stream.bindings.audit.destination=audit-destination
spring.cloud.stream.bindings.audit.group=audit-group
spring.cloud.stream.bindings.audit.consumer.concurrency=1
spring.cloud.stream.bindings.audit.consumer.maxAttempts=1
audit-destination — произвольное имя и может быть чем угодно.
Вопрос: есть ли какая-то конкретная причина, по которой вы явно предоставляете эту потребительскую конфигурацию? ---> Я хочу, чтобы привязка обмена и очереди сохранялась для потребителя аудита, и если я удалю предложенные вами свойства, то будут созданы анонимные привязки для аудита, которые исчезнут при остановке процесса Spring-Boot-2. Это приведет к потере сообщений от процесса Spring-Boot-1 для аудита. Именно по этой причине я предоставляю привязку аудита.
На вопрос: «потребители находятся в разных JVM» -> Как показано на диаграмме, я не получаю никаких предупреждений, даже если я использую потоковый мост для отправки сообщений потребителям аудита и startTransfromation непосредственно от потребителя rss. Я получаю предупреждение только тогда, когда отправляю сообщение из startTransfromation в аудит.
Спасибо за обновление ответа. «Возможно, поэтому вы не видите предупреждение в приложении RSS, поскольку это приложение ничего не знает о своем статусе привязки (аудита) и только во втором загрузочном приложении». - Да, это то, что я имел в виду, поскольку они находятся в отдельной JVM. Я применил ваши изменения, все работает нормально, предупреждение пропало. Спасибо за ответ.
Я обновил вопрос, чтобы дать больше ясности с диаграммой.