Я использую ServiceBusProcessorClient и AmqpRetryOptions для подключения моей служебной шины Azure. Столкновение с двумя проблемами.
Потребитель ждет 30 секунд, чтобы обработать следующее сообщение от ASB. Это из-за настроек trytimeout? Я установил значение 30 секунд, поскольку, получив сообщение от ASB, мы вызываем API, и иногда для ответа может потребоваться 30 секунд.
Когда у меня есть одновременные сеансы и максимальное количество одновременных вызовов равно 4, от ASB потребуется 4 сообщения, но если одно из них выйдет из строя, в блоке исключений я выполняю функцию serviceBusProcessorClient.close(), которая остановит потребителя, и в этом случае он вернет в ASB все 4 сообщения, а не только ошибочное. Есть ли решение?
Конфигурация кода ниже, значения просто помещены в виде строки для простоты.
ampqRetryOptions.setDelay(Duration.ofSeconds(Integer.parseInt(serviceBusConfig.getAmpqDelay())));
ampqRetryOptions.setMaxRetries(<5>);
ampqRetryOptions.setMaxDelay(<1>);
ampqRetryOptions.setMode(AmqpRetryMode.EXPONENTIAL);
ampqRetryOptions.setTryTimeout(<30 seconds>);
serviceBusProcessorClient = new ServiceBusClientBuilder()
.connectionString(connectioString).retryOptions(ampqRetryOptions)
.sessionProcessor().maxConcurrentSessions(4))
.maxConcurrentCalls(4))
.queueName(<queueName>)
.maxAutoLockRenewDuration(50 seconds)
.processMessage(onMessage()).disableAutoComplete()
.processError(onError)
.buildProcessorClient();
Как я могу выполнить подтверждение завершения для каждого сообщения? Объект контекста имеет полный метод, и я делаю это, если в методе приема в последней строке метода не создается никаких исключений.





В зависимости от вашей текущей конфигурации тайм-аут простоя сеанса составляет 30 секунд в зависимости от вашего retryOptions.getTryTimeout(). Вы можете переопределить это поведение, установив sessionIdleTimeout(Duration) при создании ServiceBusProcessorClient. Клиент-процессор постоянно перекачивает сообщения из сеанса при получении сеанса. Как tryTimeout и sessionIdleTimeout влияют на это, так это то, что если в течение тайм-аута не получено ни одного сообщения, мы завершаем сеанс и переходим к получению нового сеанса.
Вызов ServiceBusProcessorClient.close() закроет все текущие сеансы и полностью остановит процессор. Решение зависит от того, почему не удалось обработать сообщение, какой у вас резервный вариант для этих сообщений и т. д.
Существует несколько способов обработки сообщений: вы можете выбрать «Отказаться», «Недоставленное письмо» или «Завершить». Отказ от сообщений приведет к отправке сообщения обратно в очередь для повторной доставки (в случае, если какой-либо другой потребитель сможет его обработать) до тех пор, пока не будет достигнуто максимальное количество попыток доставки. Недоставленное сообщение отправит сообщение в очередь недоставленных писем. Вам необходимо включить эту функцию при создании очереди. Подробнее об этих видах расчетов см. Расчетно-приемные операции.
Я попробовал sessionIdleTimeout, и это сработало, а что касается одновременных сеансов, похоже, это была ошибка, и она была устранена в будущей версии - github.com/Azure/azure-sdk-for-java/issues/37272. Спасибо @Конни Яу
Вы должны явно отправлять подтверждение завершения для каждого сообщения, в противном случае оно будет возвращено в очередь по истечении времени блокировки и в соответствии с максимальным количеством раз оно будет возвращено в активную очередь, которую вы настроили. Невыполнение их в конечном итоге вернет их в очередь, но это зависит от конфигурации очереди.