У меня есть этот поток, который я пытаюсь проверить, но ничего не работает, как ожидалось. Сам поток работает хорошо, но тестирование кажется немного сложным. Это мой поток:
@Configuration
@RequiredArgsConstructor
public class FileInboundFlow {
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
private String filePath;
@Bean
public IntegrationFlow fileReaderFlow() {
return IntegrationFlows.from(Files.inboundAdapter(new File(this.filePath))
.filterFunction(...)
.preventDuplicates(false),
endpointConfigurer -> endpointConfigurer.poller(
Pollers.fixedDelay(500)
.taskExecutor(this.threadPoolTaskExecutor)
.maxMessagesPerPoll(15)))
.transform(new UnZipTransformer())
.enrichHeaders(this::headersEnricher)
.transform(Message.class, this::modifyMessagePayload)
.route(Map.class, this::channelsRouter)
.get();
}
private String channelsRouter(Map<String, File> payload) {
boolean isZip = payload.values()
.stream()
.anyMatch(file -> isZipFile(file));
return isZip ? ZIP_CHANNEL : XML_CHANNEL; // ZIP_CHANNEL and XML_CHANNEL are PublishSubscribeChannel
}
@Bean
public SubscribableChannel xmlChannel() {
var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
channel.setBeanName(XML_CHANNEL);
return channel;
}
@Bean
public SubscribableChannel zipChannel() {
var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
channel.setBeanName(ZIP_CHANNEL);
return channel;
}
//There is a @ServiceActivator on each channel
@ServiceActivator(inputChannel = XML_CHANNEL)
public void handleXml(Message<Map<String, File>> message) {
...
}
@ServiceActivator(inputChannel = ZIP_CHANNEL)
public void handleZip(Message<Map<String, File>> message) {
...
}
//Plus an @Transformer on the XML_CHANNEL
@Transformer(inputChannel = XML_CHANNEL, outputChannel = BUS_CHANNEL)
private List<BusData> xmlFileToIngestionMessagePayload(Map<String, File> xmlFilesByName) {
return xmlFilesByName.values()
.stream()
.map(...)
.collect(Collectors.toList());
}
}
Я хотел бы протестировать несколько случаев, первый — проверка полезной нагрузки сообщения, опубликованного на каждом канале после окончания fileReaderFlow
.
Итак, я определил этот тестовый класс:
@SpringBootTest
@SpringIntegrationTest
@ExtendWith(SpringExtension.class)
class FileInboundFlowTest {
@Autowired
private MockIntegrationContext mockIntegrationContext;
@TempDir
static Path localWorkDir;
@BeforeEach
void setUp() {
copyFileToTheFlowDir(); // here I copy a file to trigger the flow
}
@Test
void checkXmlChannelPayloadTest() throws InterruptedException {
Thread.sleep(1000); //waiting for the flow execution
PublishSubscribeChannel xmlChannel = this.getBean(XML_CHANNEL, PublishSubscribeChannel.class); // I extract the channel to listen to the message sent to it.
xmlChannel.subscribe(message -> {
assertThat(message.getPayload()).isInstanceOf(Map.class); // This is never executed
});
}
}
Как и ожидалось, этот тест не работает, потому что assertThat(message.getPayload()).isInstanceOf(Map.class);
никогда не выполняется.
Прочитав документация, я не нашел подсказки, которая помогла бы мне решить эту проблему. Любая помощь будет оценена по достоинству! Большое спасибо
Во-первых, channel.setBeanName(XML_CHANNEL);
не влияет на целевой компонент. Вы делаете это на этапе создания компонента, и контейнер внедрения зависимостей ничего не знает об этом параметре: он просто не консультируется с ним. Если вы действительно хотите продиктовать XML_CHANNEL
имя компонента, вам лучше изучить атрибут @Bean(name)
.
Проблема в тесте в том, что вы упускаете факт асинхронной логики потока. Это Files.inboundAdapter()
работает, если полностью другой поток и выдает сообщения за пределами вашего тестового метода. Таким образом, даже если вы смогли подписаться на канал вовремя, до того, как на него будет отправлено какое-либо сообщение, это не означает, что ваш тест будет работать правильно: assertThat()
будет выполняться в другом потоке. Поэтому нет реального отчета JUnit для вашего контекста метода тестирования.
Итак, что я предлагаю сделать, это:
Files.inboundAdapter()
в начале теста перед любой настройкой, которую вы хотели бы выполнить в тесте. Или, по крайней мере, не помещайте файлы в этот filePath
, чтобы адаптер канала не выдавал сообщения.ChannelInterceptor
.CountDownLatch
передать этому подписчику.endpointConfigurer
имеет атрибут id()
. Таким образом, вы можете получить Lifecycle
bean-компонент и вызвать его stop()
, а затем start()
соответственно. В противном случае вы можете использовать @SpringIntegrationTest(noAutoStartup)
с этим id
, чтобы остановить его перед набором тестов. Тогда вам все равно нужно запустить его вручную, когда все настройки будут выполнены.
Большое спасибо, я только что попробовал ваши предложения, и все работает как шарм. Вопрос о вашем первом предложении, как я могу остановить/запустить
Files.inboundAdapter()
?