Настройка Java / Camel / AMQ с помощью DLQ, зависящих от маршрута

Java 8 / Camel 2.19.x / AMQ 5.15.x здесь.

У меня есть приложение Java, которое использует Camel для получения сообщений из очередей AMQ, обработки этих сообщений и работы с ними. Иногда выход маршрута помещает результат обработки обратно в другую очередь для дальнейшей последующей обработки, но не всегда / обязательно. Типичная установка Java / Camel / AMQ.

Каждый из моих маршрутов (я использую Camel XML DSL) имеет настроенный обработчик <onException>, который обычно выглядит так:

<onException useOriginalMessage = "true">
  <exception>java.lang.Exception</exception>

  <redeliveryPolicy logStackTrace = "true"/>

  <handled>
    <constant>true</constant>
  </handled>

  <log message = "${exception.stacktrace}" loggingLevel = "ERROR"/>

  <rollback markRollbackOnly = "true"/>

</onException>

Все очень просто: зарегистрируйте исключение и откат.

Я бы хотел, как часть этого обработчика <onException>, поместить Исходное сообщение (который не удался и вызвал исключение, нет исключение!) В DLQ для конкретного маршрута (под "DLQ" я имею в виду просто очередь, куда могут быть отправлены неудачные сообщения для целей аудита / отчетности / воспроизведения)

Это означает, что если у моего приложения есть 30 маршрутов, каждый из которых использует 30 разных очередей AMQ, у меня будет 30 разных «DLQ», куда каждый из их соответствующих обработчиков <onException> будет отправлять сообщения о сбое.

В идеале я бы хотел, чтобы эта конфигурация была на стороне AMQ (опять же, возможно, внутри activem.xml или аналогичного), чтобы мне не нужно было вносить изменения в код или повторно развертывать, если нужно изменить места назначения DLQ. Но если это можно сделать только внутри маршрута / конфигураций Camel, это тоже нормально.

I предполагать Я мог бы изменить каждый маршрут, чтобы он содержал свой собственный настроенный пункт назначения DLQ для исходных сообщений:

<onException useOriginalMessage = "true">
  <exception>java.lang.Exception</exception>

  <redeliveryPolicy logStackTrace = "true"/>

  <handled>
    <constant>true</constant>
  </handled>

  <log message = "${exception.stacktrace}" loggingLevel = "ERROR"/>

  <rollback markRollbackOnly = "true"/>

  <to uri = "activemq:fizzbuzz.dlq"/>

</onException>

Но я надеюсь на что-то более элегантное, чем это ...

Есть идеи, как я могу это сделать?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
171
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Возможно, вам подойдет такой способ:

             DeadLetterChannelBuilder errorHandlerBuilder = deadLetterChannel("jms:dummy");
    errorHandlerBuilder.onPrepareFailure(exchange -> {
        exchange.getIn().setHeader("CamelJmsDestinationName",exchange.getIn().getHeader("JMSDestination",String.class).concat(".DLQ"));
    });

    from("jms:input1")
            .to("seda:process");

    from("jms:input2")
            .to("seda:process");

    from("jms:input3")
            .to("seda:process");

    from("seda:process").errorHandler(errorHandlerBuilder)
            .process(exchange -> {
                throw new RuntimeException();
            });

Вы можете вычислить имя очереди DLQ во время выполнения. DeadLetterChannelBuilder также может быть настроен как ваш onException.

Thans @ c0old (+1) - это интересное решение ... быстро, в чем разница между заголовком JmsDestination и заголовком CamelJmsDestinationName? Спасибо еще раз!!!

hotmeatballsoup 27.09.2018 14:45

Согласно документации JMS «Когда сообщение получено, его значение JMSDestination должно быть эквивалентно значению, присвоенному при его отправке». CamelJmsDestinationName используется для динамически вычисляемого имени очереди во время отправки. JmsProducer будет использовать значение в этом заголовке для отправки вместо указанного в конечной точке.

c0ld 27.09.2018 15:43

После тестирования очереди выглядят как это. queue: // input1 - значение заголовка JMSDestination (согласно потребителю input1)

c0ld 27.09.2018 15:49

Если вы хотите настроить в самом ActiveMQ, вы также можете добавить конкретный policyEntry и использовать подстановочные знаки в deadLetterStrategy, как в этом примере:

Создайте выделенный DLQ для всех очередей ВХОДЯЩИЙ:

<policyEntry queue = "*.INBOUND.>">
    <deadLetterStrategy>
        <individualDeadLetterStrategy processExpired = "false" queuePrefix = "" queueSuffix = ".DLQ" useQueueForQueueMessages = "true"/>
    </deadLetterStrategy>
</policyEntry>

В этом случае я ловлю ошибку потребителя во всех моих входящих очередях:

XXX.INBOUND.AAA        -> XXX.INBOUND.AAA.DLQ
YYY.INBOUND            -> YYY.INBOUND.DLQ
ZZZ.INBOUND.BBB.CCC    -> ZZZ.INBOUND.BBB.CCC.DLQ

Но

NNN.MMM.INBOUND        -> ActiveMQ.DLQ 

поскольку шаблон * не соответствует символу точки . в NNN.MMM.

Вы можете настроить свой шаблон в соответствии со своим вариантом использования.

Это очень полезно для управления потребителями, которые не улавливают исключения должным образом, и это делается на уровне ActiveMQ.

Другие вопросы по теме