Определите конец циклического рабочего процесса в весенней интеграции (inbound-channel => service-activator)

У нас есть следующий простой рабочий процесс на основе int-jpa:

[адаптер входящего канала] -> [активатор службы]

Конфигурация такая:

<int:channel id = "inChannel">  <int:queue/> </int:channel>
<int:channel id = "outChannel"> <int:queue/> </int:channel>
<int-jpa:inbound-channel-adapter id = "inChannelAdapter"  channel = "inChannel"
     jpa-query = "SOME_COMPLEX_POLLING_QUERY"
     max-results = "2">
        <int:poller max-messages-per-poll = "2" fixed-rate = "20" >
        <int:advice-chain synchronization-factory = "txSyncFactory"   >
            <tx:advice transaction-manager = "transactionManager" >
                <tx:attributes>
                    <tx:method name = "*" timeout = "30000" />
                </tx:attributes>
            </tx:advice>
            <int:ref bean = "pollerAdvice"/>
        </int:advice-chain>
    </int-jpa:inbound-channel-adapter>

<int:service-activator input-channel = "inChannel" ref = "myActivator"
 method = "pollEntry" output-channel = "outChannel" />

<bean id = "myActivator" class = "com.company.myActivator" />    
<bean id = "pollerAdvice" class = "com.company.myPollerAdvice" />

Точкой входа для обработки является постоянно растущая таблица, по которой запускается SOME_COMPLEX_POLLING_QUERY. Текущий поток:

  1. [Тема-1]SOME_COMPLEX_POLLING_QUERY будет возвращать только записи, для которых busy установлен на false (мы устанавливаем busy на true, как только опрос будет выполнен с использованием txSyncFactory)
  2. [Тема-2] Эти записи будут проходить через myActivator, где это может занять от 1 до 30 минут.
  3. [Тема-2] После завершения обработки мы возвращаем busy с true на false.

Проблема: нам нужно инициировать уведомление, даже когда обработка всех записей, которые присутствовали в таблице, завершена.

Подход попробовал: Мы использовали afterReturning из pollerAdvice, чтобы выяснить, вернул ли SOME_COMPLEX_POLLING_QUERY какие-либо результаты или нет. Однако этот метод начнет возвращать «Нет записей» до того, как Thread-2 завершит обработку всех записей.

Примечание:

  • Те же записи будут обработаны снова через 24 часа. Но на этот раз записей будет больше.
  • Мы не используем outbound-channel-adapter, так как он нам не нужен. Однако мы открыты для использования, если это часть предлагаемого решения.
3
0
64
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Не уверен, что это сработает для вас, но, поскольку вам все равно нужно подождать с уведомлением до Thread-2, я бы посоветовал иметь некоторый компонент AtomicBoolean. В упомянутом afterReturning(), когда нет данных, запрашиваемых из БД, вы просто меняете состояние AtomicBoolean на true. Когда Thread-2 завершает свою работу, он может вызвать <filter>, чтобы проверить состояние AtomicBoolean, а затем действительно выполнить <int-event:outbound-channel-adapter>, чтобы отправить событие уведомления.

Итак, окончательное решение об отправке события или нет определенно принимается Thread-2, а не адаптером канала опроса.

Спасибо, Артем. Но я не уверен, как мы узнаем, что Thread-2 завершает свою работу?

Khushbu 04.04.2018 16:59

Ну, разве это не то, что вы получаете в outChannel? А вы сами говорите: Once the processing is done. Итак, похоже, вы знаете, что происходит

Artem Bilan 04.04.2018 18:05

И еще одно замечание: если есть опрошенные сообщения из БД, вам необходимо сбросить этот флаг AtomicBoolean на false со значением «не завершено» для Thread-2.

Artem Bilan 04.04.2018 18:06

Что ж, прямо сейчас я не использую outChannel, как упоминалось в примечании. С Once the processing is done я имел в виду в самом myActivator, мы вызываем DB и устанавливаем флаг на false.

Khushbu 05.04.2018 03:51

Другие вопросы по теме