Я использую сообщения из очереди SQS FIFO с установленным maxMessagesPerPoll=5
.
В настоящее время я обрабатываю каждое сообщение индивидуально, что является пустой тратой ресурсов. В моем случае, поскольку мы используем очередь FIFO и все эти 5 сообщений связаны с одним и тем же объектом, я мог бы обработать их все вместе.
Я думал, что это можно сделать с помощью шаблона aggregate
, но я не смог получить никаких результатов.
Мой потребительский маршрут выглядит так:
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
.process(exchange -> {
// process the message
})
Я считаю, что можно сделать что-то подобное
from("aws-sqs://my-queue?maxMessagesPerPoll=5&messageGroupIdStrategy=usePropertyValue")
.aggregate(const(true), new GroupedExchangeAggregationStrategy())
.completionFromBatchConsumer()
.process(exchange -> {
// process ALL messages together as I now have a list of all exchanges
})
но processor
никогда не вызывается.
Второе: Если я смогу это сделать, когда ACK будет отправлен в SQS? Когда обрабатывается каждое отдельное сообщение или когда завершается совокупный процесс? Я надеюсь, что последнее
Когда процессор не вызывается, агрегатор, вероятно, будет агрегировать все еще ждет новых сообщений.
Вы можете попробовать использовать completionSize(5)
вместо completionFromBatchConsumer()
для теста. Если это работает, проблема заключается в определении завершения пакета.
Для модели ACK против брокера: к сожалению, нет. Я думаю, что сообщение фиксируется, когда доходит до агрегатора.
Компонент агрегатора Camel является компонентом с отслеживанием состояния и, следовательно, должен завершить текущую транзакцию.
По этой причине вы можете оснастить такие компоненты постоянные репозитории, чтобы избежать потери данных, когда процесс завершается. В таком сценарии уже агрегированные сообщения, очевидно, будут потеряны, если у вас нет прикрепленного постоянного репозитория.
Проблема кроется в GroupedExchangeAggregationStrategy
Когда я использую эту стратегию, на выходе получается «массив» всех обменов. Это означает, что обмен, который доходит до предиката завершения, больше не имеет исходных свойств. Вместо этого у него есть CamelGroupedExchange
и CamelAggregatedSize
, которые не используются для completionFromBatchConsumer()
.
Поскольку на самом деле мне не нужно агрегировать все биржи, достаточно использовать GroupedBodyAggregationStrategy
. Тогда свойства обмена останутся такими же, как и в исходном обмене, и только тело будет содержать «массив»
Другое решение - использовать completionSize(Predicate predicate)
и специальный предикат, который извлекает необходимое значение из групповых обменов.
Спасибо за чаевые. Действительно, использование
copmletionSize(5)
сделало свою работу. Это привело меня к решению, которое я опубликую в качестве ответа.