Я использую триггер служебной шины функции Azure в Python для пакетного получения сообщений из очереди служебной шины. Несмотря на то, что этот процесс плохо документирован в Python, мне удалось включить пакетную обработку, следуя приведенному ниже Github PR.
https://github.com/Azure/azure-functions-python-library/pull/73
Вот пример кода, который я использую -
функция.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "msg",
"type": "serviceBusTrigger",
"direction": "in",
"cardinality": "many",
"queueName": "<some queue name>",
"dataType": "binary",
"connection": "SERVICE_BUS_CONNECTION"
}
]
}
__init__.py
import logging
import azure.functions as func
from typing import List
def main(msg: List[func.ServiceBusMessage]):
message_length = len(msg)
if message_length > 1:
logging.warn('Handling multiple requests')
for m in msg:
#some call to external web api
хост.json
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[3.3.0, 4.0.0)"
},
"extensions": {
"serviceBus": {
"prefetchCount": 100,
"messageHandlerOptions": {
"autoComplete": true,
"maxConcurrentCalls": 32,
"maxAutoRenewDuration": "00:05:00"
},
"batchOptions": {
"maxMessageCount": 100,
"operationTimeout": "00:01:00",
"autoComplete": true
}
}
}
}
После использования этого кода я вижу, что триггер служебной шины собирает сообщения в пакете из 100 (или иногда <100) на основе maxMessageCount, но я также заметил, что большинство сообщений попадают в очередь недоставленных сообщений с код причины MaxDeliveryCountExceeded
. Я пробовал с разными значениями MaxDeliveryCount от 10 до 20, но у меня был тот же результат. Итак, мой вопрос: нужно ли нам настраивать/оптимизировать MaxDeliveryCount в случае пакетной обработки сообщений служебной шины? Как они оба связаны? Какие изменения можно внести в конфигурацию, чтобы избежать этой проблемы с мертвой буквой?
@AlexAIT да, мы получаем ошибку 502 от внутреннего API, которое представляет собой веб-приложение Python, работающее в плане обслуживания приложений Linux. Это происходит, если мы отправляем более 10 сообщений в пакете. Существует ли ограничение на максимальное количество одновременных HTTP-запросов к службе приложений Linux в Azure? Можем ли мы увеличить это?
@AlexAIT Также мы получаем следующую ошибку в триггере служебной шины The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue
, я использую python, поэтому нет возможности вручную завершить сообщение, если я установил autocomplete:false
Ошибка блокировки указывает на то, что вы не обрабатываете весь пакет в течение maxAutoRenewDuration. Я предполагаю, что у вашего API есть проблема или он довольно медленный. Если вы замените вызов API задержкой в одну секунду, я не ожидаю больше ошибок. Затем вам нужно настроить параметры и API.
@AlexAIT Я использую FastAPI, но он довольно медленный. Я согласен, поскольку у меня нет настройки для конфигурации параллельных запросов, поэтому в среднем требуется 4 секунды, чтобы выполнить один ответ от API к функциональному приложению. Должен ли я попытаться увеличить maxAutoRenewDuration
? Не могли бы вы уточнить задержку в 1 секунду, я этого не понял.
Вы также можете попробовать увеличить продолжительность. Если у вас есть пакет из 100 сообщений с 4 секундами на сообщение, это будет больше, чем текущий maxAutoRenewDuration
5 минут. Задержка в 1 секунду была просто для того, чтобы доказать, что проблема заключается в API. Не вызывайте API, просто смоделируйте его, чтобы показать, что пакетная обработка работает.
У вас есть дополнительные вопросы по теме? Если нет, рассмотрите возможность пометить ответ как принятый.
@AlexAIT Да, спасибо. Это немного улучшило производительность. Теперь я не вижу, чтобы сообщения превращались в мертвую букву. Но я думаю, что мне нужно включить параллельный запрос на FastApi
Из того, что мы обсуждали в комментариях, вот с чем вы сталкиваетесь:
prefetchCount
) и блокирует их максимум на maxAutoRenewDuration
maxMessageCount
), срок действия блокировки уже истек, поэтому у вас есть исключения, и сообщение снова доставляется повторно. В конечном итоге это приводит к ошибкам MaxDeliveryCountExceeded
.Что вы можете сделать, чтобы улучшить это?
maxMessageCount
и prefetchCount
maxAutoRenewDuration
PS: Помните, что ваше приложение-функция может масштабироваться горизонтально, если вы работаете в плане потребления, что еще больше увеличивает нагрузку на ваш проблемный API.
MaxDeliveryCountExceeded
обычно возникает из-за сбоя обработки сообщения. Вы уверены, что нигде в вашем фактическом коде не возникает никаких ошибок, например, если есть ошибка при вызове API?