Мы используем Artemis 2.19.0 и распространяем сообщения XML. Недавно мы обнаружили, что некоторые сообщения могут быть потеряны при отправке их на адрес многоадресной рассылки, к которому привязаны две устойчивые очереди многоадресной рассылки, обе эти две устойчивые очереди имеют фильтр XPATH.
Например.:
Каким-то образом IN.QUEUE1.FOO или IN.QUEUE2.FOO или оба в конечном итоге не получат 1000 сообщений.
Устали снимать фильтр с одного из них, тогда всё работает нормально, обе очереди получат по 1000 сообщений.
Итак, мой вопрос:
Если у вас есть что-то неясное, спрашивайте. Спасибо
ОБНОВЛЕНИЕ 1:
Версии:
Java: 1.8
Spring-Integration: 5.5.11
Spring-jms: 5.3.19
Artemis: 2.19.0
XML-файл:
<?xml version = "1.0" encoding = "UTF-8"?>
<beans:beans xmlns = "http://www.springframework.org/schema/integration"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans = "http://www.springframework.org/schema/beans"
xmlns:int-xml = "http://www.springframework.org/schema/integration/xml"
xmlns:int-file = "http://www.springframework.org/schema/integration/file"
xmlns:task = "http://www.springframework.org/schema/task"
xmlns:jms = "http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation = "http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd
http://www.springframework.org/schema/integration/xml
http://www.springframework.org/schema/integration/xml/spring-integration-xml-4.3.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file-4.3.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-4.3.xsd">
<!-- Multicast address -->
<beans:bean id = "topic" class = "org.apache.activemq.artemis.jms.client.ActiveMQTopic">
<beans:constructor-arg value = "IN.ADDRESS.FOO"/>
</beans:bean>
<!-- Anycast queue -->
<beans:bean id = "queue" class = "org.apache.activemq.artemis.jms.client.ActiveMQQueue">
<beans:constructor-arg value = "AQ.QUEUE.FOO"/>
</beans:bean>
<channel id = "topicChannel">
</channel>
<task:executor id = "executor" pool-size = "2"/>
<publish-subscribe-channel id = "outChannel" task-executor = "executor"/>
<filter id = "consumer1" input-channel = "outChannel" output-channel = "topicChannel" expression = "payload.length() > 0"/>
<filter id = "consumer2" input-channel = "outChannel" output-channel = "topicChannel" expression = "payload.length() > 0"/>
<jms:message-driven-channel-adapter id = "queueAdapter" destination = "queue" channel = "outChannel"
acknowledge = "auto" connection-factory = "ConnectionFactory"/>
<jms:outbound-channel-adapter id = "topicAdapter" destination = "topic" channel = "topicChannel"
connection-factory = "ConnectionFactory"/>
</beans:beans>
Компонент ConnectionFactory:
@Bean(name = "ConnectionFactory")
public SingleConnectionFactory ibConnectionFactory(
@Value("${artemis.broker-url}") String brokerUrl,
@Value("${artemis.user}") String username,
@Value("${artemis.password}") String password) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(brokerUrl);
factory.setUser(username);
factory.setPassword(password);
return new SingleConnectionFactory(factory);
}
Программа-отправитель:
try(ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password)) {
Connection conn = fac.createConnection();
Session session = conn.createSession();
MessageProducer producer = session.createProducer(new ActiveMQQueue("AQ.QUEUE.FOO"));
Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");
int count = 0;
while (count < 1000) {
System.out.println(count);
producer.send(msg);
count ++;
}
}
XPATH-фильтр:
XPATH '/Root/Data/PrimaryKey/Key/DetailedIdentity/ATCode[text() = "AK"]'
Пример сообщения:
<?xml version = "1.0" encoding = "UTF-8"?><Root xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>
Либо удалите атрибут Task-executor на канале публикации-подписки, либо удалите один из фильтров очереди, чтобы решить проблему.
ОБНОВЛЕНИЕ 2:
Минимальный пример с 10 одновременными задачами, отправляющими в общей сложности 1000 сообщений. Если все очереди под одним и тем же адресом имеют фильтр XPATH, то он не получит 1000 сообщений, но при удалении одного из фильтров это работает.
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import javax.jms.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestRunner {
public static void main(String[] args) throws Exception {
String brokerUrl = "(tcp://server1:61616,tcp://server2:61616)?ha=true&reconnectAttempts=-1&retryInterval=100&retryIntervalMultiplier=1.5&maxRetryInterval=6000";
String user = "admin";
String password = "admin";
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password);
ExecutorService ser = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
ser.submit(() -> {
Connection conn = null;
try {
conn = fac.createConnection();
Session session = conn.createSession();
MessageProducer producer = session.createProducer(new ActiveMQTopic("IN.ADDRESS.FOO"));
Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");
int count = 0;
while (count < 100) {
System.out.println(count);
msg.setStringProperty("MessageId", String.valueOf(count));
producer.send(msg);
count++;
}
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
});
}
}
}
@JustinBertram Привет, Джастин, обновлен новый пример, я не знаю, правильный ли это способ использования параллельного производителя, но я думаю, что это тот же сценарий, что и в приведенном выше примере Spring, пожалуйста, ознакомьтесь с ним, большое спасибо!
Спасибо за тестовый пример. Я смог использовать его, чтобы воспроизвести ошибку, которую вы видели. Я открыл ARTEMIS-4687 и отправил PR, чтобы исправить проблему. Это будет исправлено в версии 2.33.0, которая выйдет в ближайшие несколько недель.
Привет, Джастин. Когда я проверял PR, является ли синхронизированный строитель окончательным разрешением? Я имею в виду, что если пропускная способность высокая, то каждый поток будет ждать ресурса, что еще больше замедлит процесс, может быть, можно сохранить DocumentBuilder как переменную ThreadLocal?
Синхронизация доступа к builder
— одно из возможных решений проблемы, но, конечно, не единственное. Хотя это может отрицательно повлиять на производительность, это решение тем не менее является самым простым из тех, которые я рассматривал. Если возникают жалобы на проблемы с синхронизацией, можно рассмотреть другие решения. Я склонен придерживаться принципа, что «преждевременная оптимизация — корень всех зол». Чтобы внести ясность, это не часто используемая функция. Запросы на включение, безусловно, приветствуются, если у вас есть лучшее решение. 👍🏼
@JustinBertram Привет, Джастин. Извините за задержку, я разработал минимальный пример, который может воспроизвести сценарий, пожалуйста, проверьте, спасибо.