Есть ли способ прочитать и обработать большой двоичный объект Azure как поток с помощью пакета SDK Python без загрузки всего большого двоичного объекта в память?

Я хотел бы иметь возможность обрабатывать BLOB-объект Azure как объект ввода-вывода, используя пакет SDK для Python. Насколько я могу судить, для этого от меня требуется либо:

а) используйте .readinto() для чтения большого двоичного объекта в объект ввода-вывода (таким образом загружая весь большой двоичный объект неопределенного размера в память внутри контейнера с 256 МБ памяти)

б) вручную вызвать .read() со смещением и лимитом (таким образом, мне нужно точно знать, сколько я хочу прочитать в данный момент времени)

Я пытаюсь прочитать заархивированный файл BSON mongodump с помощью bson.decode_file_iter, поэтому я не знаю точно, сколько байтов я хочу прочитать за определенный отрезок времени (по крайней мере, без необходимости самому немного декодировать BSON), и, очевидно, Решение а) не очень хорошо, если у меня есть особенно гигантские файлы дампа (например, я сейчас работаю с файлом, который сейчас несжат на половину гигабайта). Насколько мне известно, в пакете SDK Azure Blob нет ничего, что раскрывало бы это. В идеальной вселенной я мог бы передать большой двоичный объект через распаковку gzip в функцию декодирования bson. Есть ли что-то, что я могу использовать для достижения этой цели без необходимости писать полный слой перевода, или мне следует придерживаться моего текущего подхода: загрузить большой двоичный объект в файл, а затем использовать gzip.open для чтения загруженного файла и передать его в bson.decode_file_iter?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
76
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Есть ли способ прочитать и обработать большой двоичный объект Azure как поток с помощью пакета SDK Python без загрузки всего большого двоичного объекта в память?

Вы можете использовать приведенный ниже код для чтения и обработки большого двоичного объекта Azure в виде потока с помощью пакета SDK Python без загрузки всего большого двоичного объекта в память.

Код:

import io
import gzip
import bson
from azure.storage.blob import BlobServiceClient

class BlobStream(io.RawIOBase):
    def __init__(self, blob_client):
        self.blob_client = blob_client
        self.stream = self.blob_client.download_blob()
        self.stream_iter = self.stream.chunks()
        self.buffer = b""

    def readinto(self, b):
        try:
            chunk = next(self.stream_iter)
            self.buffer += chunk
        except StopIteration:
            pass
        size = len(self.buffer)
        length = min(len(b), size)
        b[:length] = self.buffer[:length]
        self.buffer = self.buffer[length:]
        return length

    def readable(self):
        return True
    
blob_service_client = BlobServiceClient.from_connection_string("DefaultEndpointsProtocol=https;AccountName=venkat123;AccountKey=/Z3A9SDaP7a8DKxolqooke13Z4Uyxxxxx==;EndpointSuffix=core.windows.net")
blob_client = blob_service_client.get_blob_client(container = "test", blob = "large_sample.gz")

blob_stream = BlobStream(blob_client)

with gzip.GzipFile(fileobj=blob_stream) as gzipped_blob:
    for doc in bson.decode_file_iter(gzipped_blob):
        print(doc)

В этом коде создан пользовательский класс BlobStream, расширяющий io.RawIOBase. Этот класс использует метод download_blob объекта BlobClient для загрузки большого двоичного объекта в поток.

Он читает поток порциями, используя метод chunks объекта StorageStreamDownloader. Метод readinto класса BlobStream считывает данные из потока в буфер и возвращает количество прочитанных байтов.

Обрабатывая поток порциями и буферизуя данные, код эффективно обрабатывает большие BLOB-объекты, не загружая весь BLOB-объект в память сразу.

Выход:

