Python — служебная шина функций Azure запускает пакетную обработку

Я использую триггер служебной шины функции Azure в Python для пакетного получения сообщений из очереди служебной шины. Несмотря на то, что этот процесс плохо документирован в Python, мне удалось включить пакетную обработку, следуя приведенному ниже Github PR.

https://github.com/Azure/azure-functions-python-library/pull/73

Вот пример кода, который я использую -

функция.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "cardinality": "many",
      "queueName": "<some queue name>",
      "dataType": "binary",
      "connection": "SERVICE_BUS_CONNECTION"
    }
  ]
}

__init__.py

import logging

import azure.functions as func
from typing import List

def main(msg: List[func.ServiceBusMessage]):
    message_length = len(msg)
    if message_length > 1:
        logging.warn('Handling multiple requests')

    for m in msg:
       #some call to external web api

хост.json

"version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.3.0, 4.0.0)"
  },
  "extensions": {
    "serviceBus": {
      "prefetchCount": 100,
      "messageHandlerOptions": {
        "autoComplete": true,
        "maxConcurrentCalls": 32,
        "maxAutoRenewDuration": "00:05:00"
      },
      "batchOptions": {
        "maxMessageCount": 100,
        "operationTimeout": "00:01:00",
        "autoComplete": true
      }
    }
  }
}

После использования этого кода я вижу, что триггер служебной шины собирает сообщения в пакете из 100 (или иногда <100) на основе maxMessageCount, но я также заметил, что большинство сообщений попадают в очередь недоставленных сообщений с код причины MaxDeliveryCountExceeded. Я пробовал с разными значениями MaxDeliveryCount от 10 до 20, но у меня был тот же результат. Итак, мой вопрос: нужно ли нам настраивать/оптимизировать MaxDeliveryCount в случае пакетной обработки сообщений служебной шины? Как они оба связаны? Какие изменения можно внести в конфигурацию, чтобы избежать этой проблемы с мертвой буквой?

MaxDeliveryCountExceeded обычно возникает из-за сбоя обработки сообщения. Вы уверены, что нигде в вашем фактическом коде не возникает никаких ошибок, например, если есть ошибка при вызове API?
Alex AIT 10.02.2023 08:24

@AlexAIT да, мы получаем ошибку 502 от внутреннего API, которое представляет собой веб-приложение Python, работающее в плане обслуживания приложений Linux. Это происходит, если мы отправляем более 10 сообщений в пакете. Существует ли ограничение на максимальное количество одновременных HTTP-запросов к службе приложений Linux в Azure? Можем ли мы увеличить это?

Niladri 10.02.2023 09:28

@AlexAIT Также мы получаем следующую ошибку в триггере служебной шины The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, я использую python, поэтому нет возможности вручную завершить сообщение, если я установил autocomplete:false

Niladri 10.02.2023 09:37

Ошибка блокировки указывает на то, что вы не обрабатываете весь пакет в течение maxAutoRenewDuration. Я предполагаю, что у вашего API есть проблема или он довольно медленный. Если вы замените вызов API задержкой в ​​​​одну секунду, я не ожидаю больше ошибок. Затем вам нужно настроить параметры и API.

Alex AIT 10.02.2023 15:34

@AlexAIT Я использую FastAPI, но он довольно медленный. Я согласен, поскольку у меня нет настройки для конфигурации параллельных запросов, поэтому в среднем требуется 4 секунды, чтобы выполнить один ответ от API к функциональному приложению. Должен ли я попытаться увеличить maxAutoRenewDuration? Не могли бы вы уточнить задержку в 1 секунду, я этого не понял.

Niladri 10.02.2023 17:31

Вы также можете попробовать увеличить продолжительность. Если у вас есть пакет из 100 сообщений с 4 секундами на сообщение, это будет больше, чем текущий maxAutoRenewDuration 5 минут. Задержка в 1 секунду была просто для того, чтобы доказать, что проблема заключается в API. Не вызывайте API, просто смоделируйте его, чтобы показать, что пакетная обработка работает.

Alex AIT 10.02.2023 17:38

У вас есть дополнительные вопросы по теме? Если нет, рассмотрите возможность пометить ответ как принятый.

Alex AIT 14.02.2023 10:45

@AlexAIT Да, спасибо. Это немного улучшило производительность. Теперь я не вижу, чтобы сообщения превращались в мертвую букву. Но я думаю, что мне нужно включить параллельный запрос на FastApi

Niladri 15.02.2023 08:57
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
8
120
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Из того, что мы обсуждали в комментариях, вот с чем вы сталкиваетесь:

  • Ваше приложение-функция получает 100 сообщений из ServiceBus (prefetchCount) и блокирует их максимум на maxAutoRenewDuration
  • Код вашей функции обрабатывает сообщения по одному с низкой скоростью из-за API, который вы вызываете.
  • К тому времени, когда вы закончите пакет сообщений (maxMessageCount), срок действия блокировки уже истек, поэтому у вас есть исключения, и сообщение снова доставляется повторно. В конечном итоге это приводит к ошибкам MaxDeliveryCountExceeded.

Что вы можете сделать, чтобы улучшить это?

  • Уменьшить maxMessageCount и prefetchCount
  • Увеличить maxAutoRenewDuration
  • Увеличьте производительность вашего API (как это сделать, это другой вопрос)
  • Ваш текущий код был бы намного лучше, если бы вы использовали «обычный» триггер одиночного сообщения вместо пакетного триггера.

PS: Помните, что ваше приложение-функция может масштабироваться горизонтально, если вы работаете в плане потребления, что еще больше увеличивает нагрузку на ваш проблемный API.

Другие вопросы по теме