Apache Camel? Транзакция = правда

Привет, разработчики Camel / jms. Использование соединителя Apache Camel amqp jms. И как брокер ActiveMQ.

Моя конфигурация вполне стандартная.

Вот пример потребительского кода:

 public static void main(String[] args) throws Exception {
    AMQPComponent amqpComponent = AMQPComponent.amqpComponent(HOST, USER, PWD);
    CamelContext context = new DefaultCamelContext();
    context.addComponent("amqp", amqpComponent);
    context.start();
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("amqp:queue:1test.queue?transacted=true")
                .to("stream:out")
            .end();
        }
    });
    Thread.sleep(20*1000);
    context.stop();
}

Легко увидеть, я настроил потребителя с транзакцией. для 1test.queue. Когда я его запускаю, в журнале вижу:

[main] INFO org.apache.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: amqp://queue:1test.queue?transacted=true
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:1 connected to remote Broker: amqp:HOST2
[AmqpProvider :(2):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(2):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:2 connected to remote Broker: amqp:HOST2
[AmqpProvider :(3):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(3):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:3 connected to remote Broker: amqp:HOST2

Если я удалю? Transailed = true с потребителя

[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:1 connected to remote Broker: amqp:HOST2

Он появляется только один раз.

Как объяснить такое поведение? Это нормально для транзакционных потребителей в верблюде?

Спасибо.

P.S Проверил Эта тема, но не уверен, как сопоставить его с реальностью Camel.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
757
2

Ответы 2

Используйте jms insted из amqp.

У меня была аналогичная проблема. Но когда я использую JMS вместо AMQP, он работал хорошо, журнал был только один раз, т.е. было создано одно соединение.

Кажется, есть проблема с компонентом AMQ.

Спасибо, Рахул

Компонент AMQP Camel не определил объединенную фабрику соединений, поэтому он создает новое соединение для каждой итерации, чтобы проверить, есть ли сообщения в брокере.

Чтобы избежать этого, вы должны определить CachingConnectionFactory для AMQPComponent как:

import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.camel.component.jms.JmsConfiguration;

import org.apache.qpid.jms.JmsConnectionFactory;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.connection.CachingConnectionFactory;

@Bean
public JmsConnectionFactory jmsConnectionFactory() {
    JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory(brokerUser, brokerPassword, brokerUrl);

    return jmsConnectionFactory;
}

@Bean
@Primary
public CachingConnectionFactory jmsCachingConnectionFactory(JmsConnectionFactory jmsConnectionFactory) {
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(jmsConnectionFactory);

    return cachingConnectionFactory;
}

@Bean
public JmsConfiguration jmsConfig(CachingConnectionFactory cachingConnectionFactory) {
    JmsConfiguration jmsConfiguration = new JmsConfiguration();

    jmsConfiguration.setConnectionFactory(cachingConnectionFactory);
    jmsConfiguration.setCacheLevelName("CACHE_CONSUMER");

    return jmsConfiguration;
}

@Bean
public AMQPComponent amqpComponent(JmsConfiguration jmsConfiguration) {
    AMQPComponent amqpComponent = new AMQPComponent();

    amqpComponent.setConfiguration(jmsConfiguration);

    return amqpComponent;
}

Вы получите то же поведение, что и JMS Camel Component.

Больше информации на странице Компонент AMQP Camel

Другие вопросы по теме