Повторная доставка сообщений JMS не работает для темы в Spring + ActiveMQ + Atomikos + JTA + Tomcat

У меня есть следующая конфигурация в приложении, которое отлично работает с очередями и повторно доставляет сообщения, когда происходит RuntimeException с откатом транзакции, как и ожидалось.

Но та же конфигурация не может повторно доставить сообщение с темами и выдает следующее предупреждающее сообщение:

WARN  DefaultMessageListenerContainer - Setup of JMS message listener invoker failed for destination 'XXX_TOPIC' - trying to recover. Cause: JTA transaction unexpectedly rolled back (maybe due to a timeout); nested exception is javax.transaction.RollbackException: One or more resources refused to commit (possibly because of a timeout in the resource - see the log for details). This transaction has been rolled back instead.

Конфигурация:

<bean id = "amqConnectionFactory" class = "org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name = "brokerURL" value = "tcp://localhost:61616"/>
    <!-- <property name = "sendTimeout" value = "1000"/> -->
    <property name = "redeliveryPolicy" ref = "redeliveryPolicy"/>
</bean>

<!-- Atomikos Connection Factory Wrapper For Gobal Tx -->
<bean id = "queueConnectionFactoryBean" class = "com.atomikos.jms.AtomikosConnectionFactoryBean">
    <property name = "uniqueResourceName" value = "amq1" />
    <property name = "xaConnectionFactory" ref = "amqConnectionFactory" />
</bean>

<bean id = "jmsConnectionFactory" class = "org.springframework.jms.connection.CachingConnectionFactory">
    <property name = "targetConnectionFactory">
        <ref bean = "queueConnectionFactoryBean"/>
    </property>
    <property name = "sessionCacheSize" value = "20"/>
    <property name = "cacheConsumers" value = "false"/>
</bean>

<bean id = "jtaTransactionManager" class = "org.springframework.transaction.jta.JtaTransactionManager">
    <property name = "transactionManager">
        <bean class = "com.atomikos.icatch.jta.UserTransactionManager"
                init-method = "init"
                destroy-method = "close">
            <property name = "forceShutdown" value = "false" />
        </bean>
    </property>
    <property name = "userTransaction">
        <bean class = "com.atomikos.icatch.jta.UserTransactionImp">
            <property name = "transactionTimeout" value = "300" />
        </bean>
    </property>
</bean> 

<!-- Topic configuration -->
<bean id = "messageListener1" class = "com.x.y.impl.JmsMessageListener"></bean>

<bean id = "container1" class = "org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name = "connectionFactory" ref = "jmsConnectionFactory"/>
    <property name = "destinationName" value = "XXX_TOPIC"/>
    <property name = "messageListener" ref = "messageListener1" />
    <property name = "pubSubDomain" value = "true" />
    <property name = "transactionManager" ref = "jtaTransactionManager" />
    <property name = "concurrency" value = "1" />
    <property name = "receiveTimeout" value = "3000" />

    <!-- For local session and Atomikos-->
    <property name = "sessionTransacted" value = "true"/>
</bean>

<!-- Bean configuration -->
<bean id = "messageListener2" class = "com.x.y.impl.JmsMessageListener"></bean>

<bean id = "container2" class = "org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name = "connectionFactory" ref = "jmsConnectionFactory"/>
    <property name = "destinationName" value = "XXX_QUEUE"/>
    <property name = "messageListener" ref = "messageListener2" />
    <property name = "pubSubDomain" value = "false" />
    <property name = "transactionManager" ref = "jtaTransactionManager" />
    <property name = "concurrency" value = "1" />

    <!-- For local session and Atomikos-->
    <property name = "sessionTransacted" value = "true"/>
</bean>

Используемая версия Spring: 3.1, ActiveMQ 5.14, Atomikos 4.0.6. Пожалуйста, дайте мне знать, если я пропустил какие-либо настройки для темы DMLC.

Какую роль здесь играет RuntimeException? Вы бросаете его из onMessage, чтобы вызвать повторную доставку?

Justin Bertram 11.03.2019 17:53

Да, в конкретном исключении приложения выдается RuntimeException, это кодируется как для Queue, так и для Topic.

Sonu 11.03.2019 18:01

Что бы это ни стоило, в спецификации JMS 1.1 говорится: «Прослушиватель может выдать исключение RuntimeException, однако это считается ошибкой программирования клиента» (выделено мной). Я мог бы порекомендовать вам обрабатывать ваши исключения с помощью try/catch, регистрировать необходимые данные и откатывать транзакцию через соответствующий API.

Justin Bertram 11.03.2019 18:13

Хорошо, попробую, но я не понимаю, почему вышеприведенное работает нормально для очереди, но не для темы. В случае RuntimeException контейнер должен позаботиться об откате и повторной доставке.

Sonu 12.03.2019 06:01

Я не обязательно думаю, что выбрасывание RuntimeException является причиной вашей проблемы, но это все же стоит исправить, тем более что это связано с мог.

Justin Bertram 12.03.2019 14:09
0
5
637
0

Другие вопросы по теме