Привет, разработчики Camel / jms. Использование соединителя Apache Camel amqp jms. И как брокер ActiveMQ.
Моя конфигурация вполне стандартная.
Вот пример потребительского кода:
public static void main(String[] args) throws Exception {
AMQPComponent amqpComponent = AMQPComponent.amqpComponent(HOST, USER, PWD);
CamelContext context = new DefaultCamelContext();
context.addComponent("amqp", amqpComponent);
context.start();
context.addRoutes(new RouteBuilder() {
@Override
public void configure() {
from("amqp:queue:1test.queue?transacted=true")
.to("stream:out")
.end();
}
});
Thread.sleep(20*1000);
context.stop();
}
Легко увидеть, я настроил потребителя с транзакцией. для 1test.queue. Когда я его запускаю, в журнале вижу:
[main] INFO org.apache.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: amqp://queue:1test.queue?transacted=true
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:1 connected to remote Broker: amqp:HOST2
[AmqpProvider :(2):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(2):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:2 connected to remote Broker: amqp:HOST2
[AmqpProvider :(3):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(3):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:3 connected to remote Broker: amqp:HOST2
Если я удалю? Transailed = true с потребителя
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
[AmqpProvider :(1):[amqp:HOST2]] INFO org.apache.qpid.jms.JmsConnection - Connection ID:...:1 connected to remote Broker: amqp:HOST2
Он появляется только один раз.
Как объяснить такое поведение? Это нормально для транзакционных потребителей в верблюде?
Спасибо.
P.S Проверил Эта тема, но не уверен, как сопоставить его с реальностью Camel.
Используйте jms insted из amqp.
У меня была аналогичная проблема. Но когда я использую JMS вместо AMQP, он работал хорошо, журнал был только один раз, т.е. было создано одно соединение.
Кажется, есть проблема с компонентом AMQ.
Спасибо, Рахул
Компонент AMQP Camel не определил объединенную фабрику соединений, поэтому он создает новое соединение для каждой итерации, чтобы проверить, есть ли сообщения в брокере.
Чтобы избежать этого, вы должны определить CachingConnectionFactory для AMQPComponent как:
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.camel.component.jms.JmsConfiguration;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.connection.CachingConnectionFactory;
@Bean
public JmsConnectionFactory jmsConnectionFactory() {
JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory(brokerUser, brokerPassword, brokerUrl);
return jmsConnectionFactory;
}
@Bean
@Primary
public CachingConnectionFactory jmsCachingConnectionFactory(JmsConnectionFactory jmsConnectionFactory) {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(jmsConnectionFactory);
return cachingConnectionFactory;
}
@Bean
public JmsConfiguration jmsConfig(CachingConnectionFactory cachingConnectionFactory) {
JmsConfiguration jmsConfiguration = new JmsConfiguration();
jmsConfiguration.setConnectionFactory(cachingConnectionFactory);
jmsConfiguration.setCacheLevelName("CACHE_CONSUMER");
return jmsConfiguration;
}
@Bean
public AMQPComponent amqpComponent(JmsConfiguration jmsConfiguration) {
AMQPComponent amqpComponent = new AMQPComponent();
amqpComponent.setConfiguration(jmsConfiguration);
return amqpComponent;
}
Вы получите то же поведение, что и JMS Camel Component.
Больше информации на странице Компонент AMQP Camel