У меня есть сервер ActiveMQ Artemis, который работает нормально. Теперь я хотел бы включить AMQP на сервере для приложения узла, использующего pub-sub. Хотя мои подписчики узла могут подключиться к нему, они не публикуют и не получают, даже если соединения действительны. Почему мой паб не отправляет сообщения и почему моя подписка не получает сообщения?
Я слежу за примерами издателя и подписчика для AMQP Rhea на Github. Они оба могут подключиться к localhost:5672
.
Ниже представлена моя реализация JMS-сервера ActiveMQ Artemis. Обратите внимание, что я добавляю 2 конфигурации с помощью addAcceptorConfiguration
(одну для Artemis и другую для портов AMQP по умолчанию).
//this is my jmsserver
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package testSupport.artemis.server;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
public class JMSServer {
private static int qId = 0;
private ActiveMQServer server;
private String errMsg = "";
/**
* Factory method to create an instance of a JMS Server
* @param topics list of topics to add to the server
* @return
*/
public static JMSServer createJMSServer(List<String> topics) {
JMSServer s = new JMSServer();
s.start();
if (topics != null) {
s.setTopics(topics);
}
return s;
}
/**
* Factory method to create an instance of a JMS Server
* @return
*/
public static JMSServer createJMSServer() {
return createJMSServer(null);
}
/**
* Updates the server config with settings required to connect invm or from
* another process on localhost
*
* @param config
*/
public static void updateConfig(Configuration config) {
try {
config.setPersistenceEnabled(false)
.setSecurityEnabled(false)
.addAcceptorConfiguration("tcp", "tcp://localhost:61616")
.addAcceptorConfiguration("amqp", "tcp://localhost:5672");
// SpringBoot may have already created an invm connector at 0, so only add one if it doesnt exist
if (config.getAcceptorConfigurations().stream().noneMatch(poo -> poo.getParams().containsKey("serverId") &&
Integer.valueOf(poo.getParams().get("serverId").toString()) == 0)) {
config.addAcceptorConfiguration("invm", "vm://0");
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* Default Constructor
*/
public JMSServer() {
try {
Configuration config = new ConfigurationImpl();
updateConfig(config);
server = ActiveMQServers.newActiveMQServer(config);
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
}
/**
* Start the JMS Server
* @return
*/
public boolean start() {
boolean success = false;
try {
server.start();
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (server.isActive()) {
success = true;
break;
}
}
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
return success;
}
/**
* Stop the JMS Server
*/
public void stop() {
try {
server.stop();
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
}
/**
* Get the Error Message
* @return
*/
public String getErrMsg() {
return errMsg;
}
/**
* Set a list of topics to add to this server
* @param topics
* @return
*/
public boolean setTopics(List<String> topics) {
boolean success = true;
if (!server.isActive()) {
errMsg = "Topics cannot be set until the server has been started.";
return false;
}
// add the topics
for (String t : topics) {
try {
SimpleString addr = SimpleString.toSimpleString(t);
QueueConfig qcfg = QueueConfig.builderWith(qId, addr, addr).autoCreated(false)
.autoDelete(false)
.durable(true)
.build();
server.getQueueFactory().createQueueWith(qcfg);
server.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST));
qId++;
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
success = false;
}
}
return success;
}
}
Обновлять:
Итак, комментарии подсказывают, что мне пришлось добавить artemis-amqp-protocol
в путь к классам. В gradle я обязательно включил artemis-amqp-protocol
и artemis-jms-server
с соответствующими версиями, например:
dependencies {
implementation 'org.apache.activemq:artemis-jms-server:2.22.0'
implementation 'org.apache.activemq:artemis-amqp-protocol:2.22.0'
// other dependencies here...
}
Обновите Невермайнд. Я обнаружил, что artemis-ampq-protocol
должен соответствовать версии библиотеки artemis-jms-server
, которую я использовал. Это решило проблему! Я опубликую градиент в редактировании. Спасибо!
Вы должны убедиться, что jar artemis-amqp-protocol
находится в вашем пути к классам, иначе брокер не сможет поддерживать AMQP. Если это так, вы должны увидеть сообщение журнала, в котором говорится:
AMQ221043: Обнаружен модуль протокола: [artemis-amqp-protocol]. Добавлена поддержка протокола: AMQP.
И позже вы увидите что-то вроде:
AMQ221020: Запущен приёмник EPOLL на локальном хосте: 5672 для протоколов [AMQP].
Если вы их не видите (или что-то очень близкое к этому), значит, брокер не поддерживает AMQP.
В моем Gradle не было протокола artemis-amqp, поэтому я включил его. В журнале теперь отображается
Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
. Но все та же проблема: ни издатель не отправляет, ни подписчик не получает. Сначала я запустил подписчика, затем запустил издателя. Я использую встроенного брокера для целей разработки, поскольку мой автономный брокер в новой среде еще не настроен.