У меня есть простое приложение производителя и потребителя, использующее ActiveMQ Classic 6.0.1.
Это производитель:
ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(BROKER_URL);
// Create a Connection
Connection connection = connectionFactory.createConnection("admin", "admin");
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the Topic
Destination destination = session.createQueue(TOPIC_NAME);
// Create a MessageProducer
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a TextMessage and publish it
TextMessage message = session.createTextMessage("Hello, JMS Topic!");
producer.send(message);
System.out.println("Message sent");
// Clean up
session.close();
connection.close();
Сообщение действительно отправлено в очередь.
Теперь у меня есть два способа использования сообщения: первый:
public class MyListener implements MessageListener {
private static final Logger logger = Logger.getLogger(WebServer.class.getName());
private static final String BROKER_URL = "tcp://messaging:61616";
private static final String TOPIC_NAME = "TEST.FOO";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// Create a connection
Connection connection = connectionFactory.createConnection("admin", "admin");
connection.start();
// Create a session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a destination (topic or queue)
Destination destination = session.createQueue(TOPIC_NAME);
// Create a consumer
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MyListener());
// INFINIT LOOP
}
@Override
public void onMessage(Message message) {
System.out.println(message);
}
}
Таким образом, сообщения действительно выводятся на консоль.
Второй способ не работает (используя JMSContext):
public class MyListener implements MessageListener{
private static final Logger logger = Logger.getLogger(WebServer.class.getName());
private static final String BROKER_URL = "tcp://messaging:61616";
private static final String TOPIC_NAME = "TEST.FOO";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
try(JMSContext context = connectionFactory.createContext("admin", "admin", AUTO_ACKNOWLEDGE)) {
JMSConsumer consumer = context.createConsumer(context.createQueue(TOPIC_NAME));
consumer.setMessageListener(new MyListener());
context.start();
}
// INFINIT LOOP
}
@Override
public void onMessage(Message message) {
System.out.println(message);
}
}
В коде INFINIT LOOP я встраиваю запуск Jetty и соединение потоков. Я тоже пробовал с Thread.sleep().
Эти два приложения похожи, работают через встроенный Jetty и используют JPA (MariaDB/EclispeLink), Jersey JAX-RS и Weld-SE в качестве реализации CDI.
Есть идеи, почему второй способ (я думаю, это спецификация JMS 2.0) не работает?
Как я думаю, ActiveMQ classic 6.0.1 поддерживает JMS 2.0.





Проблема в том, что вы используете попробуйте с ресурсами, то есть:
try(JMSContext context = connectionFactory.createContext("admin", "admin", AUTO_ACKNOWLEDGE)) {
JMSConsumer consumer = context.createConsumer(context.createQueue(TOPIC_NAME));
consumer.setMessageListener(new MyListener());
context.start();
}
Как только этот блок заканчивается, созданный вами JMSContext закрывается, не позволяя вашей реализации MessageListener фактически получать какие-либо сообщения. В другом коде этой проблемы нет, поскольку вы не используете блок try-with-resources и никогда явно не закрываете соединение.
Вам нужно либо удалить блок try-with-resources, либо поместить в него свой INFINIT WAIT-код.
Кроме того, нет необходимости вызывать context.start(), поскольку это делается автоматически. Такое поведение можно настроить с помощью метода setAutoStart.
Попробуйте что-то вроде этого:
public class MyListener implements MessageListener {
private static final Logger logger = Logger.getLogger(WebServer.class.getName());
private static final String BROKER_URL = "tcp://messaging:61616";
private static final String TOPIC_NAME = "TEST.FOO";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
try(JMSContext context = connectionFactory.createContext("admin", "admin", AUTO_ACKNOWLEDGE)) {
JMSConsumer consumer = context.createConsumer(context.createQueue(TOPIC_NAME));
consumer.setMessageListener(new MyListener());
// INFINIT LOOP
}
}
@Override
public void onMessage(Message message) {
System.out.println(message);
}
}
Наконец, стоит отметить, что ActiveMQ Classic еще не полностью реализует JMS 2 . Вам придется использовать ActiveMQ Artemis, если вам нужна полная реализация.