Spring @Transactional JPA в процессоре JMSConsumer Camel

Я занимаюсь обновлением существующего приложения Java SE Camel, чтобы использовать EntityManger, управляемый Spring JPA, с использованием инъекции @PersitentContext. В настоящее время он создается в нашем коде с помощью Persistence.createEntityManagerFactory, и он хорошо работает. Но мы хотели бы иметь более гибкое и тестируемое приложение с помощью внедрения этого entityManager, использовать мощные возможности @Transactional aop, а также переключаться между локальными и глобальными транзакциями XA, просто используя разные @Configuration @Profiles.

Моим первым тестом было тестирование xa-транзакций с использованием ресурсов JMS и JPA с помощью Atomikos TransactionManager, и это сработало.

Теперь в этом приложении я хочу протестировать локальные транзакции с использованием OpenJPA, Camel и JpaTransactionManager, то есть без участия транзакции XA.

У меня есть модульные тесты, которые довольно хорошо работают без Camel, ткачество @Transactional выполняет работу по запуску транзакций с jpaTransactionManager, и все в порядке.

Теперь, когда я пытаюсь интегрировать свои компоненты конфигурации в приложения Camel, дела идут не очень хорошо.

У меня есть потребитель JMS, который запускает процессор верблюда, в котором метод процесса аннотируется с помощью @Transactional.

Я думаю, что я что-то неправильно сконфигурировал, потому что JMSConsumer запускает транзакцию, даже если я не настроил компонент JMS явно для транзакции (без transhibited () в маршруте, а не в диспетчере транзакций, установленном в JMSConfiguration)

Вот точка останова, показывающая, что JmsConsumer пытается зарегистрироваться для синхронизации транзакций:

