Spring Integration: определите последовательную обработку сообщений с помощью Java DSL

Я не могу понять решение для, как я считаю, довольно распространенного потока интеграции:

  1. прочитать файл из источника
  2. файл процесса
  3. удалить файл из источника, если обработка прошла успешно.

В настоящее время у меня есть IntegrationFlow, использующий PublishSubscribeChannel с двумя потоками IntegrationFlow в качестве подписчиков: один для обработки файла, другой для удаления файла. К сожалению, более поздний (удаление) выполняется независимо от результата первого (процесса), даже если поток «процесса» выдает исключение.

Мне нужен последовательный поток обработки, но я не могу понять, как это реализовать. Создал тестовый код, но он не работает, сообщает

2022-11-22 09:55:54.256 ERROR 14648 --- [   scheduling-1]
o.s.integration.handler.LoggingHandler   :
org.springframework.messaging.MessagingException: Failed to 
invoke method; nested exception is 
java.lang.IllegalArgumentException: wrong number of arguments

Код лаборатории:

@Configuration
@EnableIntegration
public class SeqChannels {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @InboundChannelAdapter(channel = "process", poller = @Poller(fixedDelay = "1000"))
    public Message<Integer> source(final AtomicInteger integerSource) {
        return MessageBuilder.withPayload(integerSource.incrementAndGet()).build();
    }

    @ServiceActivator(inputChannel = "process", outputChannel = "delete")
    public Integer process(@Payload Integer message) {
        return message;
    }

    @ServiceActivator(inputChannel = "delete")
    public void delete(@Payload Integer message) {

    }

}

Кстати: это Spring Integration 5.5

Christoph Dahlen 22.11.2022 10:20

Вам нужно показать конфигурацию потока; по умолчанию второй подписчик не будет вызываться в случае сбоя первого, но это поведение можно изменить; поэтому нам нужно увидеть ваш поток. Кроме того, подобное сообщение об ошибке бесполезно без полной трассировки стека.

Gary Russell 22.11.2022 14:49

@GaryRussell Вышеприведенное исключение связано с показанным кодом. Это полный лабораторный пример. На самом деле, это просто еще одна отчаянная попытка реализовать мой "последовательный" поток. Также пробовал использовать PubSubChannel с двумя подписчиками, но они не зависят друг от друга и вызываются независимо от любого «предыдущего» вывода.

Christoph Dahlen 22.11.2022 15:01

Еще раз - пожалуйста, покажите конфиги потока и полную трассировку стека; никто не может помочь вам без этого.

Gary Russell 22.11.2022 15:20

@InboundChannelAdapterметод не должен иметь никаких параметров.

Gary Russell 22.11.2022 15:27
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
5
69
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы не можете иметь параметры в методе @InboundChannelAdapter. Это работает...

@Configuration
@EnableIntegration
class SeqChannels {

    AtomicInteger integerSource = new AtomicInteger();

    @InboundChannelAdapter(channel = "process", poller = @Poller(fixedDelay = "1000"))
    public Message<Integer> source() {
        return MessageBuilder.withPayload(this.integerSource.incrementAndGet()).build();
    }

    @ServiceActivator(inputChannel = "process", outputChannel = "delete")
    public Integer process(@Payload Integer message) {
        System.out.println("Process: " + message);
        return message;
    }

    @ServiceActivator(inputChannel = "delete")
    public void delete(@Payload Integer message) {
        System.out.println("delete: " + message);
    }

}
Process: 1
delete: 1
Process: 2
delete: 2
Process: 3
...

Спасибо за ответ. Однако при реализации я не могу понять, как использовать исходящие адаптеры Spring для Sftp, Jdbc и AMQP. Похоже, это либо ServiceActivator, либо конечная точка сообщения, а не то и другое. При использовании ServiceActivator я должен пройти весь путь, используя, например, JdbcTemplate и прочее. Как правильно разрешить обработку сообщения несколькими обработчиками, последовательно и прервать дальнейшую обработку в случае сбоя одного обработчика?

Christoph Dahlen 24.11.2022 07:42

Исходящие адаптеры работают в одну сторону, без ответа. Используйте канал pubsub или маршрутизатор списка получателей.

Gary Russell 24.11.2022 14:27

Еще раз спасибо). У меня сложилось впечатление, что pub/sub-каналы больше похожи на fan-out-очереди, вызывающие своих подписчиков параллельно, но теперь я вижу, что они последовательные.

Christoph Dahlen 24.11.2022 14:59

Они являются последовательными, если вы не предоставляете исполнителя.

Gary Russell 24.11.2022 15:24

Другие вопросы по теме