Я хочу создать поток IntegrationFlow для следующего потока.
@Bean
public IntegrationFlow buildCart() {
return f -> f.handle(validate, "buildPreCheck")
.handle(preProcessProcessor)
.handle(getOffersProcessor)
.handle(buildItems)
**.wireTap(log())**
.handle(validateItems)
.handle(deliver);
}
Обновлено:
Привет, Артем, я добавил Wire Tap, как в приведенном ниже коде. Тем не менее, он исключает узел WireTap как Sequencal и ждет этого узла.
Пожалуйста, помогите сделать его узлом Aysnc.
@Bean
public IntegrationFlow log() {
return f -> f.handle(auditProcessor).channel("nullChannel");
}
@ServiceActivator
@Description("Call and get the Offers Request")
public void getDetails(Message<Context> message) throws InterruptedException {
log.info("getDetails-Sleep-Start");
Thread.sleep(3000);
log.info("getDetails-Sleep-End");
}
В Spring Integration Java DSL мы все упускаем из виду, что одним из очень важных компонентов Spring Integration является MessageChannel
. И дело в том, что каналы могут быть добавлены в поток всякий раз, когда нам нужно больше, чем DirectChannel
по умолчанию. Для асинхронного выполнения у нас есть ExecutorChannel
. Но прежде чем мы перейдем к разветвленному потоку, нам нужно как-то туда прыгнуть, не нарушая основной. В терминах EIP это называется Wire-Tap: https://www.enterpriseintegrationpatterns.com/patterns/messaging/WireTap.html.
Spring Integration Java DSL предлагает такую реализацию, как оператор .wireTap()
в потоке. Логика аудита может быть реализована в выделенном подпотоке или через канал, но не забывайте о ExecutorChannel
!
Вы можете увидеть больше информации в Справочном руководстве: https://docs.spring.io/spring-integration/reference/html/java-dsl.html#java-dsl-wiretap
ОБНОВИТЬ
В
.handle(buildItems)
.wireTap(log())
правильный путь: вы собираетесь проверить результат buildItems
и перейти к следующему шагу.
log()
необходимо модифицировать следующим образом:
@Bean
public IntegrationFlow log() {
return f -> f.channel(c -> c.executor(taskExecutorBean())).handle(auditProcessor).channel("nullChannel");
}
Обратите внимание на c.executor()
. Таким образом, мы добавляем асинхронную передачу для нашего подпотока log()
.
Пожалуйста, найдите ОБНОВЛЕНИЕ в моем ответе.
Спасибо, Артем.
Вам обязательно нужно добавить комментарий к моему ответу, я буду уведомлен. В противном случае ваша правка останется без внимания.