Daemon Thread [Camel (worker) thread #1 - JmsConsumer[<consumerName>]] (Suspended (breakpoint at line 175 in TransactionSynchronizationManager))
    TransactionSynchronizationManager.bindResource(Object, Object) line: 175
    DefaultJmsMessageListenerContainer(AbstractPollingMessageListenerContainer).doReceiveAndExecute(Object, Session, MessageConsumer, TransactionStatus) line: 313
    DefaultJmsMessageListenerContainer(AbstractPollingMessageListenerContainer).receiveAndExecute(Object, Session, MessageConsumer) line: 255
    DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener() line: 1168
    DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop() line: 1160
    DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run() line: 1057
    ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142
    ThreadPoolExecutor$Worker.run() line: 617   
    Thread.run() line: 745 [local variables unavailable]

Я не вижу, чтобы к TransactionSynchronisationManager был привязан какой-либо другой ресурс.

Напротив, в моих модульных тестах все идет хорошо, и мы видим в трассировке стека, что @Transactional играет свою роль, как и ожидалось:

Thread [main] (Suspended (breakpoint at line 175 in TransactionSynchronizationManager))
    TransactionSynchronizationManager.bindResource(Object, Object) line: 175
    JpaTransactionManager.doBegin(Object, TransactionDefinition) line: 406
    JpaTransactionManager(AbstractPlatformTransactionManager).getTransaction(TransactionDefinition) line: 377
    TransactionInterceptor(TransactionAspectSupport).createTransactionIfNecessary(PlatformTransactionManager, TransactionAttribute, String) line: 461
    TransactionInterceptor(TransactionAspectSupport).invokeWithinTransaction(Method, Class<?>, InvocationCallback) line: 277
    TransactionInterceptor.invoke(MethodInvocation) line: 96    
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 179
    CglibAopProxy$DynamicAdvisedInterceptor.intercept(Object, Method, Object[], MethodProxy) line: 671
    MyRepo$$EnhancerBySpringCGLIB$$5edadd7f.createRun(String) line: not available
    DataSourceConfigTest.testIt2() line: 38 
    ...

Итак, очевидно, что что-то здесь неправильно настроено, кажется, что @Transactional полностью игнорируется или отменяется какой-то волшебной автоконфигурацией SpringBoot. Есть у кого-нибудь подсказки?

Вот моя конфигурация CamelConfiguration и My Data / JPA

Конфигурация верблюда

@Configuration
public class CamelConfiguration {
    protected static Logger LOGGER = LoggerFactory.getLogger(CamelConfiguration.class);


    @Value("${broker.mqURL}")
    String mqURL;

    /**
     * The runId is injected from command Line arguments
     */
    @Value("${runId}")
    long runId;


    @Bean
    CamelContextConfiguration contextConfiguration() {
        return new CamelContextConfiguration() {
            @Override
            public void beforeApplicationStart(CamelContext context) {

                ((SpringCamelContext)context).setName("worker");

                DefaultShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy();
                shutdownStrategy.setTimeUnit(TimeUnit.SECONDS);
                shutdownStrategy.setTimeout(5);
                context.setShutdownStrategy(shutdownStrategy);
            }

            @Override
            public void afterApplicationStart(CamelContext camelContext) {
                try {
                    LOGGER.info("Starting route dataspecProcessing." + runId);
                    camelContext.startAllRoutes();
                } catch (Exception e) {
                    LOGGER.error("Error during Camel post start",e);
                }
            }
        };
    }

    @Bean
    @Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
    public ActiveMQConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(mqURL);
        connectionFactory.setTrustAllPackages(true);
        return connectionFactory;
    }

    @Bean
    @Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
    public JmsConfiguration myJmsConfiguration(ActiveMQConnectionFactory jmsConnectionFactory,JpaTransactionManager jpaTransactionManager) {
        JmsConfiguration jmsConfiguration = new JmsConfiguration();
//      jmsConfiguration.setTransacted(true);
//      jmsConfiguration.setLazyCreateTransactionManager(false);
//      jmsConfiguration.setTransactionManager(jpaTransactionManager);
        jmsConfiguration.setConnectionFactory(jmsConnectionFactory);
        jmsConfiguration.setCacheLevelName("CACHE_CONSUMER");
        return jmsConfiguration;
    }

Конфигурация Jpa

@Configuration
@EnableTransactionManagement(mode=AdviceMode.ASPECTJ)
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class DataSourceConfig {

    @Bean
    public JpaTransactionManager jpaTransactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory().getObject());
        transactionManager.setDataSource(dataSource());
        transactionManager.setJpaDialect(new OpenJpaDialect());
        return transactionManager;
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
        em.setPersistenceXmlLocation("META-INF/persistence.xml");
        em.setJpaProperties(additionalProperties());
        em.setDataSource(dataSource());
        em.setValidationMode(ValidationMode.NONE);
        JpaVendorAdapter vendorAdapter = new OpenJpaVendorAdapter();
        em.setJpaVendorAdapter(vendorAdapter);
        em.setJpaProperties(additionalProperties());

        return em;
    }

    @Bean
    public DataSource dataSource() {
        PGSimpleDataSource pgDataSource = new PGSimpleDataSource();
        pgDataSource.setUrl(...);
        pgDataSource.setUser(...);
        pgDataSource.setPassword(...);
        return pgDataSource;
    }

И код, который должен быть транзакционным

