Итак, у меня есть несколько классов, которые позволяют мне загружать файловые объекты или объекты ввода-вывода в хранилище BLOB-объектов Azure.
Моя проблема здесь в том, что я хочу передать обратный вызов во время экспорта этих объектов, и этот обратный вызов должен выполнить несколько действий (следовательно, вызвать метод более высокого уровня).
Вот код:
from azure.storage.blob import BlobServiceClient
class UploadStatus:
def __init__(self, uploaded=0, total=None):
self.uploaded = uploaded
self.total = total
def update(self, uploaded, total):
if total is not None:
self.total = self._normalize(total)
if uploaded is not None:
self.uploaded = self._normalize(uploaded)
def progress(self):
"""Calculate the progress made for the upload incorring."""
return 100.0 * self.uploaded / self.total if self.total > 0 else 0
def _normalize(self, integer):
return integer if integer else 0
class Resource:
"""A wrapper encapsulating an io-like object"""
def __init__(self, ...):
...
class AzureBlobProxy:
def __init__(self, client):
self.client = client
@classmethod
def build(cls, params: dict):
client = BlobServiceClient(**params)
return cls(client)
def blob_export(self, container: str, name: str, io, block=None) -> dict:
"""Export file-like object to a Storage.
Args:
container (string): Container to use
name (string): Name to use
io (io.IOBase): File-like io object base on IOBase.
"""
blob_client = self.client.get_blob_client(
container=container,
blob=name
)
return blob_client.upload_blob(
data=io,
progress_hook=block
)
class AzureProvider:
def upload(self, resource, status, params = {}, block=None):
"""See BaseProvider.upload() description."""
proxy = AzureBlobProxy.build(params)
container, name = self._extract_container_and_name_from_params(params)
def progress_callback(sent, total):
print(block) # doesn't display the block function
status.update(uploaded=sent, total=total)
if block and callable(block):
block.__call__(status)
with resource.with_io() as io:
status.update(uploaded=0, total=sys.getsizeof(io))
proxy.blob_export(container, name, io, progress_callback)
return self._upload_strategy()
def _extract_container_and_name_from_params(self, params):
"""Return container and name for blob"""
...
def __upload_strategy(self):
return 'azure-blob'
def print_progress(status):
print('PROGRESS {}%'.format(int(status.progress())))
parameters = { ... }
resource = Resource(...)
upload_status = UploadStatus()
strategy = provider.upload(resource, upload_status, parameters, print_progress)
Взгляните на мою внутреннюю функцию progress_callback
, которая представляет собой Progress_hook, переданный в этот последний метод https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobclient ?view=azure-python#azure-storage-blob-blobclient-upload-blob
Он должен вызвать функцию print_progress
, но это не так, я не вижу появления надписи «ПРОГРЕСС %».
Однако я могу подтвердить, что файловый объект экспортируется в хранилище BLOB-объектов Azure.
Есть ли у вас какие-либо идеи?
Внутренняя функция как обратный вызов не вызывается
Вы можете использовать приведенный ниже код Python, включающий дополнительные операторы печати, которые помогут устранить проблему, когда функция block
не вызывается.
Кроме того, я внес некоторые изменения в загрузку байтов в хранилище BLOB-объектов с помощью классов.
Вот подробное объяснение того, как загрузить file-like object
в хранилище BLOB-объектов Azure с помощью Azure Blob Storage client library
для Python.
Код:
from azure.storage.blob import BlobServiceClient
from azure.identity import DefaultAzureCredential
import io
import sys
class UploadStatus:
def __init__(self, uploaded=0, total=None):
self.uploaded = uploaded
self.total = total
def update(self, uploaded, total):
if total is not None:
self.total = self._normalize(total)
if uploaded is not None:
self.uploaded = self._normalize(uploaded)
def progress(self):
"""Calculate the progress made for the upload incorring."""
return 100.0 * self.uploaded / self.total if self.total > 0 else 0
def _normalize(self, integer):
return integer if integer else 0
class Resource:
"""A wrapper encapsulating an io-like object"""
def __init__(self, data: bytes):
self.data = data
def with_io(self):
return io.BytesIO(self.data)
class AzureBlobProxy:
def __init__(self, client):
self.client = client
@classmethod
def build(cls, params: dict):
client = BlobServiceClient(**params)
return cls(client)
def blob_export(self, container: str, name: str, io, block=None) -> dict:
"""Export file-like object to a Storage.
Args:
container (string): Container to use
name (string): Name to use
io (io.IOBase): File-like io object base on IOBase.
"""
blob_client = self.client.get_blob_client(
container=container,
blob=name
)
print(f"Starting upload to container: {container}, blob: {name}")
return blob_client.upload_blob(
data=io,
progress_hook=block
)
class AzureProvider:
def upload(self, resource, status, params = {}, block=None):
"""See BaseProvider.upload() description."""
proxy = AzureBlobProxy.build(params)
container, name = self._extract_container_and_name_from_params(params)
def progress_callback(sent, total):
print(f"Progress callback triggered. Sent: {sent}, Total: {total}")
status.update(uploaded=sent, total=total)
if block and callable(block):
block.__call__(status)
with resource.with_io() as io:
io_size = sys.getsizeof(io.getvalue())
print(f"IO size: {io_size}")
status.update(uploaded=0, total=io_size)
proxy.blob_export(container, name, io, progress_callback)
return self._upload_strategy()
def _extract_container_and_name_from_params(self, params):
"""Return container and name for blob"""
return params.get('container'), params.get('name')
def _upload_strategy(self):
return 'azure-blob'
def print_progress(status):
print('PROGRESS {}%'.format(int(status.progress())))
# Example usage
parameters = {
'account_url': 'https://<storage account name >.blob.core.windows.net/', # Replace with your account URL
'credential': DefaultAzureCredential(), # Replace with your credential
'container': 'test', # Replace with your container name
'name': 'sample.txt' # Replace with your blob name
}
# Create a sample file-like object
sample_data = b"Sample data for Azure Blob Storage upload. " * 1024 # 43 KB of sample data
resource = Resource(sample_data)
upload_status = UploadStatus()
provider = AzureProvider()
strategy = provider.upload(resource, upload_status, parameters, print_progress)
Приведенный выше код отслеживает ход загрузки и управляет объектами, похожими на файлы. AzureProvider
выполняет загрузку с помощью AzureBlobProxy
, который обновляет UploadStatus
и обеспечивает прогресс в реальном времени посредством обратного вызова. В примере объясняется, как загрузить образец данных и получить обратную связь о ходе выполнения.
Выход:
IO size: 44065
Starting upload to container: test, blob: sample.txt
Progress callback triggered. Sent: 44032, Total: 44032
PROGRESS 100%
Рад знать, что это помогло :)
Хорошо. Спасибо. Теперь в моем интеграционном тесте все работает так, как и ожидалось, и печать отображается.