У меня есть сценарий, в котором мне нужно вызвать вызовы REST системы A и B параллельно, объединить ответы и преобразовать их в один FinalResponse.
Для этого я использую сплиттер и согласователь Spring Integration, и конфигурация приведена ниже.
Я показал конечную точку REST, когда запрос (запрос имеет co-Relationship ID в заголовке) приходит к контроллеру, мы вызываем шлюз, и разделитель отправляет запросы в каналы A и B.
Активатор службы A прослушивает канал A и вызывает системный вызов REST, а активатор службы B прослушивает канал B и вызывает вызов REST системы B.
Затем мне нужно собрать ответы из систем A и B, а затем преобразовать их в FinalResponse. В настоящее время агрегирование и преобразование работают нормально.
Иногда, когда к контроллеру поступает несколько запросов, FinalResponse занимает больше времени по сравнению с одним запросом к контроллеру. Все ответы на запросы приходят почти одновременно, не знаю почему (хотя последний запрос к контроллеру был отправлен через 6-7 секунд после 1-го запроса). Что-то не так в моей конфигурации, связанной с потоками? Не уверен, почему на ответ требуется больше времени, когда к контроллеру приходит несколько запросов.
Кроме того, я не использую CorrelationStrategy, нужно ли нам его использовать? Смогу ли я столкнуться с какими-либо проблемами в многопоточной среде с приведенной ниже конфигурацией? Любые отзывы о конфигурации были бы полезны
// Controller
{
FinalResponse aggregatedResponse = gateway.collateServiceInformation(inputData);
}
//Configuration
@Autowired
Transformer transformer;
//Gateway
@Bean
public IntegrationFlow gatewayServiceFlow() {
return IntegrationFlows.from("input_channel")
.channel("split_channel").get();
}
//splitter
@Bean
public IntegrationFlow splitAggregatorFlow() {
return IntegrationFlows.from("split_channel").
.split(SomeClass.class, SomeClass::getResources)
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.<Resource, String>route(Resource::getName,
mapping -> mapping.channelMapping("A", "A")
.channelMapping("B", "B"))
.get();
}
//aggregator
@Bean
public IntegrationFlow aggregateFlow() {
return IntegrationFlows.from("aggregate_channel").aggregate()
.channel("transform_channel").transform(transformer).get();
}
.
.
.
//Transformer
@Component
@Scope("prototype")
public class Transformer {
@Transformer
public FinalResponse transform(final List<Result> responsesFromAAndB) {
//transformation logic and then return final response
}
}
Разделитель предоставляет стратегию по умолчанию для деталей корреляции в заголовках. В дальнейшем их будет использовать агрегатор. То, о чем вы говорите, называется scatter-gather: https://docs.spring.io/spring-integration/docs/5.0.8.RELEASE/reference/html/messaging-routing-chapter.html#scatter-gather. Существует эквивалент Java DSL.
Я думаю, ваша проблема в том, что какой-то запрос в разделенном наборе не выполняется, поэтому агрегатор не может завершить группу для этого запроса. Пока в вашей конфигурации ничего очевидного ...