Интеграция Spring, глобальное перенаправление недопустимых сообщений в другое место назначения

Установка

У меня есть приложение для обмена сообщениями JMS, в которое поступают сообщения из нескольких мест назначения JMS. Полезные данные сообщения представляют собой различные представления JSON с некоторыми общими заголовками. Я полагаюсь на динамическое преобразование типов Джексона Spring на ServiceActivators для преобразования в фактические POJO. В настоящее время маршрутизация тривиальна, поскольку каналы по сути являются каналами «типа данных», разделенными по типу полезной нагрузки JSON (все они являются полезными нагрузками JSON String, но JSON представляет собой очень разные типы объектов).

Эта проблема

Я хотел бы применить глобальную логику проверки ко всем входящим сообщениям по нескольким каналам, соответствующим шаблону, например. "*input*" и перенаправить недопустимые сообщения в канал ошибок проверки для проверки. Независимо от того, является ли сообщение действительным или недействительным, локальная транзакция JMS должна быть зафиксирована; если сообщение недействительно, я не хочу, чтобы недействительное сообщение было повторно отправлено позже.

Рассмотрены возможные варианты

Канал-перехватчик

Моя первоначальная мысль заключалась в том, чтобы реализовать ChannelInterceptor, который соответствовал бы всем каналам, где должна применяться эта логика, но не похоже, что возможность переадресации сообщения может быть реализована в ChannelInterceptor. Похоже, что у меня есть два варианта использования ChannelInterceptor:

  1. Откат транзакции JMS, когда перехватчик вернет null на preSend, ИЛИ
  2. недопустимое сообщение по-прежнему отправляется исходному адресату.

Ни то, ни другое не является желаемым поведением. Локальная транзакция JMS всегда должна быть зафиксирована (при условии отсутствия других ошибок), а сообщение либо отправлено в исходное место назначения, либо перенаправлено на недопустимый канал сообщения.

Маршрутизатор

Router может быть хорошим выбором, но не похоже, что есть способ применить маршрутизатор к набору каналов, согласованных с шаблоном, поэтому я считаю, что мне придется применить его к каждому каналу индивидуально. Я надеюсь избежать такого дублирования.

AspectJ Pointcut

Другой вариант, о котором я подумал, - это выделить AspectJ и реализовать рекомендации @Around по методу AbstractMessageSendingTemplate.convertAndSend(destination, payload, postProcessor). Это кажется навязчивым, но похоже, что это может сработать. Если есть вариант, который лучше поддерживается непосредственно фреймворком, я был бы рад его услышать.

Общий входной канал с маршрутизацией полезной нагрузки

Если я не могу найти способ глобально применить этот тип логики маршрутизации, то другим вариантом может быть маршрутизация всех входящих сообщений JMS через один канал. Пользовательский Router может быть применен к тому входящему каналу, который использует заголовки типа полезной нагрузки, чтобы направлять сообщения в соответствующие каналы «типа данных» и направлять недопустимые сообщения в канал ошибок проверки.

Вопросы)

  • Есть ли способ применить этот тип переадресации сообщений к набору каналов, согласованных с шаблоном?
  • Не упустил ли я важную возможность фреймворка Spring Integration, которая заставит одно из моих соображений работать?
  • Если нет, есть ли варианты EIP лучше, чем я упомянул?

Огромное спасибо!

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
245
2

Ответы 2

My initial thought was to implement a ChannelInterceptor that matches all the channels where this logic should be applied, but it does not appear that the capability to divert a message can be implemented in a ChannelInterceptor

Что заставляет вас поверить в это? preSend() может вернуть null, который фактически завершает операцию; просто отправьте неудавшуюся проверку на общий канал и верните null.

/**
 * Invoked before the Message is actually sent to the channel.
 * This allows for modification of the Message if necessary.
 * If this method returns {@code null} then the actual
 * send invocation will not occur.
 */
@Nullable
default Message<?> preSend(Message<?> message, MessageChannel channel) {
    return message;
}

Это вызовет MessageDeliveryException на входящем адаптере, но вы можете просто поглотить это в потоке канала ошибок.

См. Также мой ответ.

Artem Bilan 12.10.2018 23:31

Я надеялся, что что-то упустил, и думаю, что это может помочь. Спасибо за совет!

David Wolff 13.10.2018 13:30

Чтобы было ясно, это была часть errorChannel, которую я не рассматривал. знак равно

David Wolff 13.10.2018 13:50

Конечно, вы также можете просто выбросить исключение из своего перехватчика (и обработать его в потоке ошибок).

Gary Russell 13.10.2018 22:47

В итоге я выбросил собственное исключение в перехватчике и обработал его в потоке ошибок, и он отлично работает.

David Wolff 25.09.2019 21:37

Моя позиция не делает этого в глобальном ChannelInterceptor, потому что легко попасть в паттерн с каналом, на который не должна влиять такая логика фильтрации.

Вы всегда можете отправить все сообщения в один и тот же канал для общей логики. Поведение маршрутизации, которое вы можете контролировать с помощью заголовка replyChannel, заполняемого перед отправкой в ​​канал проверки. Итак, для меня логика выглядит так:

  1. Каждый поток выполняет HeaderEnricher для заполнения заголовка replyChannel желаемым следующим шагом в потоке.

  2. После этого все потоки отправляют сообщения компоненту Filter с логикой проверки.

  3. Там, в случае сбоя, вы отправляете сообщение на discardChannelFilter, как вы объяснили в своем вопросе.

  4. В случае успеха вы просто никуда не отправляете, кроме заголовка replyChannel. Итак, ваши действительные сообщения вернутся в исходное состояние.

Имеет ли это смысл для вас?

Чтобы не попасть в ловушку, у меня есть все ChannelInterceptors с patterns = {"!*raw-jms-input*", "*jms-input*"}. Таким образом, у меня есть способ защитить некоторые каналы от перехватчиков, если мне это нужно. Ваше предложение действительно имеет смысл, и я собираюсь оценить его и ответ Гэри, чтобы понять, что для меня наиболее разумно. Спасибо за ответ!

David Wolff 13.10.2018 13:28

Другие вопросы по теме