{vGAxXP6r4r': '42tt1xFZ4fsHQizfxsEbhIepntrWBcd2KSU7P58RFp0OiQiMZ5kDKIdoKscVazIxXBjtOJAQNv3oLGCGrvLPgiOrDSbjGJ4wXH6h', 'USvIfwuHQ6': 'pOk9xAXWdKuPVcWImLYKRsmDq1fIHCgRfHmUnv5t5tTT3fg6RE0bKmCf6V8UQ9JBYdlxRxMAAxCsdfBeIdNO8PtQOswxewlqnFGZ', 'YNgIcaA3N9': '3A4GLIaLmpjSaRZGecZqUh76xjRGTnBQ4sXB3Bcl7CSh8ycToVWmX81mRlKvVqSnYnDy92fpv7kOD5hxdIG6JCWps5cLm99e3Gqr', 'DLsJ1QOXvJ': 'f6NK561gNY0ZYxJcKO2KoKxbyRjzIgiKG9AjNWes3ti3kSTyuwjKNuhb48MiHMTuGFblLz4ufFpAj76tD550ajTqBbyeODXkyZhc', 'xnip5jrUGV': 'K4OEwJVhkd1gzBwqeq5LDkNNZU2Z2MMr7FSy42SOSn2SBI8xlpvcrbqTZs5JZWBi9abP4ziaE3lqf6FBvyYJ2iQeiYpuWybPRnAE', 'KZbTjdxBcw': 'wYX2ioBqxTbCTzHqmJEOv4fNsDq8wFKAWD8x3I4mX7fmpyI0K9ZODUi0nn7020tgXD021TXuSlhjAyMLaJVy9uhmZ7cu3l4JmyEU', 'POZkbGX1Oo': 'Rstj0ldaOFLy4doyPMXbLR9MH05TRLW820yWWsHAlAzb3uporaLxyON290w6tPLRUiOVJqfr8l1q3HpSc48yxuheZ0cz3pgwRqQl', 'b6zzcCZqvT': 'fISfGw3ftsB6BkmJYosIgKeMsSTHYbWIDrYMfH14LA7hYg4oj852UQ2I76uHC93nltZLIawDKv5GFXZLlcYLQwPmDsuFGC22cpaN', 'fBb6tmprgQ': '8JmO0twpLmNtjNQn7Zn5NeByLzEpSldfPUtCWOQytfcj2aFizu7ma8bWGVRY5Xbg8v5eX8lWJC6k7ddOQRZNoAdSWiCOQBfD8m7J', 'YWd247222J': 'Y05nSM1lcnUrnsmEKndc6hoX6pItvoJFkkE5qYXuSCmgpLnwcfKgY8lUIgXLeG04BUIVqVytNGgpaNHKOyXb7hZcl7uJpgS2tmeG', 'UuA86rFKln': 'nNWuL6SRUoUrkoMCR8dKb5vzFxGi4JVwRbGZHcC6lrCUk5ZRZdkL0VoG1RScRCwE3l1KSWCUaKeWbGbvPpSwFIBJSPvvk1bJZUFy', 'sqH5XlZ6Dv': 'GHtgkrDtxJcQwRY5WSHo1oJwPpjhh8NCkh4E7Wb1qCXUE2SWdaZLpnPxFUx9mJMQgAsf3Rrs2djlvkTL0B3J9xudpCRhv8v9gdM4', 'TSy8HqfeuJ': '9UDVGZQLOhnd7Cgknstfzeth8SM1gGecYDfufj5MWRaWoVGetJ0eVt2unUEgjjVMBumUZgW2jPJFC1h4tPbYlHWs2kvBDRTsHJBZ', 'AeSdgHc3ML': 'ImJquzYHwfV394qlp09i2udfJ0o8NjsXZusyLALjYD3jKMrGvpRmLGpQJQlpKcd5vSXMOKXQFET665ix0wB0vARFodSgddRqDHmS', 'MyWBr0uG5W': 'lv00rWQ42e4yrWen8V9VL9KgQPYCC9EGh5kifN52rjzgdSO1iRtif7b32qZ5weSaQz6oHfHQflUhLPSMr6ZpwpI00kylzt6RK7WA', 'XLGO6tFBwV': 'aiWVzwNAQt4QoRfJBUWNPykxAPlWsY6vQFDg5UhafPybRTE970fmDKYnNRRuJpvz3xntNpargubjM3mYDqX6vBtRs37HeIWY5soX'} 

Ссылка: Класс azure.storage.blob.StorageStreamDownloader | Microsoft Learn

Другие вопросы по теме

Вопросы по созданию делегирования пользователей sas
На информационной панели Azure: предоставьте пользователю доступ к одному серверу приложений, базе данных SQL и хранилищу Azure
Terraform: ошибка: получение статических свойств веб-сайта для учетной записи хранения (подписка: ***: превышен крайний срок контекста)
Хранилище BLOB-объектов Azure: нет ли события UnDelete?
Учетная запись хранения с отключенным частным доступом. 403 при доступе к BLOB-объектам
Доступ к контейнеру Azure через Python SDK завершается сбоем из-за AuthorizationPermissionMismatch, но я являюсь владельцем
Потоковая передача (загрузка) файла из хранилища BLOB-объектов Azure пользователю через flask
Как скопировать все контейнеры исходной учетной записи хранения в целевую учетную запись хранения в виде папок в контейнере?
ADF CopyActivity рекурсивно распаковывает архив, но в этом нет необходимости
Есть ли способ использовать команду COPY INTO Databricks для копирования всех файлов из BLOB-объекта Azure в дельта-таблицу?