После агрегирования обменов с помощью GroupedExchangeAggregationStrategy
мне нужно снова разделить их (чтобы получить отдельные показатели времени обработки) на исходные обмены.
Я попытался разделить с помощью следующего, но полученный разделенный обмен обертывает исходный обмен и помещает его в тело Message
.
Можно ли разделить совокупный обмен GroupedExchangeAggregationStrategy
на исходные обмены без обмена оболочкой? Мне нужно использовать исходные свойства обмена, и я хотел бы сделать это с помощью выражения SpEL.
.aggregate(constant(true), myGroupedExchangeAggregationStrategy)
.completionInterval(1000)
.completeAllOnStop()
.process { /* do stuff */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
.to(/* micrometer timer metric using SpEL expression */)
// ^- the resulting split exchange is wrapped in another exchange
В случае, если это в настоящее время не поддерживается, я пытаюсь найти лучший способ реализовать это поведение самостоятельно, не создавая собственный процессор Splitter
для этой единственной функции. Я надеялся каким-то образом переопределить SplitterIterable
, который выполняет перенос, но это не представляется возможным.
Да, GroupedExchangeAggregationStrategy
не делает ничего, кроме как создает java.util.List
из всех Обмены. С другой стороны, Разделитель EIP по умолчанию разбивает список на элементы и помещает элемент в тело сообщения. Поэтому вы получаете Exchange, который содержит Exchange в его теле.
Что вам нужно, так это AggregationStrategy, которая собирает все объекты тела в списке вместо всех бирж.
Вы можете попробовать использовать Camels FlexibleAggregationStrategy
, который можно настроить с помощью гибкого API..
новая гибкая стратегия агрегации () .storeInBody () .accumulateInCollection(ArrayList.класс) .pick(новое SimpleExpression("${body}"));
Это должно создать AggregationStrategy, который извлекает тело каждого сообщения (возможно, вы можете опустить метод выбора, поскольку извлечение тела является выбором по умолчанию), собирает их в список и сохраняет агрегацию в теле сообщения.
Чтобы снова разделить этот агрегат, достаточно простого split(body())
.
Да, вы правы, побочный эффект моего решения заключается в том, что вы теряете свойства и заголовки исходных сообщений, потому что он объединяет только тела сообщений.
Что вы хотите сделать, так это разделить список бирж обратно на оригиналы. то есть Сплиттер не должен создавать новые биржи, а использовать уже существующие и выбросить агрегированную оболочку Exchange.
Насколько я вижу в исходный код сплиттера, это в настоящее время невозможно:
Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
...
if (part instanceof Message) {
newExchange.setIn((Message) part);
} else {
Message in = newExchange.getIn();
in.setBody(part);
}
Я расширил свой комментарий, к сожалению, то, что вы хотите, кажется невозможным
да, согласен, похоже, он не поддерживается изначально. Я реализовал собственное решение (ниже), которое разбивается на состояние, идентичное Около.
Согласно принятому ответу, он не поддерживается изначально.
Этот пользовательский процессор выполнит разворачивать раздельный обмен (т. е. скопирует вложенный обмен Message
и свойства в корневой обмен). Развернутый обмен будет Около идентичен исходному — он сохранит все неконфликтующие свойства из корневого обмена (например, свойства, связанные с Splitter
, такие как разделенный индекс и т. д.).
class ExchangeUnwrapper : Processor {
override fun process(exchange: Exchange) {
val wrappedExchange = exchange.`in`.body as Exchange
ExchangeHelper.copyResultsPreservePattern(exchange, wrappedExchange)
}
}
// Route.kt
from(...)
.aggregate(...)
.process { /* do things with aggregate */ }
.split(exchangeProperty(Exchange.GROUPED_EXCHANGE))
.process(ExchangeUnwrapper())
.process { /* do something with the original exchange */ }
.end()
Спасибо, но я не думаю, что это сработает для меня, потому что я считаю, что ваша стратегия потеряет свойства исходных неагрегированных бирж, которые требуются для библиотеки показателей, которую я использую (микрометр).