Я занимаюсь обновлением существующего приложения Java SE Camel, чтобы использовать EntityManger, управляемый Spring JPA, с использованием инъекции @PersitentContext. В настоящее время он создается в нашем коде с помощью Persistence.createEntityManagerFactory, и он хорошо работает. Но мы хотели бы иметь более гибкое и тестируемое приложение с помощью внедрения этого entityManager, использовать мощные возможности @Transactional aop, а также переключаться между локальными и глобальными транзакциями XA, просто используя разные @Configuration @Profiles.
Моим первым тестом было тестирование xa-транзакций с использованием ресурсов JMS и JPA с помощью Atomikos TransactionManager, и это сработало.
Теперь в этом приложении я хочу протестировать локальные транзакции с использованием OpenJPA, Camel и JpaTransactionManager, то есть без участия транзакции XA.
У меня есть модульные тесты, которые довольно хорошо работают без Camel, ткачество @Transactional выполняет работу по запуску транзакций с jpaTransactionManager, и все в порядке.
Теперь, когда я пытаюсь интегрировать свои компоненты конфигурации в приложения Camel, дела идут не очень хорошо.
У меня есть потребитель JMS, который запускает процессор верблюда, в котором метод процесса аннотируется с помощью @Transactional.
Я думаю, что я что-то неправильно сконфигурировал, потому что JMSConsumer запускает транзакцию, даже если я не настроил компонент JMS явно для транзакции (без transhibited () в маршруте, а не в диспетчере транзакций, установленном в JMSConfiguration)
Вот точка останова, показывающая, что JmsConsumer пытается зарегистрироваться для синхронизации транзакций:
Daemon Thread [Camel (worker) thread #1 - JmsConsumer[<consumerName>]] (Suspended (breakpoint at line 175 in TransactionSynchronizationManager))
TransactionSynchronizationManager.bindResource(Object, Object) line: 175
DefaultJmsMessageListenerContainer(AbstractPollingMessageListenerContainer).doReceiveAndExecute(Object, Session, MessageConsumer, TransactionStatus) line: 313
DefaultJmsMessageListenerContainer(AbstractPollingMessageListenerContainer).receiveAndExecute(Object, Session, MessageConsumer) line: 255
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener() line: 1168
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop() line: 1160
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run() line: 1057
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142
ThreadPoolExecutor$Worker.run() line: 617
Thread.run() line: 745 [local variables unavailable]
Я не вижу, чтобы к TransactionSynchronisationManager был привязан какой-либо другой ресурс.
Напротив, в моих модульных тестах все идет хорошо, и мы видим в трассировке стека, что @Transactional играет свою роль, как и ожидалось:
Thread [main] (Suspended (breakpoint at line 175 in TransactionSynchronizationManager))
TransactionSynchronizationManager.bindResource(Object, Object) line: 175
JpaTransactionManager.doBegin(Object, TransactionDefinition) line: 406
JpaTransactionManager(AbstractPlatformTransactionManager).getTransaction(TransactionDefinition) line: 377
TransactionInterceptor(TransactionAspectSupport).createTransactionIfNecessary(PlatformTransactionManager, TransactionAttribute, String) line: 461
TransactionInterceptor(TransactionAspectSupport).invokeWithinTransaction(Method, Class<?>, InvocationCallback) line: 277
TransactionInterceptor.invoke(MethodInvocation) line: 96
CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 179
CglibAopProxy$DynamicAdvisedInterceptor.intercept(Object, Method, Object[], MethodProxy) line: 671
MyRepo$$EnhancerBySpringCGLIB$$5edadd7f.createRun(String) line: not available
DataSourceConfigTest.testIt2() line: 38
...
Итак, очевидно, что что-то здесь неправильно настроено, кажется, что @Transactional полностью игнорируется или отменяется какой-то волшебной автоконфигурацией SpringBoot. Есть у кого-нибудь подсказки?
Вот моя конфигурация CamelConfiguration и My Data / JPA
Конфигурация верблюда
@Configuration
public class CamelConfiguration {
protected static Logger LOGGER = LoggerFactory.getLogger(CamelConfiguration.class);
@Value("${broker.mqURL}")
String mqURL;
/**
* The runId is injected from command Line arguments
*/
@Value("${runId}")
long runId;
@Bean
CamelContextConfiguration contextConfiguration() {
return new CamelContextConfiguration() {
@Override
public void beforeApplicationStart(CamelContext context) {
((SpringCamelContext)context).setName("worker");
DefaultShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy();
shutdownStrategy.setTimeUnit(TimeUnit.SECONDS);
shutdownStrategy.setTimeout(5);
context.setShutdownStrategy(shutdownStrategy);
}
@Override
public void afterApplicationStart(CamelContext camelContext) {
try {
LOGGER.info("Starting route dataspecProcessing." + runId);
camelContext.startAllRoutes();
} catch (Exception e) {
LOGGER.error("Error during Camel post start",e);
}
}
};
}
@Bean
@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
public ActiveMQConnectionFactory jmsConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(mqURL);
connectionFactory.setTrustAllPackages(true);
return connectionFactory;
}
@Bean
@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
public JmsConfiguration myJmsConfiguration(ActiveMQConnectionFactory jmsConnectionFactory,JpaTransactionManager jpaTransactionManager) {
JmsConfiguration jmsConfiguration = new JmsConfiguration();
// jmsConfiguration.setTransacted(true);
// jmsConfiguration.setLazyCreateTransactionManager(false);
// jmsConfiguration.setTransactionManager(jpaTransactionManager);
jmsConfiguration.setConnectionFactory(jmsConnectionFactory);
jmsConfiguration.setCacheLevelName("CACHE_CONSUMER");
return jmsConfiguration;
}
Конфигурация Jpa
@Configuration
@EnableTransactionManagement(mode=AdviceMode.ASPECTJ)
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class DataSourceConfig {
@Bean
public JpaTransactionManager jpaTransactionManager() {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setEntityManagerFactory(entityManagerFactory().getObject());
transactionManager.setDataSource(dataSource());
transactionManager.setJpaDialect(new OpenJpaDialect());
return transactionManager;
}
@Bean
public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
em.setPersistenceXmlLocation("META-INF/persistence.xml");
em.setJpaProperties(additionalProperties());
em.setDataSource(dataSource());
em.setValidationMode(ValidationMode.NONE);
JpaVendorAdapter vendorAdapter = new OpenJpaVendorAdapter();
em.setJpaVendorAdapter(vendorAdapter);
em.setJpaProperties(additionalProperties());
return em;
}
@Bean
public DataSource dataSource() {
PGSimpleDataSource pgDataSource = new PGSimpleDataSource();
pgDataSource.setUrl(...);
pgDataSource.setUser(...);
pgDataSource.setPassword(...);
return pgDataSource;
}
И код, который должен быть транзакционным
@Component
@EnableTransactionManagement(mode=AdviceMode.ASPECTJ)
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class MyCamelProcessor implements Processor {
...
@Transactional(propagation=Propagation.REQUIRES_NEW,transactionManager = "jpaTransactionManager")
public void process(Exchange exchange) throws Exception {
... calls to entitymanager here ...
Маршрут, вызывающий процессор
@Component
public class DataSpecProcessingRoute extends SpringRouteBuilder {
....
public void configure() throws Exception {
...
from("jms:queue:thequeue"?concurrentConsumers = " + numConsumerThreads)
.autoStartup(false)
.shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly)
.routeId("dataspecProcessing." + runId)
.process(initialisationProcessor)
.process(processorWithTransactionnalNeeded)
.to("jms:queue:anotherqueue") ;
РЕДАКТИРОВАТЬ
Он работает с программными транзакциями, а не с аннотацией @Transactional.
Я просто добавил код маршрута (немного урезанный). Следует отметить, что каждый потребитель JMS запускается в своем собственном потоке и что наши процессоры запускаются с этим потоком (они не асинхронны).
Вы должны прочитать camel.apache.org/…
Я также заметил, что по все еще неизвестной причине каждый вызов магазина, обертывающего entitymanager, создает entitymanager. Я вижу эту запись в журналах «Создание нового EntityManager для общего вызова EntityManager». Я бы хотел иметь только одну для каждой транзакции.
@SimonMartinelli Я не хочу сейчас использовать транзакции JMS ... Я знаю, как действовать. Проблема заключается в весенних транзакциях JPA.
В Camel есть собственное определение транзакционности, и для его выполнения он больше полагается на соглашение, а не на конфигурацию. Может быть сложно следовать документации по нему, но это стоит того, чтобы прочитать. Сделать ваш процессор исключительно транзакционным - это не тот подход, который вам подходит хотеть, поскольку вы можете поместить туда любой процессор Другие, и весь маршрут не будет считаться транзакционным.
@Makoto Извините, я не уверен, что понимаю вашу точку зрения. Как я уже сказал, мне не нужна конечная точка транзакции JMS, так почему вы даете ссылку на документ транзакции JMS. Я боюсь, что мой вопрос не очень хорошо объяснен :( Я знаю, как выполнять транзакции JMS, и на самом деле я использую его в производстве. В моем случае мне нужен простой процессор верблюда, выполняющий простую транзакцию Jpa / sql. Может быть, вы имеете в виду, что аннотация Transactional Spring в некотором смысле переопределяется конфигурацией Camel? Если вы знаете об этом больше, я был бы очень благодарен.
Эта операция transacted() не просто обрабатывает транзакции JMS. Он также может явно обрабатывать транзакции базы данных.
Но для этого вам нужны XA-транзакции, которые я тоже успешно протестировал :) Также я не использую transact () в своем маршруте, не хочу. Я просто хотел иметь простую транзакцию spring jpa в процессоре. Таким образом, я мог бы сделать это с помощью программных транзакций (см. Мое РЕДАКТИРОВАНИЕ), но не с аннотацией ...
Для этого вам вообще не нужны транзакции XA. Я использовал этот подход, чтобы обеспечить откат моей базы данных при возникновении ошибки во время обработки на этом маршруте, и он работает очень и очень хорошо. Я не уверен, откуда у вас такое недоверие к транзакционным маршрутам Camel, но бороться с ними гораздо сложнее, чем использовать их.
@Makoto На самом деле я понимаю, что вы имеете в виду: я ранее тестировал, что могу использовать jpaTransactionManager в рамках транзакционного маршрута. Для этого вы вставляете jpaTransactionManager в jmsConfiguration, и он работает. Обратите внимание, что в этом случае сообщение JMS само по себе не будет использоваться в транзакции. Но это не то, чего я хотел добиться. Кстати, я не доверяю верблюду, я просто пытаюсь заставить все работать :) В любом случае, спасибо за ваше время




Можете показать код, где настраиваете маршрут?