У меня есть следующие функции Azure
import logging
import azure.functions as func
from azure.storage.queue import QueueServiceClient
# Initialize queue client
queue_service = QueueServiceClient.from_connection_string("{YOUR_CONNECTION_STRING}")
queue_client = queue_service.get_queue_client("{YOUR_QUEUE_NAME}")
@app.route(route = "{YOUR_FUNCTION_ROUTE}", auth_level=func.AuthLevel.ANONYMOUS)
def GetUrlsFunction(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
# Example URL to be added to the queue
example_url = "{YOUR_URL}"
# Send message to the queue
queue_client.send_message(example_url)
logging.info(f"Message added to the queue: {example_url}")
return func.HttpResponse(
"URLs have been added to the queue successfully.",
status_code=200
)
@app.queue_trigger(arg_name = "azqueue", queue_name = "{YOUR_QUEUE_NAME}", connection = "AzureWebJobsStorage")
def ProcessMessageFunction(azqueue: func.QueueMessage):
try:
# Log the received message
logging.info("Received message: %s", azqueue.get_body())
# Decode the message body
message_body = azqueue.get_body().decode('utf-8')
logging.info('Decoded message: %s', message_body)
# Validate that the message is a URL
if not message_body.startswith("http"):
raise ValueError(f"Invalid URL format: {message_body}")
# Placeholder for URL processing logic
# This is where the URL would be processed
result = process_single_url("{BASE_URL}", message_body)
logging.info('Processing result: %s', result)
except requests.exceptions.RequestException as e:
logging.error("Network-related error: %s", str(e))
except ValueError as ve:
logging.error("Value error: %s", str(ve))
except Exception as e:
logging.error("Unhandled error: %s", str(e))
logging.exception("Exception details:")
Когда я запускаю это с помощью func start
, я получаю множество ошибок, таких как Error : Message has reached MaxDequeueCount of 5. Moving message to queue 'pcaob-poison'
.
Я не могу выяснить причину этой ошибки, так как не получаю достаточно информации даже во время работы func start --verbose
Мой host.json
выглядит так
{
"version": "2.0",
"logging": {
"logLevel": {
"Function": "Debug",
"Host.Results": "Error",
"Host.Aggregator": "Trace",
"Queues": "Debug"
},
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
}
}
и мой local.settings.json
выглядит так
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
"AzureWebJobsAccountName": "{YOUR_ACCOUNT_NAME}",
"AzureWebJobsStorage": "AccountName = {YOUR_ACCOUNT_NAME};AccountKey = {YOUR_ACCOUNT_KEY};DefaultEndpointsProtocol=http;BlobEndpoint=http://127.0.0.1:10000/{YOUR_ACCOUNT_NAME};QueueEndpoint=http://127.0.0.1:10001/{YOUR_ACCOUNT_NAME};TableEndpoint=http://127.0.0.1:10002/{YOUR_ACCOUNT_NAME};",
"PCAOB_BASE_URL": "http://localhost:5000"
}
}
Используются следующие соответствующие пакеты Azure Python.
azure-common==1.1.28
azure-core==1.30.2
azure-functions==1.20.0
azure-identity==1.16.0
azure-storage-blob==12.20.0
azure-storage-queue==12.11.0
И я использую Python 3.11
Я просто пытаюсь отладить проблему. Пока что вывод о том, что предел MaxDequeueCount достигнут, — единственное, что я получаю, и это бесполезно.
Любой совет приветствуется
Проблема заключается в триггере Http, вы не кодируете его и не отправляете, из-за чего он отправляется в подозрительную очередь. Ниже приведен код (измененный), который сработал у меня:
import logging
import base64
import requests
import azure.functions as func
from azure.storage.queue import QueueServiceClient
ri_qs = QueueServiceClient.from_connection_string("DefaultEndpointsProtocol=https;AccountNamerithstore;AccountKey=XZlFEcKM3w==;EndpointSuffix=core.windows.net")
ri_qc = ri_qs.get_queue_client("queuename")
rith = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@rith.route(route = "http_rith_trigger")
def http_rith_trigger(req: func.HttpRequest) -> func.HttpResponse:
ri_url = "https://rithwik.com/bojja"
tst_out = base64.b64encode(ri_url.encode('utf-8')).decode('utf-8')
ri_qc.send_message(tst_out)
logging.info(f"Hello message is sent to queue: {tst_out}")
return func.HttpResponse("Hello Rithwik Bojja, message is sent to queue",status_code=200)
@rith.queue_trigger(arg_name = "rith1", queue_name = "queuename", connection = "AzureWebJobsStorage")
def test_rith_process(rith1: func.QueueMessage):
logging.info("Hello Rithwik Bojja, the message recieved to queue is : %s", rith1.get_body())
ri_out = rith1.get_body().decode('utf-8')
logging.info('Hello Rithwik Bojja, Message decoded as : %s', ri_out)
if not ri_out.startswith("http"):
raise ValueError(f"Hello Rithwik Bojja, Invalid URL: {ri_out}")
def process_single_url(base_url: str, url: str) -> str:
out = requests.get(f"{base_url}/process", params = {"url": url})
return out.text
Выход: