Ошибка Pyarrow: пытался прочитать X байтов, начиная с позиции X из файла, но получил только X

Я читаю (используя polars/pyarrow) каталог, полный паркетов из хранилища BLOB-объектов Azure, который составляет 60 ГБ на вычислительном экземпляре Standard_E8_v3 (8 ядер, 64 ГБ ОЗУ, 200 ГБ на диске).

После того, как я прочитал данные, я хочу сгруппировать данные и собрать результат, однако при сборе результата я получаю эту ошибку

Я не совсем понимаю, что он мне говорит -

Говорит ли это, что он не может обработать мои данные, потому что они слишком велики для моей машины?

Ошибка в коде?

Есть ли ошибка с данными, которые мне нужно обработать?

Если бы кто-нибудь мог осветить проблему, которая была бы очень признательна - важно, чтобы любое решение было основано на Polars :)

Код ниже:

import pyarrow.dataset as ds
from azureml.fsspec import AzureMachineLearningFileSystem
import polars as pl
from azureml.core import Workspace

ws = Workspace.from_config()

# Azure Machine Learning workspace details:
subscription = ws.subscription_id
resource_group = ws.resource_group
workspace = ws.name
datastore_name = 'datastore_name'
path_on_datastore = 'path_to_data'

# long-form Datastore uri format:
uri = f'azureml://subscriptions/{subscription}/resourcegroups/{resource_group}/workspaces/{workspace}/datastores/{datastore_name}'

aml_fs = AzureMachineLearningFileSystem(uri)
files = aml_fs.glob()

myds=ds.dataset(path_on_datastore, filesystem=aml_fs, format = "parquet")
df = (
    pl.scan_pyarrow_dataset(myds)
    .select([
        'COLUMN_LIST'
    ])
    #.with_columns(pl.col('turnovervalue').cast(pl.Float64, strict=False))
    .filter((pl.col('col1')>0)&(pl.col('col2') >= 2022))
)

grouped = (df.lazy()
    .groupby(['colA','colB'])
    .agg(
        [
            pl.n_unique('colC').alias('Blah'),
            pl.sum('colD').alias("BlahBlah"),
            pl.n_unique('colE').alias('BlahBlahBlah'),
            (pl.col('colF') == "C").count().alias('BlahBlahBlahBlah')
        ]
    )
).collect()

Обновлено:

Проверил схему моего фрейма данных Polars, и он выводит разумный результат, поэтому я предполагаю, что мое подключение к Azure правильное, поэтому я подумал, что пойду вверх по течению и проверю, работает ли то, что Polars читает из Pyarrow, но похоже, что это проблема Pyarrow, а чем у Поларса. Фрагмент ниже от меня, я просто проверяю заголовок набора данных Pyarrow, который я получил от Azure.

Я бы предположил, что тип данных, который он вывел, не является типом данных, который он получает при чтении, однако я не уверен, что такое данные в позиции 4 (во всей таблице), и я не уверен, как я иду разобраться???

Собираюсь изменить некоторые теги и заголовок, чтобы, надеюсь, связать новую проблему с нужными людьми, которые могут помочь.

df.fetch() работает?

Dean MacGregor 20.04.2023 18:53

Нет, к сожалению, выдает ту же ошибку :(

Hillygoose 21.04.2023 09:20

Как насчет myds.head(5)

Dean MacGregor 21.04.2023 10:35

@DeanMacGregor буквально только что отредактировал вопрос, чтобы выделить это

Hillygoose 21.04.2023 10:47
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
4
71
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Итак, мой код работает с изменением способа доступа к данным в Azure, поэтому я предполагаю, что это была общая проблема.

Вместо использования AzureMachineLearningFileSystem я обратился к adlfs.AzureBlobFileSystem.

Для доступа ко всем правильным учетным данным и т. д. требуется немного больше кода, но он не слишком подробный — и в конечном итоге он работает :)

import pyarrow.dataset as ds
import polars as pl
import adlfs
from azureml.core import Workspace,Datastore
from azure.mgmt.storage import StorageManagementClient
from azure.identity import DefaultAzureCredential

# Acquire a credential object
credential = DefaultAzureCredential()
# Get Workspace
ws = Workspace.from_config()
# Get specific datastore
datastore = Datastore.get(ws,'datastore_name')

# Azure Machine Learning workspace details:
subscription = ws.subscription_id
resource_group = ws.resource_group
datastore_name = datastore.account_name
container_name = datastore.container_name
path_on_datastore = f'{container_name}/path/to/data'

# Provision the storage account, starting with a management object.
storage_client = StorageManagementClient(credential, subscription)

# Retrieve the account's primary access key
keys = storage_client.storage_accounts.list_keys(resource_group, datastore_name)
key_to_access = keys.keys[0].value

# ... load your credentials and configure the filesystem
fs = adlfs.AzureBlobFileSystem(account_name=datastore_name, account_key=key_to_access)

dd = ds.dataset(path_on_datastore, filesystem=fs)

df = (
    pl.scan_pyarrow_dataset(dd)
    .select([
        'COLUMN_LIST'
    ])
    .filter((pl.col('col1')>0)&(pl.col('col2') >= 2022))
)

grouped = (df.lazy()
    .groupby(['colA','colB'])
    .agg(
        [
            pl.n_unique('colC').alias('Blah'),
            pl.sum('colD').alias("BlahBlah"),
            pl.n_unique('colE').alias('BlahBlahBlah'),
            (pl.col('colF') == "C").count().alias('BlahBlahBlahBlah')
        ]
    )
).collect()

Ссылки на помощь другим:

Подключение Pyarrow к Azure Blob — https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-совместимые-файловыесистемы-с-стрелкой

документы adlfs — https://github.com/fsspec/adlfs

Программное получение строки подключения к большому двоичному объекту — Как программно получить строку подключения из учетной записи хранения Azure в Python

На данный момент приму мой собственный ответ, но если есть лучший способ сделать это, пожалуйста, не стесняйтесь публиковать, и я изменю принятие, если это так.

Другие вопросы по теме

Ошибка при приеме данных из лазурного добавления большого двоичного объекта в базу данных kusto с использованием фабрики данных azure
Оперативное резервное копирование BLOB-объектов Azure
PowerApps: доступ к учетной записи хранения Azure доступен только из выбранных диапазонов виртуальных сетей и IP-адресов
Почему generate_blob_sas() дает сбой, но generate_container_sas() работает нормально с теми же значениями?
Вставка значения, хранящегося в переменной конвейера, в качестве новой строки данных набора данных приемника
Установка времени жизни в контейнерах BLOB-объектов Azure, созданных с помощью BlobServiceClient
Хранилище BLOB-объектов Azure — удаление разрешений для папки
Что заменяет старый BlockBlobService.get_blob_to_bytes в новом BlobServiceClient?
Что такое blob_name для Azure_blob_sas() Python SDK, когда задействованы пути к папкам?
Как сбросить максимальный возраст CORS в хранилище BLOB-объектов Azure