У меня есть приложение для обмена сообщениями JMS, в которое поступают сообщения из нескольких мест назначения JMS. Полезные данные сообщения представляют собой различные представления JSON с некоторыми общими заголовками. Я полагаюсь на динамическое преобразование типов Джексона Spring на ServiceActivators для преобразования в фактические POJO. В настоящее время маршрутизация тривиальна, поскольку каналы по сути являются каналами «типа данных», разделенными по типу полезной нагрузки JSON (все они являются полезными нагрузками JSON String, но JSON представляет собой очень разные типы объектов).
Я хотел бы применить глобальную логику проверки ко всем входящим сообщениям по нескольким каналам, соответствующим шаблону, например. "*input*" и перенаправить недопустимые сообщения в канал ошибок проверки для проверки. Независимо от того, является ли сообщение действительным или недействительным, локальная транзакция JMS должна быть зафиксирована; если сообщение недействительно, я не хочу, чтобы недействительное сообщение было повторно отправлено позже.
Моя первоначальная мысль заключалась в том, чтобы реализовать ChannelInterceptor, который соответствовал бы всем каналам, где должна применяться эта логика, но не похоже, что возможность переадресации сообщения может быть реализована в ChannelInterceptor. Похоже, что у меня есть два варианта использования ChannelInterceptor:
preSend, ИЛИНи то, ни другое не является желаемым поведением. Локальная транзакция JMS всегда должна быть зафиксирована (при условии отсутствия других ошибок), а сообщение либо отправлено в исходное место назначения, либо перенаправлено на недопустимый канал сообщения.
Router может быть хорошим выбором, но не похоже, что есть способ применить маршрутизатор к набору каналов, согласованных с шаблоном, поэтому я считаю, что мне придется применить его к каждому каналу индивидуально. Я надеюсь избежать такого дублирования.
Другой вариант, о котором я подумал, - это выделить AspectJ и реализовать рекомендации @Around по методу AbstractMessageSendingTemplate.convertAndSend(destination, payload, postProcessor). Это кажется навязчивым, но похоже, что это может сработать. Если есть вариант, который лучше поддерживается непосредственно фреймворком, я был бы рад его услышать.
Если я не могу найти способ глобально применить этот тип логики маршрутизации, то другим вариантом может быть маршрутизация всех входящих сообщений JMS через один канал. Пользовательский Router может быть применен к тому входящему каналу, который использует заголовки типа полезной нагрузки, чтобы направлять сообщения в соответствующие каналы «типа данных» и направлять недопустимые сообщения в канал ошибок проверки.
Огромное спасибо!




My initial thought was to implement a ChannelInterceptor that matches all the channels where this logic should be applied, but it does not appear that the capability to divert a message can be implemented in a ChannelInterceptor
Что заставляет вас поверить в это? preSend() может вернуть null, который фактически завершает операцию; просто отправьте неудавшуюся проверку на общий канал и верните null.
/**
* Invoked before the Message is actually sent to the channel.
* This allows for modification of the Message if necessary.
* If this method returns {@code null} then the actual
* send invocation will not occur.
*/
@Nullable
default Message<?> preSend(Message<?> message, MessageChannel channel) {
return message;
}
Это вызовет MessageDeliveryException на входящем адаптере, но вы можете просто поглотить это в потоке канала ошибок.
Я надеялся, что что-то упустил, и думаю, что это может помочь. Спасибо за совет!
Чтобы было ясно, это была часть errorChannel, которую я не рассматривал. знак равно
Конечно, вы также можете просто выбросить исключение из своего перехватчика (и обработать его в потоке ошибок).
В итоге я выбросил собственное исключение в перехватчике и обработал его в потоке ошибок, и он отлично работает.
Моя позиция не делает этого в глобальном ChannelInterceptor, потому что легко попасть в паттерн с каналом, на который не должна влиять такая логика фильтрации.
Вы всегда можете отправить все сообщения в один и тот же канал для общей логики. Поведение маршрутизации, которое вы можете контролировать с помощью заголовка replyChannel, заполняемого перед отправкой в канал проверки. Итак, для меня логика выглядит так:
Каждый поток выполняет HeaderEnricher для заполнения заголовка replyChannel желаемым следующим шагом в потоке.
После этого все потоки отправляют сообщения компоненту Filter с логикой проверки.
Там, в случае сбоя, вы отправляете сообщение на discardChannelFilter, как вы объяснили в своем вопросе.
В случае успеха вы просто никуда не отправляете, кроме заголовка replyChannel. Итак, ваши действительные сообщения вернутся в исходное состояние.
Имеет ли это смысл для вас?
Чтобы не попасть в ловушку, у меня есть все ChannelInterceptors с patterns = {"!*raw-jms-input*", "*jms-input*"}. Таким образом, у меня есть способ защитить некоторые каналы от перехватчиков, если мне это нужно. Ваше предложение действительно имеет смысл, и я собираюсь оценить его и ответ Гэри, чтобы понять, что для меня наиболее разумно. Спасибо за ответ!
См. Также мой ответ.