У нас есть следующий простой рабочий процесс на основе int-jpa:
[адаптер входящего канала] -> [активатор службы]
Конфигурация такая:
<int:channel id = "inChannel"> <int:queue/> </int:channel>
<int:channel id = "outChannel"> <int:queue/> </int:channel>
<int-jpa:inbound-channel-adapter id = "inChannelAdapter" channel = "inChannel"
jpa-query = "SOME_COMPLEX_POLLING_QUERY"
max-results = "2">
<int:poller max-messages-per-poll = "2" fixed-rate = "20" >
<int:advice-chain synchronization-factory = "txSyncFactory" >
<tx:advice transaction-manager = "transactionManager" >
<tx:attributes>
<tx:method name = "*" timeout = "30000" />
</tx:attributes>
</tx:advice>
<int:ref bean = "pollerAdvice"/>
</int:advice-chain>
</int-jpa:inbound-channel-adapter>
<int:service-activator input-channel = "inChannel" ref = "myActivator"
method = "pollEntry" output-channel = "outChannel" />
<bean id = "myActivator" class = "com.company.myActivator" />
<bean id = "pollerAdvice" class = "com.company.myPollerAdvice" />
Точкой входа для обработки является постоянно растущая таблица, по которой запускается SOME_COMPLEX_POLLING_QUERY. Текущий поток:
SOME_COMPLEX_POLLING_QUERY будет возвращать только записи, для которых busy установлен на false (мы устанавливаем busy на true, как только опрос будет выполнен с использованием txSyncFactory)myActivator, где это может занять от 1 до 30 минут.busy с true на false.Проблема: нам нужно инициировать уведомление, даже когда обработка всех записей, которые присутствовали в таблице, завершена.
Подход попробовал: Мы использовали afterReturning из pollerAdvice, чтобы выяснить, вернул ли SOME_COMPLEX_POLLING_QUERY какие-либо результаты или нет. Однако этот метод начнет возвращать «Нет записей» до того, как Thread-2 завершит обработку всех записей.
Примечание:
outbound-channel-adapter, так как он нам не нужен. Однако мы открыты для использования, если это часть предлагаемого решения.Не уверен, что это сработает для вас, но, поскольку вам все равно нужно подождать с уведомлением до Thread-2, я бы посоветовал иметь некоторый компонент AtomicBoolean. В упомянутом afterReturning(), когда нет данных, запрашиваемых из БД, вы просто меняете состояние AtomicBoolean на true. Когда Thread-2 завершает свою работу, он может вызвать <filter>, чтобы проверить состояние AtomicBoolean, а затем действительно выполнить <int-event:outbound-channel-adapter>, чтобы отправить событие уведомления.
Итак, окончательное решение об отправке события или нет определенно принимается Thread-2, а не адаптером канала опроса.
Ну, разве это не то, что вы получаете в outChannel? А вы сами говорите: Once the processing is done. Итак, похоже, вы знаете, что происходит
И еще одно замечание: если есть опрошенные сообщения из БД, вам необходимо сбросить этот флаг AtomicBoolean на false со значением «не завершено» для Thread-2.
Что ж, прямо сейчас я не использую outChannel, как упоминалось в примечании. С Once the processing is done я имел в виду в самом myActivator, мы вызываем DB и устанавливаем флаг на false.
Спасибо, Артем. Но я не уверен, как мы узнаем, что
Thread-2завершает свою работу?