ActiveMQ Artemis: адрес Muticast доставляет сообщения непоследовательно

Мы используем Artemis 2.19.0 и распространяем сообщения XML. Недавно мы обнаружили, что некоторые сообщения могут быть потеряны при отправке их на адрес многоадресной рассылки, к которому привязаны две устойчивые очереди многоадресной рассылки, обе эти две устойчивые очереди имеют фильтр XPATH.

Например.:

  1. Создайте адрес многоадресной рассылки с именем: IN.ADDRESS.FOO.
  2. Создайте под ним две устойчивые очереди с именами: IN.QUEUE1.FOO и IN.QUEUE2.FOO.
  3. Установите фильтр XPATH для обеих очередей: XPATH '/XML/DATA/Direction[text()="Right"]'
  4. Отправьте 1000 одинаковых совпадающих XML-сообщений по адресу IN.ADDRESS.FOO (с JMS и размером сообщения 20 КБ).

Каким-то образом IN.QUEUE1.FOO или IN.QUEUE2.FOO или оба в конечном итоге не получат 1000 сообщений.

Устали снимать фильтр с одного из них, тогда всё работает нормально, обе очереди получат по 1000 сообщений.

  1. Мы используем архитектуру высокой доступности с одним действующим узлом и одним резервным узлом.
  2. Мы проверили DLQ и ничего не обнаружили, поэтому максимальная попытка повторной доставки не должна быть превышена.
  3. Такое поведение происходит даже при отсутствии подключения к потребителю.

Итак, мой вопрос:

  1. Может ли это быть причиной того, что фильтр XML может работать значительно медленнее, чем обычные фильтры, из-за чего некоторые сообщения теряются?
  2. Если нет, то в чем может быть причина?

Если у вас есть что-то неясное, спрашивайте. Спасибо

ОБНОВЛЕНИЕ 1:

Версии:

Java: 1.8
Spring-Integration: 5.5.11
Spring-jms: 5.3.19
Artemis: 2.19.0

XML-файл:

<?xml version = "1.0" encoding = "UTF-8"?>
<beans:beans xmlns = "http://www.springframework.org/schema/integration"
             xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans = "http://www.springframework.org/schema/beans"
             xmlns:int-xml = "http://www.springframework.org/schema/integration/xml"
             xmlns:int-file = "http://www.springframework.org/schema/integration/file"
             xmlns:task = "http://www.springframework.org/schema/task"
             xmlns:jms = "http://www.springframework.org/schema/integration/jms"
             xsi:schemaLocation = "http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd
            http://www.springframework.org/schema/integration/xml
            http://www.springframework.org/schema/integration/xml/spring-integration-xml-4.3.xsd
            http://www.springframework.org/schema/integration/file
            http://www.springframework.org/schema/integration/file/spring-integration-file-4.3.xsd
            http://www.springframework.org/schema/task
            http://www.springframework.org/schema/task/spring-task-4.3.xsd">
    <!-- Multicast address -->
    <beans:bean id = "topic" class = "org.apache.activemq.artemis.jms.client.ActiveMQTopic">
        <beans:constructor-arg value = "IN.ADDRESS.FOO"/>
    </beans:bean>

    <!-- Anycast queue -->
    <beans:bean id = "queue" class = "org.apache.activemq.artemis.jms.client.ActiveMQQueue">
        <beans:constructor-arg value = "AQ.QUEUE.FOO"/>
    </beans:bean>

    <channel id = "topicChannel">
    </channel>

    <task:executor id = "executor" pool-size = "2"/>

    <publish-subscribe-channel id = "outChannel" task-executor = "executor"/>

    <filter id = "consumer1" input-channel = "outChannel" output-channel = "topicChannel" expression = "payload.length() > 0"/>

    <filter id = "consumer2" input-channel = "outChannel" output-channel = "topicChannel" expression = "payload.length() > 0"/>

    <jms:message-driven-channel-adapter id = "queueAdapter" destination = "queue" channel = "outChannel"
                                        acknowledge = "auto" connection-factory = "ConnectionFactory"/>

    <jms:outbound-channel-adapter  id = "topicAdapter" destination = "topic" channel = "topicChannel"
                                  connection-factory = "ConnectionFactory"/>


