Я новичок в проекте Spring Integration, теперь мне нужно создать поток с Java DSL и протестировать его. Я придумал эти потоки. Первый должен запускаться cron и вызывать второй, который вызывает конечную точку HTTP и переводит XML-ответ в POJO:
@Bean
IntegrationFlow pollerFlow() {
return IntegrationFlows
.from(() -> new GenericMessage<>(""),
e -> e.poller(p -> p.cron(this.cron)))
.channel("pollingChannel")
.get();
}
@Bean
IntegrationFlow flow(HttpMessageHandlerSpec bulkEndpoint) {
return IntegrationFlows
.from("pollingChannel")
.enrichHeaders(authorizationHeaderEnricher(user, password))
.handle(bulkEndpoint)
.transform(xmlTransformer())
.channel("httpResponseChannel")
.get();
}
@Bean
HttpMessageHandlerSpec bulkEndpoint() {
return Http
.outboundGateway(uri)
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.errorHandler(new DefaultResponseErrorHandler());
}
Теперь я хочу протестировать поток и имитировать вызов HTTP, но, изо всех сил пытаясь имитировать обработчик HTTP, я попытался сделать это следующим образом:
@ExtendWith(SpringExtension.class)
@SpringIntegrationTest(noAutoStartup = {"pollerFlow"})
@ContextConfiguration(classes = FlowConfiguration.class)
public class FlowTests {
@Autowired
private MockIntegrationContext mockIntegrationContext;
@Autowired
public DirectChannel httpResponseChannel;
@Autowired
public DirectChannel pollingChannel;
@Test
void test() {
final MockMessageHandler mockHandler = MockIntegration.mockMessageHandler()
.handleNextAndReply(message -> new GenericMessage<>(xml, message.getHeaders()));
mockIntegrationContext.substituteMessageHandlerFor("bulkEndpoint", mockHandler);
httpResponseChannel.subscribe(message -> {
assertThat(message.getPayload(), is(notNullValue()));
assertThat(message.getPayload(), instanceOf(PartsSalesOpenRootElement.class));
});
pollingChannel.send(new GenericMessage<>(""));
}
}
Но я всегда получаю сообщение об ошибке, что в строке:
mockIntegrationContext.substituteMessageHandlerFor("bulkEndpoint", mockHandler);
org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'bulkEndpoint' is expected to be of type 'org.springframework.integration.endpoint.IntegrationConsumer' but was actually of type 'org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler'
Я делаю что-то не так здесь? Я предполагаю, что у меня проблема с самим IntegrationFlow, или, может быть, проблема связана с моим подходом к тестированию.
Ошибка правильная. bulkEndpoint
сама по себе не является конечной точкой. Это действительно MessageHandler
. Конечная точка создается из .handle(bulkEndpoint)
.
См. документы: https://docs.spring.io/spring-integration/docs/current/reference/html/overview.html#finding-class-names-for-java-and-dsl-configuration и https://docs.spring.io/spring-integration/docs/current/reference/html/testing.html#testing-mocks.
Итак, чтобы заставить его работать, вам нужно сделать что-то вроде этого:
.handle(bulkEndpoint, e -> e.id("actualEndpoint"))
А потом в тесте:
mockIntegrationContext.substituteMessageHandlerFor("actualEndpoint", mockHandler);
Вам также, вероятно, нужно подумать, чтобы этот pollerFlow
не запускался, когда вы тестируете его, потому что вы отправляете сообщение в pollingChannel
вручную. Таким образом, нет никаких конфликтов с тем, что вы хотели бы протестировать. По этой причине вы также добавляете id()
в свой e.poller(p -> p.cron(this.cron))
и используете @SpringIntegrationTest(noAutoStartup)
, чтобы остановить его перед тестом. Я вижу, вы пытаетесь noAutoStartup = {"pollerFlow"}
, но это не поможет для статических потоков. В этом случае вам действительно нужно остановить реальную конечную точку.
Большое спасибо за Вашу помощь! Я также смог добавить каналы между конечными точками и тестовый код между двумя конечными точками, но такой подход мне нравится больше.