У меня есть два потока интеграции для импорта данных о головке и положении. Для импорта я использую csv-файлы, организованные в виде папок. Head-CSV-файлы находятся в Head-Folder, а позиционные файлы — в Position-Folder. Структура CSV-файлов, конечно, отличается.
Теперь необходимо, чтобы данные о головке импортировались до данных о положении. Но когда нет доступных данных головы, папка позиции должна быть обработана в любом случае. (Например, для обновления положения существующей головки)
В настоящее время у меня есть два IntegrationFlow. По одному на каждый случай.
Есть ли возможность вызывать эти потоки интеграции один за другим? Если нет, есть ли возможность (возможно, по методу агрегации) использовать два разных источника сообщений одним потоком интеграции, с возможностью определить, является ли это головой или позицией, и безопасностью, что источник заголовка сообщения обрабатывается перед позицией?
@Configuration
public class MyHeadFlowConfig
@Bean
public IntegrationFlow myHeadFlow() {
return IntegrationFlows.from(myHeadMessageSource, myPollerConsumer))
.filter(new SimplePatternFileListFilter("*.csv"))
.handle(myHeadJobConfig)
.get();
}
}
@Configuration
public class MyPositionFlowConfig
@Bean
public IntegrationFlow myPositionFlow() {
return IntegrationFlows.from(myPositionMessageSource, myPollerConsumer))
.filter(new SimplePatternFileListFilter("*.csv"))
.handle(myPositionJobConfig)
.get();
}
}
Есть много способов сделать это.
Если два потока интеграции выполняются в одной и той же JVM, самым простым и естественным подходом будет совместное использование канал. В основном создайте экземпляр MessageChannel и подключите его в конце одного поля и в начале другого.
floaA -> sharedChannel -> flowB
Поддержка ApplicationEvent была бы другим подходом. И, конечно же, набор входящих/исходящих адаптеров.
Интеграционный поток может иметь множество источников. Это одно из основных преимуществ абстракции канал. Для примера предположим, что мы называем этот канал потокМостКанал. Теперь FlowA после завершения отправляет сообщение на этот канал, по существу инициируя FlowB. Помимо того, что FlowA инициируется FlowB, FlowA может иметь дополнительные источники сообщений, которые будут отправлять сообщения на тот же потокМостКанал. Другими словами, потокМостКанал может подаваться из многих источников, но все, что ниже по течению, остается неизменным.
Теоретически я думаю, что понимаю, как это сделать... но я не могу заставить его работать... вы говорите, что FlowB может иметь дополнительные источники сообщений, которые отправляют сообщения в канал. Но как я могу добавить эти источники сообщений в FlowB. И как я могу быть уверен, что только сообщение от floatA к sharedChannel активирует поток B. Извините, если мои вопросы немного базовые... но, похоже, я не полностью понимаю функциональность потока и канала.
Спасибо. Итак, если я правильно понимаю, я должен после .handle(job) выполнить .channel("myMessageChannel"), а второй IntegrationFlow должен быть инициализирован этим каналом, верно? Но как я могу передать второму потоку интеграции новый источник сообщений с данными о позиции? Или я неправильно понял ваш ответ? Не могли бы вы добавить небольшой пример кода?