@Component
@EnableTransactionManagement(mode=AdviceMode.ASPECTJ)
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class MyCamelProcessor implements Processor {
...
@Transactional(propagation=Propagation.REQUIRES_NEW,transactionManager = "jpaTransactionManager")
    public void process(Exchange exchange) throws Exception {
        ... calls to entitymanager here ...

Маршрут, вызывающий процессор

@Component
public class DataSpecProcessingRoute extends SpringRouteBuilder {
....
public void configure() throws Exception {
...
from("jms:queue:thequeue"?concurrentConsumers = " + numConsumerThreads)
            .autoStartup(false)
        .shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly)
                .routeId("dataspecProcessing." + runId)
                .process(initialisationProcessor)
                .process(processorWithTransactionnalNeeded)
.to("jms:queue:anotherqueue") ;

РЕДАКТИРОВАТЬ

Он работает с программными транзакциями, а не с аннотацией @Transactional.

Можете показать код, где настраиваете маршрут?

Simon Martinelli 02.11.2018 15:11

Я просто добавил код маршрута (немного урезанный). Следует отметить, что каждый потребитель JMS запускается в своем собственном потоке и что наши процессоры запускаются с этим потоком (они не асинхронны).

greg 02.11.2018 15:41

Вы должны прочитать camel.apache.org/…

Simon Martinelli 02.11.2018 15:47

Я также заметил, что по все еще неизвестной причине каждый вызов магазина, обертывающего entitymanager, создает entitymanager. Я вижу эту запись в журналах «Создание нового EntityManager для общего вызова EntityManager». Я бы хотел иметь только одну для каждой транзакции.

greg 02.11.2018 15:48

@SimonMartinelli Я не хочу сейчас использовать транзакции JMS ... Я знаю, как действовать. Проблема заключается в весенних транзакциях JPA.

greg 02.11.2018 15:50

В Camel есть собственное определение транзакционности, и для его выполнения он больше полагается на соглашение, а не на конфигурацию. Может быть сложно следовать документации по нему, но это стоит того, чтобы прочитать. Сделать ваш процессор исключительно транзакционным - это не тот подход, который вам подходит хотеть, поскольку вы можете поместить туда любой процессор Другие, и весь маршрут не будет считаться транзакционным.

Makoto 02.11.2018 16:15

@Makoto Извините, я не уверен, что понимаю вашу точку зрения. Как я уже сказал, мне не нужна конечная точка транзакции JMS, так почему вы даете ссылку на документ транзакции JMS. Я боюсь, что мой вопрос не очень хорошо объяснен :( Я знаю, как выполнять транзакции JMS, и на самом деле я использую его в производстве. В моем случае мне нужен простой процессор верблюда, выполняющий простую транзакцию Jpa / sql. Может быть, вы имеете в виду, что аннотация Transactional Spring в некотором смысле переопределяется конфигурацией Camel? Если вы знаете об этом больше, я был бы очень благодарен.

greg 02.11.2018 16:28

Эта операция transacted() не просто обрабатывает транзакции JMS. Он также может явно обрабатывать транзакции базы данных.

Makoto 02.11.2018 16:33

Но для этого вам нужны XA-транзакции, которые я тоже успешно протестировал :) Также я не использую transact () в своем маршруте, не хочу. Я просто хотел иметь простую транзакцию spring jpa в процессоре. Таким образом, я мог бы сделать это с помощью программных транзакций (см. Мое РЕДАКТИРОВАНИЕ), но не с аннотацией ...

greg 02.11.2018 16:45

Для этого вам вообще не нужны транзакции XA. Я использовал этот подход, чтобы обеспечить откат моей базы данных при возникновении ошибки во время обработки на этом маршруте, и он работает очень и очень хорошо. Я не уверен, откуда у вас такое недоверие к транзакционным маршрутам Camel, но бороться с ними гораздо сложнее, чем использовать их.

Makoto 02.11.2018 16:47

@Makoto На самом деле я понимаю, что вы имеете в виду: я ранее тестировал, что могу использовать jpaTransactionManager в рамках транзакционного маршрута. Для этого вы вставляете jpaTransactionManager в jmsConfiguration, и он работает. Обратите внимание, что в этом случае сообщение JMS само по себе не будет использоваться в транзакции. Но это не то, чего я хотел добиться. Кстати, я не доверяю верблюду, я просто пытаюсь заставить все работать :) В любом случае, спасибо за ваше время

greg 02.11.2018 16:51
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
11
776
0

Другие вопросы по теме