Как настроить ActiveMQ Artemis для использования AMQP 1.0 и других протоколов с Java

У меня есть сервер ActiveMQ Artemis, который работает нормально. Теперь я хотел бы включить AMQP на сервере для приложения узла, использующего pub-sub. Хотя мои подписчики узла могут подключиться к нему, они не публикуют и не получают, даже если соединения действительны. Почему мой паб не отправляет сообщения и почему моя подписка не получает сообщения?

Я слежу за примерами издателя и подписчика для AMQP Rhea на Github. Они оба могут подключиться к localhost:5672.

Ниже представлена ​​моя реализация JMS-сервера ActiveMQ Artemis. Обратите внимание, что я добавляю 2 конфигурации с помощью addAcceptorConfiguration (одну для Artemis и другую для портов AMQP по умолчанию).

//this is my jmsserver
/*
 * This Java source file was generated by the Gradle 'init' task.
 */
package testSupport.artemis.server;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;

public class JMSServer {
    private static int qId = 0;

    private ActiveMQServer server;
    private String errMsg = "";

    /**
     * Factory method to create an instance of a JMS Server
     * @param topics list of topics to add to the server
     * @return
     */
    public static JMSServer createJMSServer(List<String> topics) {
        JMSServer s = new JMSServer();
        s.start();
        if (topics != null) {
            s.setTopics(topics);
        }
        return s;
    }

    /**
     * Factory method to create an instance of a JMS Server
     * @return
     */
    public static JMSServer createJMSServer() {
        return createJMSServer(null);
    }

    /**
     * Updates the server config with settings required to connect invm or from
     * another process on localhost
     * 
     * @param config
     */
    public static void updateConfig(Configuration config) {
        try {
            config.setPersistenceEnabled(false)
                  .setSecurityEnabled(false)
                  .addAcceptorConfiguration("tcp", "tcp://localhost:61616")
                  .addAcceptorConfiguration("amqp", "tcp://localhost:5672");

            // SpringBoot may have already created an invm connector at 0, so only add one if it doesnt exist
            if (config.getAcceptorConfigurations().stream().noneMatch(poo -> poo.getParams().containsKey("serverId") &&
                    Integer.valueOf(poo.getParams().get("serverId").toString()) == 0)) {
                config.addAcceptorConfiguration("invm", "vm://0");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Default Constructor
     */
    public JMSServer() {
        try {
            Configuration config = new ConfigurationImpl();
            updateConfig(config);
            server = ActiveMQServers.newActiveMQServer(config);
        } catch (Exception ex) {
            errMsg = ex + ": " + ex.getMessage();
            ex.printStackTrace();
        }
    }

    /**
     * Start the JMS Server
     * @return
     */
    public boolean start() {
        boolean success = false;
        try {
            server.start();

            for (int i = 0; i < 50; i++) {
                Thread.sleep(100);
                if (server.isActive()) {
                    success = true;
                    break;
                }
            }
        } catch (Exception ex) {
            errMsg = ex + ": " + ex.getMessage();
            ex.printStackTrace();
        }

        return success;
    }

    /**
     * Stop the JMS Server
     */
    public void stop() {
        try {
            server.stop();
        } catch (Exception ex) {
            errMsg = ex + ": " + ex.getMessage();
            ex.printStackTrace();
        }
    }

    /**
     * Get the Error Message
     * @return
     */
    public String getErrMsg() {
        return errMsg;
    }

    /**
     * Set a list of topics to add to this server
     * @param topics
     * @return
     */
    public boolean setTopics(List<String> topics) {
        boolean success = true;

        if (!server.isActive()) {
            errMsg = "Topics cannot be set until the server has been started.";
            return false;
        }

        // add the topics
        for (String t : topics) {
            try {
                SimpleString addr = SimpleString.toSimpleString(t);
                QueueConfig qcfg = QueueConfig.builderWith(qId, addr, addr).autoCreated(false)
                        .autoDelete(false)
                        .durable(true)
                        .build();

                server.getQueueFactory().createQueueWith(qcfg);
                server.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST));
                qId++;
            } catch (Exception ex) {
                errMsg = ex + ": " + ex.getMessage();
                success = false;
            }
        }
        return success;
    }

}

Обновлять: Итак, комментарии подсказывают, что мне пришлось добавить artemis-amqp-protocol в путь к классам. В gradle я обязательно включил artemis-amqp-protocol и artemis-jms-server с соответствующими версиями, например:

dependencies {
    implementation 'org.apache.activemq:artemis-jms-server:2.22.0'
    implementation 'org.apache.activemq:artemis-amqp-protocol:2.22.0'
    // other dependencies here...
}

В моем Gradle не было протокола artemis-amqp, поэтому я включил его. В журнале теперь отображается Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP. Но все та же проблема: ни издатель не отправляет, ни подписчик не получает. Сначала я запустил подписчика, затем запустил издателя. Я использую встроенного брокера для целей разработки, поскольку мой автономный брокер в новой среде еще не настроен.

Potion 14.03.2024 20:28

Обновите Невермайнд. Я обнаружил, что artemis-ampq-protocol должен соответствовать версии библиотеки artemis-jms-server, которую я использовал. Это решило проблему! Я опубликую градиент в редактировании. Спасибо!

Potion 14.03.2024 20:38
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
2
277
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы должны убедиться, что jar artemis-amqp-protocol находится в вашем пути к классам, иначе брокер не сможет поддерживать AMQP. Если это так, вы должны увидеть сообщение журнала, в котором говорится:

AMQ221043: Обнаружен модуль протокола: [artemis-amqp-protocol]. Добавлена ​​поддержка протокола: AMQP.

И позже вы увидите что-то вроде:

AMQ221020: Запущен приёмник EPOLL на локальном хосте: 5672 для протоколов [AMQP].

Если вы их не видите (или что-то очень близкое к этому), значит, брокер не поддерживает AMQP.

Другие вопросы по теме