У меня есть следующая конфигурация в приложении, которое отлично работает с очередями и повторно доставляет сообщения, когда происходит RuntimeException с откатом транзакции, как и ожидалось.
Но та же конфигурация не может повторно доставить сообщение с темами и выдает следующее предупреждающее сообщение:
WARN DefaultMessageListenerContainer - Setup of JMS message listener invoker failed for destination 'XXX_TOPIC' - trying to recover. Cause: JTA transaction unexpectedly rolled back (maybe due to a timeout); nested exception is javax.transaction.RollbackException: One or more resources refused to commit (possibly because of a timeout in the resource - see the log for details). This transaction has been rolled back instead.
Конфигурация:
<bean id = "amqConnectionFactory" class = "org.apache.activemq.ActiveMQXAConnectionFactory">
<property name = "brokerURL" value = "tcp://localhost:61616"/>
<!-- <property name = "sendTimeout" value = "1000"/> -->
<property name = "redeliveryPolicy" ref = "redeliveryPolicy"/>
</bean>
<!-- Atomikos Connection Factory Wrapper For Gobal Tx -->
<bean id = "queueConnectionFactoryBean" class = "com.atomikos.jms.AtomikosConnectionFactoryBean">
<property name = "uniqueResourceName" value = "amq1" />
<property name = "xaConnectionFactory" ref = "amqConnectionFactory" />
</bean>
<bean id = "jmsConnectionFactory" class = "org.springframework.jms.connection.CachingConnectionFactory">
<property name = "targetConnectionFactory">
<ref bean = "queueConnectionFactoryBean"/>
</property>
<property name = "sessionCacheSize" value = "20"/>
<property name = "cacheConsumers" value = "false"/>
</bean>
<bean id = "jtaTransactionManager" class = "org.springframework.transaction.jta.JtaTransactionManager">
<property name = "transactionManager">
<bean class = "com.atomikos.icatch.jta.UserTransactionManager"
init-method = "init"
destroy-method = "close">
<property name = "forceShutdown" value = "false" />
</bean>
</property>
<property name = "userTransaction">
<bean class = "com.atomikos.icatch.jta.UserTransactionImp">
<property name = "transactionTimeout" value = "300" />
</bean>
</property>
</bean>
<!-- Topic configuration -->
<bean id = "messageListener1" class = "com.x.y.impl.JmsMessageListener"></bean>
<bean id = "container1" class = "org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name = "connectionFactory" ref = "jmsConnectionFactory"/>
<property name = "destinationName" value = "XXX_TOPIC"/>
<property name = "messageListener" ref = "messageListener1" />
<property name = "pubSubDomain" value = "true" />
<property name = "transactionManager" ref = "jtaTransactionManager" />
<property name = "concurrency" value = "1" />
<property name = "receiveTimeout" value = "3000" />
<!-- For local session and Atomikos-->
<property name = "sessionTransacted" value = "true"/>
</bean>
<!-- Bean configuration -->
<bean id = "messageListener2" class = "com.x.y.impl.JmsMessageListener"></bean>
<bean id = "container2" class = "org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name = "connectionFactory" ref = "jmsConnectionFactory"/>
<property name = "destinationName" value = "XXX_QUEUE"/>
<property name = "messageListener" ref = "messageListener2" />
<property name = "pubSubDomain" value = "false" />
<property name = "transactionManager" ref = "jtaTransactionManager" />
<property name = "concurrency" value = "1" />
<!-- For local session and Atomikos-->
<property name = "sessionTransacted" value = "true"/>
</bean>
Используемая версия Spring: 3.1, ActiveMQ 5.14, Atomikos 4.0.6. Пожалуйста, дайте мне знать, если я пропустил какие-либо настройки для темы DMLC.
Да, в конкретном исключении приложения выдается RuntimeException, это кодируется как для Queue, так и для Topic.
Что бы это ни стоило, в спецификации JMS 1.1 говорится: «Прослушиватель может выдать исключение RuntimeException, однако это считается ошибкой программирования клиента» (выделено мной). Я мог бы порекомендовать вам обрабатывать ваши исключения с помощью try/catch, регистрировать необходимые данные и откатывать транзакцию через соответствующий API.
Хорошо, попробую, но я не понимаю, почему вышеприведенное работает нормально для очереди, но не для темы. В случае RuntimeException контейнер должен позаботиться об откате и повторной доставке.
Я не обязательно думаю, что выбрасывание RuntimeException является причиной вашей проблемы, но это все же стоит исправить, тем более что это связано с мог.
Какую роль здесь играет
RuntimeException? Вы бросаете его изonMessage, чтобы вызвать повторную доставку?