У меня есть сервер ActiveMQ Artemis, настроенный следующим образом:
public static void updateConfig(Configuration config) {
try {
config.setPersistenceEnabled(false)
.setSecurityEnabled(false)
.addAcceptorConfiguration("tcp", "tcp://localhost:61616")
.addAcceptorConfiguration("amqp", "tcp://localhost:5672");
} catch (Exception ex) {
ex.printStackTrace();
}
}
Он может успешно прослушивать темы из моих pub-sub-приложений nodejs, использующих AMQP, и моих Java-приложений, использующих openwire. Используемая тема: TestTopic
.
Хотя это работает, ни один из протоколов, похоже, не объединяет сообщения друг с другом, несмотря на то, что они получены сервером.
Если сообщение публикуется через TestTopic
с использованием AMQP на моем клиенте NodeJS, сервер получает сообщение и успешно передает подписчикам клиента NodeJS, но другие слушатели OpenWire из приложений Java никогда не увидят это сообщение. Аналогично, клиенты Java OpenWire могут отправлять и получать сообщения друг другу, но сообщения клиентов Java никогда не отслеживаются клиентами NodeJS.
Сервер JMS видит успешные сообщения по обоим протоколам, но никогда не передает их всем слушателям/получателям на TestTopic
.
Кажется, мне не хватает конфигурации на моем сервере ActiveMQ Artemis.
В документации написано следующее:
Перед адресом назначения следует поставить
queue://
, чтобы использовать пункты назначения на основе очереди, илиtopic://
, чтобы использовать пункты назначения на основе тем. Тип назначения по умолчанию — очередь, если префикс назначения опущен.
У меня есть темы назначения NodeJS, настроенные так 'topic://TestTopic'
.
// subscriber.js
var args = require('./options.js').options({
'client': { default: 'my-client', describe: 'name of identifier for client container'},
'subscription': { default: 'my-subscription', describe: 'name of identifier for subscription'},
't': { alias: 'topic', default: 'topic://TestTopic', describe: 'name of topic to subscribe to'},
'h': { alias: 'host', default: 'localhost', describe: 'dns or ip name of server where you want to connect'},
'p': { alias: 'port', default: 5672, describe: 'port to connect to'}
}).help('help').argv;
var connection = require('rhea').connect({ port:args.port, host: args.host, container_id:args.client });
connection.on('receiver_open', function (context) {
console.info('subscribed');
});
connection.on('message', function (context) {
if (context.message.body === 'detach') {
// detaching leaves the subscription active, so messages sent
// while detached are kept until we attach again
context.receiver.detach();
context.connection.close();
} else if (context.message.body === 'close') {
// closing cancels the subscription
context.receiver.close();
context.connection.close();
} else {
console.info(context.message.body);
}
});
// the identity of the subscriber is the combination of container id
// and link (i.e. receiver) name
connection.open_receiver({name:args.subscription, source:{address:args.topic, durable:2, expiry_policy:'never'}});
В прослушивателе Java тема настроена как TestTopic
, как показано ниже:
// client-sender.java
@JmsListener(destination = "TestTopic", selector = "${selector}")
public void receiveMessage(Message message) {
String type = (message instanceof MapMessage) ? "MapMessage" : "TextMessage";
try {
logMessageBody(message);
}
} catch (Exception ex) {
logger.error("Exception caught while logging the received message: ");
logger.error(ex + ": " + ex.getCause());
}
}
Обмен сообщениями между протоколами определенно поддерживается и будет работать при правильной настройке.
Стоит отметить, что указанная вами документация на самом деле предназначена для ActiveMQ Classic, а не для ActiveMQ Artemis. Чтобы внести ясность, вы можете использовать префиксы с ActiveMQ Artemis, но они настраиваются (в отличие от ActiveMQ Classic), и для их применения их необходимо указать в URL-адресе приемника. Это обсуждается в актуальной документации ActiveMQ Artemis. В вашем случае вам нужно это:
public static void updateConfig(Configuration config) {
try {
config.setPersistenceEnabled(false)
.setSecurityEnabled(false)
.addAcceptorConfiguration("tcp", "tcp://localhost:61616")
.addAcceptorConfiguration("amqp", "tcp://localhost:5672?anycastPrefix=queue://;multicastPrefix=topic://");
} catch (Exception ex) {
ex.printStackTrace();
}
}
Чтобы внести ясность, адресаты очереди и темы JMS автоматически сопоставляются с произвольной и многоадресной рассылкой соответственно.
Как только оба клиента будут использовать совместимые типы маршрутизации, сообщения смогут передаваться между ними туда и обратно. Имейте в виду, что семантика публикации/подписки (т. е. темы JMS) требует, чтобы подписчик был создан до отправки каких-либо сообщений. Сообщения, отправленные при отсутствии подписчика, просто отбрасываются.
Спасибо за разъяснения и указание на то, что я ссылался на неправильную документацию. Ответ принят!