Я читаю (используя polars/pyarrow) каталог, полный паркетов из хранилища BLOB-объектов Azure, который составляет 60 ГБ на вычислительном экземпляре Standard_E8_v3 (8 ядер, 64 ГБ ОЗУ, 200 ГБ на диске).
После того, как я прочитал данные, я хочу сгруппировать данные и собрать результат, однако при сборе результата я получаю эту ошибку
Я не совсем понимаю, что он мне говорит -
Говорит ли это, что он не может обработать мои данные, потому что они слишком велики для моей машины?
Ошибка в коде?
Есть ли ошибка с данными, которые мне нужно обработать?
Если бы кто-нибудь мог осветить проблему, которая была бы очень признательна - важно, чтобы любое решение было основано на Polars :)
Код ниже:
import pyarrow.dataset as ds
from azureml.fsspec import AzureMachineLearningFileSystem
import polars as pl
from azureml.core import Workspace
ws = Workspace.from_config()
# Azure Machine Learning workspace details:
subscription = ws.subscription_id
resource_group = ws.resource_group
workspace = ws.name
datastore_name = 'datastore_name'
path_on_datastore = 'path_to_data'
# long-form Datastore uri format:
uri = f'azureml://subscriptions/{subscription}/resourcegroups/{resource_group}/workspaces/{workspace}/datastores/{datastore_name}'
aml_fs = AzureMachineLearningFileSystem(uri)
files = aml_fs.glob()
myds=ds.dataset(path_on_datastore, filesystem=aml_fs, format = "parquet")
df = (
pl.scan_pyarrow_dataset(myds)
.select([
'COLUMN_LIST'
])
#.with_columns(pl.col('turnovervalue').cast(pl.Float64, strict=False))
.filter((pl.col('col1')>0)&(pl.col('col2') >= 2022))
)
grouped = (df.lazy()
.groupby(['colA','colB'])
.agg(
[
pl.n_unique('colC').alias('Blah'),
pl.sum('colD').alias("BlahBlah"),
pl.n_unique('colE').alias('BlahBlahBlah'),
(pl.col('colF') == "C").count().alias('BlahBlahBlahBlah')
]
)
).collect()
Обновлено:
Проверил схему моего фрейма данных Polars, и он выводит разумный результат, поэтому я предполагаю, что мое подключение к Azure правильное, поэтому я подумал, что пойду вверх по течению и проверю, работает ли то, что Polars читает из Pyarrow, но похоже, что это проблема Pyarrow, а чем у Поларса. Фрагмент ниже от меня, я просто проверяю заголовок набора данных Pyarrow, который я получил от Azure.
Я бы предположил, что тип данных, который он вывел, не является типом данных, который он получает при чтении, однако я не уверен, что такое данные в позиции 4 (во всей таблице), и я не уверен, как я иду разобраться???
Собираюсь изменить некоторые теги и заголовок, чтобы, надеюсь, связать новую проблему с нужными людьми, которые могут помочь.
Нет, к сожалению, выдает ту же ошибку :(
Как насчет myds.head(5)
@DeanMacGregor буквально только что отредактировал вопрос, чтобы выделить это
Итак, мой код работает с изменением способа доступа к данным в Azure, поэтому я предполагаю, что это была общая проблема.
Вместо использования AzureMachineLearningFileSystem
я обратился к adlfs.AzureBlobFileSystem
.
Для доступа ко всем правильным учетным данным и т. д. требуется немного больше кода, но он не слишком подробный — и в конечном итоге он работает :)
import pyarrow.dataset as ds
import polars as pl
import adlfs
from azureml.core import Workspace,Datastore
from azure.mgmt.storage import StorageManagementClient
from azure.identity import DefaultAzureCredential
# Acquire a credential object
credential = DefaultAzureCredential()
# Get Workspace
ws = Workspace.from_config()
# Get specific datastore
datastore = Datastore.get(ws,'datastore_name')
# Azure Machine Learning workspace details:
subscription = ws.subscription_id
resource_group = ws.resource_group
datastore_name = datastore.account_name
container_name = datastore.container_name
path_on_datastore = f'{container_name}/path/to/data'
# Provision the storage account, starting with a management object.
storage_client = StorageManagementClient(credential, subscription)
# Retrieve the account's primary access key
keys = storage_client.storage_accounts.list_keys(resource_group, datastore_name)
key_to_access = keys.keys[0].value
# ... load your credentials and configure the filesystem
fs = adlfs.AzureBlobFileSystem(account_name=datastore_name, account_key=key_to_access)
dd = ds.dataset(path_on_datastore, filesystem=fs)
df = (
pl.scan_pyarrow_dataset(dd)
.select([
'COLUMN_LIST'
])
.filter((pl.col('col1')>0)&(pl.col('col2') >= 2022))
)
grouped = (df.lazy()
.groupby(['colA','colB'])
.agg(
[
pl.n_unique('colC').alias('Blah'),
pl.sum('colD').alias("BlahBlah"),
pl.n_unique('colE').alias('BlahBlahBlah'),
(pl.col('colF') == "C").count().alias('BlahBlahBlahBlah')
]
)
).collect()
Ссылки на помощь другим:
Подключение Pyarrow к Azure Blob — https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-совместимые-файловыесистемы-с-стрелкой
документы adlfs — https://github.com/fsspec/adlfs
Программное получение строки подключения к большому двоичному объекту — Как программно получить строку подключения из учетной записи хранения Azure в Python
На данный момент приму мой собственный ответ, но если есть лучший способ сделать это, пожалуйста, не стесняйтесь публиковать, и я изменю принятие, если это так.
df.fetch()
работает?