Как настроить сервер ActiveMQ Artemis для взаимодействия сообщений через разные протоколы AMQP и Openwire

У меня есть сервер ActiveMQ Artemis, настроенный следующим образом:

public static void updateConfig(Configuration config) {
    try {
        config.setPersistenceEnabled(false)
              .setSecurityEnabled(false)
              .addAcceptorConfiguration("tcp", "tcp://localhost:61616")
              .addAcceptorConfiguration("amqp", "tcp://localhost:5672");
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

Он может успешно прослушивать темы из моих pub-sub-приложений nodejs, использующих AMQP, и моих Java-приложений, использующих openwire. Используемая тема: TestTopic.

Хотя это работает, ни один из протоколов, похоже, не объединяет сообщения друг с другом, несмотря на то, что они получены сервером.

Если сообщение публикуется через TestTopic с использованием AMQP на моем клиенте NodeJS, сервер получает сообщение и успешно передает подписчикам клиента NodeJS, но другие слушатели OpenWire из приложений Java никогда не увидят это сообщение. Аналогично, клиенты Java OpenWire могут отправлять и получать сообщения друг другу, но сообщения клиентов Java никогда не отслеживаются клиентами NodeJS.

Сервер JMS видит успешные сообщения по обоим протоколам, но никогда не передает их всем слушателям/получателям на TestTopic.

Кажется, мне не хватает конфигурации на моем сервере ActiveMQ Artemis.

В документации написано следующее:

Перед адресом назначения следует поставить queue://, чтобы использовать пункты назначения на основе очереди, или topic://, чтобы использовать пункты назначения на основе тем. Тип назначения по умолчанию — очередь, если префикс назначения опущен.

У меня есть темы назначения NodeJS, настроенные так 'topic://TestTopic'.

// subscriber.js
var args = require('./options.js').options({
  'client': { default: 'my-client', describe: 'name of identifier for client container'},
  'subscription': { default: 'my-subscription', describe: 'name of identifier for subscription'},
  't': { alias: 'topic', default: 'topic://TestTopic', describe: 'name of topic to subscribe to'},
  'h': { alias: 'host', default: 'localhost', describe: 'dns or ip name of server where you want to connect'},
  'p': { alias: 'port', default: 5672, describe: 'port to connect to'}
}).help('help').argv;

var connection = require('rhea').connect({ port:args.port, host: args.host, container_id:args.client });
connection.on('receiver_open', function (context) {
  console.info('subscribed');
});
connection.on('message', function (context) {
  if (context.message.body === 'detach') {
      // detaching leaves the subscription active, so messages sent
      // while detached are kept until we attach again
      context.receiver.detach();
      context.connection.close();
  } else if (context.message.body === 'close') {
      // closing cancels the subscription
      context.receiver.close();
      context.connection.close();
  } else {
      console.info(context.message.body);
  }
});
// the identity of the subscriber is the combination of container id
// and link (i.e. receiver) name
connection.open_receiver({name:args.subscription, source:{address:args.topic, durable:2, expiry_policy:'never'}});

В прослушивателе Java тема настроена как TestTopic, как показано ниже:

// client-sender.java
@JmsListener(destination = "TestTopic", selector = "${selector}")
public void receiveMessage(Message message) {
    String type = (message instanceof MapMessage) ? "MapMessage" : "TextMessage";
    try {
        logMessageBody(message);
        }
    } catch (Exception ex) {
        logger.error("Exception caught while logging the received message: ");
        logger.error(ex + ": " + ex.getCause());
    }
}
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
59
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Обмен сообщениями между протоколами определенно поддерживается и будет работать при правильной настройке.

Стоит отметить, что указанная вами документация на самом деле предназначена для ActiveMQ Classic, а не для ActiveMQ Artemis. Чтобы внести ясность, вы можете использовать префиксы с ActiveMQ Artemis, но они настраиваются (в отличие от ActiveMQ Classic), и для их применения их необходимо указать в URL-адресе приемника. Это обсуждается в актуальной документации ActiveMQ Artemis. В вашем случае вам нужно это:

public static void updateConfig(Configuration config) {
    try {
        config.setPersistenceEnabled(false)
              .setSecurityEnabled(false)
              .addAcceptorConfiguration("tcp", "tcp://localhost:61616")
              .addAcceptorConfiguration("amqp", "tcp://localhost:5672?anycastPrefix=queue://;multicastPrefix=topic://");
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

Чтобы внести ясность, адресаты очереди и темы JMS автоматически сопоставляются с произвольной и многоадресной рассылкой соответственно.

Как только оба клиента будут использовать совместимые типы маршрутизации, сообщения смогут передаваться между ними туда и обратно. Имейте в виду, что семантика публикации/подписки (т. е. темы JMS) требует, чтобы подписчик был создан до отправки каких-либо сообщений. Сообщения, отправленные при отсутствии подписчика, просто отбрасываются.

Спасибо за разъяснения и указание на то, что я ссылался на неправильную документацию. Ответ принят!

Potion 18.03.2024 21:39

Другие вопросы по теме