В [предыдущем вопросе][1] я спросил, как загружать данные асинхронно, и получил ответ. Однако при запуске кода функции Azure я получаю следующую ошибку asyncio.run() cannot be called from a running event loop
при выполнении нескольких вызовов функции Azure, и в этом случае я думаю, что они выполняются в одном потоке. Я понимаю, что вы можете проверить наличие работающего цикла событий, но я был бы признателен за советы о том, где лучше всего и как включить его в приведенный ниже код.
from azure.storage.blob.aio import BlobServiceClient
async def download_blob_to_file(blob_service_client: BlobServiceClient, container_name, transaction_date, customer_id):
blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"{transaction_date}/{customer_id}.csv")
with open(file=f'{customer_id}.csv', mode = "wb") as sample_blob:
download_stream = await blob_client.download_blob()
data = await download_stream.readall()
sample_blob.write(data)
async def main(transaction_date, customer_id):
connect_str = "connection-string"
blob_serv_client = BlobServiceClient.from_connection_string(connect_str)
async with blob_serv_client as blob_service_client:
await download_blob_to_file(blob_service_client, "sample-container", transaction_date, customer_id)
if __name__ == '__main__':
transaction_date = '20240409'
customer_id = '001'
# customer_id_list = ['001', '002', '003', '004']
asyncio.run(main(transaction_date, customer_id))```
[1]: https://stackoverflow.com/questions/78377808/download-multiple-azure-blobs-asynchronously
Я загрузил файлы BLOB-объектов с помощью функции триггера HTTP, см. этот вывод в локальном и браузере.
Таким образом, исходный код (ответ в предыдущем вопросе) сам по себе работал нормально (HTTP-триггер), но если к функции было сделано несколько вызовов одновременно, он возвращал сообщение «asyncio.run() не может быть вызван из запуск цикла событий». Можете ли вы объяснить, как ваш пересмотренный код решает эту проблему? Я вижу, что этот код очень близко соответствует коду из предыдущего ответа, но без asyncio.run.
Согласно вашему коду, он выдает ошибку, потому что asyncio.run() выполняет ту же работу, что и триггер HTTP.
Я успешно загрузил файлы больших двоичных объектов CSV, используя функцию триггера HTTP в модели Python V1.
Код:
инициал.py:
import azure.functions as func
from azure.storage.blob.aio import BlobServiceClient
import asyncio
import os
async def download_blob_to_file(blob_service_client: BlobServiceClient, container_name, transaction_date, customer_id):
blob_client = blob_service_client.get_blob_client(container=container_name, blob=f"{transaction_date}/{customer_id}.csv")
async with blob_client:
data = await blob_client.download_blob()
content = await data.readall()
with open(f'{customer_id}.csv', mode = "wb") as file:
file.write(content)
async def main(req: func.HttpRequest) -> func.HttpResponse:
transaction_date = req.params.get('transaction_date')
customer_ids_param = req.params.get('customer_ids')
if transaction_date is None or customer_ids_param is None:
return func.HttpResponse("Missing parameters", status_code=400)
customer_id_list = customer_ids_param.split(',')
connect_str = os.environ["Connect_str"]
blob_serv_client = BlobServiceClient.from_connection_string(connect_str)
async with blob_serv_client:
tasks = []
for customer_id in customer_id_list:
task = asyncio.create_task(download_blob_to_file(blob_serv_client, "sample-container", transaction_date, customer_id))
tasks.append(task)
await asyncio.gather(*tasks)
return func.HttpResponse("Blob download completed.", status_code=200)
локальные.settings.json:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "<storage_conne>",
"FUNCTIONS_WORKER_RUNTIME": "python",
"Connect_str": "<storage_conne>"
}
}
Выход :
Файлы больших двоичных объектов были успешно загружены с помощью вышеуказанной триггерной функции HTTP, как показано ниже:
Вывод браузера:
http://localhost:7071/api/HttpTrigger1?transaction_date=20240409&customer_ids=001,002,003,004
Blob download completed.
Приведенный выше код работает локально для загрузки файлов больших двоичных объектов; однако на портале Azure мы должны хранить загруженные файлы во временном каталоге, поскольку функция Python работает только в Linux. Поэтому я добавил приведенные ниже строки для временного каталога перед развертыванием проекта в приложении-функции Azure.
import tempfile
with tempfile.TemporaryDirectory() as temp_dir:
file_path = os.path.join(temp_dir, f"{customer_id}.csv")
with open(file_path, mode = "wb") as file:
Развертывание в приложении-функции Azure:
Лазурный портал:
В какой триггерной функции вы пытаетесь?