</beans:beans>

Компонент ConnectionFactory:

@Bean(name = "ConnectionFactory")
    public SingleConnectionFactory ibConnectionFactory(
            @Value("${artemis.broker-url}") String brokerUrl,
            @Value("${artemis.user}") String username,
            @Value("${artemis.password}") String password) throws JMSException {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUser(username);
        factory.setPassword(password);
        return new SingleConnectionFactory(factory);
    }

Программа-отправитель:

try(ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password)) {
            Connection conn = fac.createConnection();
            Session session = conn.createSession();
            MessageProducer producer = session.createProducer(new ActiveMQQueue("AQ.QUEUE.FOO"));
            Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");

            int count = 0;
            while (count < 1000) {
                System.out.println(count);
                producer.send(msg);
                count ++;
            }
        }

XPATH-фильтр:

XPATH '/Root/Data/PrimaryKey/Key/DetailedIdentity/ATCode[text() = "AK"]'

Пример сообщения:

<?xml version = "1.0" encoding = "UTF-8"?><Root xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>

Либо удалите атрибут Task-executor на канале публикации-подписки, либо удалите один из фильтров очереди, чтобы решить проблему.

ОБНОВЛЕНИЕ 2:

Минимальный пример с 10 одновременными задачами, отправляющими в общей сложности 1000 сообщений. Если все очереди под одним и тем же адресом имеют фильтр XPATH, то он не получит 1000 сообщений, но при удалении одного из фильтров это работает.

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;

import javax.jms.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class TestRunner {

    public static void main(String[] args) throws Exception {
        String brokerUrl = "(tcp://server1:61616,tcp://server2:61616)?ha=true&reconnectAttempts=-1&retryInterval=100&retryIntervalMultiplier=1.5&maxRetryInterval=6000";
        String user = "admin";
        String password = "admin";

        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password);
        ExecutorService ser = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            ser.submit(() -> {
                Connection conn = null;
                try {
                    conn = fac.createConnection();
                    Session session = conn.createSession();
                    MessageProducer producer = session.createProducer(new ActiveMQTopic("IN.ADDRESS.FOO"));
                    Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");
                    int count = 0;
                    while (count < 100) {
                        System.out.println(count);
                        msg.setStringProperty("MessageId", String.valueOf(count));
                        producer.send(msg);
                        count++;
                    }
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

@JustinBertram Привет, Джастин. Извините за задержку, я разработал минимальный пример, который может воспроизвести сценарий, пожалуйста, проверьте, спасибо.

LiamFu 12.03.2024 12:44

@JustinBertram Привет, Джастин, обновлен новый пример, я не знаю, правильный ли это способ использования параллельного производителя, но я думаю, что это тот же сценарий, что и в приведенном выше примере Spring, пожалуйста, ознакомьтесь с ним, большое спасибо!

LiamFu 13.03.2024 12:12
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
2
89
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Спасибо за тестовый пример. Я смог использовать его, чтобы воспроизвести ошибку, которую вы видели. Я открыл ARTEMIS-4687 и отправил PR, чтобы исправить проблему. Это будет исправлено в версии 2.33.0, которая выйдет в ближайшие несколько недель.

Привет, Джастин. Когда я проверял PR, является ли синхронизированный строитель окончательным разрешением? Я имею в виду, что если пропускная способность высокая, то каждый поток будет ждать ресурса, что еще больше замедлит процесс, может быть, можно сохранить DocumentBuilder как переменную ThreadLocal?

LiamFu 14.03.2024 07:42

Синхронизация доступа к builder — одно из возможных решений проблемы, но, конечно, не единственное. Хотя это может отрицательно повлиять на производительность, это решение тем не менее является самым простым из тех, которые я рассматривал. Если возникают жалобы на проблемы с синхронизацией, можно рассмотреть другие решения. Я склонен придерживаться принципа, что «преждевременная оптимизация — корень всех зол». Чтобы внести ясность, это не часто используемая функция. Запросы на включение, безусловно, приветствуются, если у вас есть лучшее решение. 👍🏼

Justin Bertram 14.03.2024 16:21

Другие вопросы по теме