Я не могу понять решение для, как я считаю, довольно распространенного потока интеграции:
В настоящее время у меня есть IntegrationFlow, использующий PublishSubscribeChannel с двумя потоками IntegrationFlow в качестве подписчиков: один для обработки файла, другой для удаления файла. К сожалению, более поздний (удаление) выполняется независимо от результата первого (процесса), даже если поток «процесса» выдает исключение.
Мне нужен последовательный поток обработки, но я не могу понять, как это реализовать. Создал тестовый код, но он не работает, сообщает
2022-11-22 09:55:54.256 ERROR 14648 --- [ scheduling-1]
o.s.integration.handler.LoggingHandler :
org.springframework.messaging.MessagingException: Failed to
invoke method; nested exception is
java.lang.IllegalArgumentException: wrong number of arguments
Код лаборатории:
@Configuration
@EnableIntegration
public class SeqChannels {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@InboundChannelAdapter(channel = "process", poller = @Poller(fixedDelay = "1000"))
public Message<Integer> source(final AtomicInteger integerSource) {
return MessageBuilder.withPayload(integerSource.incrementAndGet()).build();
}
@ServiceActivator(inputChannel = "process", outputChannel = "delete")
public Integer process(@Payload Integer message) {
return message;
}
@ServiceActivator(inputChannel = "delete")
public void delete(@Payload Integer message) {
}
}
Вам нужно показать конфигурацию потока; по умолчанию второй подписчик не будет вызываться в случае сбоя первого, но это поведение можно изменить; поэтому нам нужно увидеть ваш поток. Кроме того, подобное сообщение об ошибке бесполезно без полной трассировки стека.
@GaryRussell Вышеприведенное исключение связано с показанным кодом. Это полный лабораторный пример. На самом деле, это просто еще одна отчаянная попытка реализовать мой "последовательный" поток. Также пробовал использовать PubSubChannel с двумя подписчиками, но они не зависят друг от друга и вызываются независимо от любого «предыдущего» вывода.
Еще раз - пожалуйста, покажите конфиги потока и полную трассировку стека; никто не может помочь вам без этого.
@InboundChannelAdapter
метод не должен иметь никаких параметров.
Вы не можете иметь параметры в методе @InboundChannelAdapter
. Это работает...
@Configuration
@EnableIntegration
class SeqChannels {
AtomicInteger integerSource = new AtomicInteger();
@InboundChannelAdapter(channel = "process", poller = @Poller(fixedDelay = "1000"))
public Message<Integer> source() {
return MessageBuilder.withPayload(this.integerSource.incrementAndGet()).build();
}
@ServiceActivator(inputChannel = "process", outputChannel = "delete")
public Integer process(@Payload Integer message) {
System.out.println("Process: " + message);
return message;
}
@ServiceActivator(inputChannel = "delete")
public void delete(@Payload Integer message) {
System.out.println("delete: " + message);
}
}
Process: 1
delete: 1
Process: 2
delete: 2
Process: 3
...
Спасибо за ответ. Однако при реализации я не могу понять, как использовать исходящие адаптеры Spring для Sftp, Jdbc и AMQP. Похоже, это либо ServiceActivator, либо конечная точка сообщения, а не то и другое. При использовании ServiceActivator я должен пройти весь путь, используя, например, JdbcTemplate и прочее. Как правильно разрешить обработку сообщения несколькими обработчиками, последовательно и прервать дальнейшую обработку в случае сбоя одного обработчика?
Исходящие адаптеры работают в одну сторону, без ответа. Используйте канал pubsub или маршрутизатор списка получателей.
Еще раз спасибо). У меня сложилось впечатление, что pub/sub-каналы больше похожи на fan-out-очереди, вызывающие своих подписчиков параллельно, но теперь я вижу, что они последовательные.
Они являются последовательными, если вы не предоставляете исполнителя.
Кстати: это Spring Integration 5.5