Подключение к Delta Lake, размещенному на MinIO, от Dask

Я пытаюсь подключиться к таблице DeltaLake, которая хранится на MinIO, а не на S3. Я могу сделать это непосредственно с помощью пакета deltalake Python следующим образом:

storage_options = {
    "AWS_ENDPOINT_URL": "http://localhost:9000",
    "AWS_REGION": "local",
    "AWS_ACCESS_KEY_ID": access_key,
    "AWS_SECRET_ACCESS_KEY": secret_key,
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    "AWS_ALLOW_HTTP": "true"
}

dt = DeltaTable("s3a://my_bucket/data", storage_options=storage_options)
df = dt.to_pandas()

Однако вместо этого я хочу прочитать фрейм данных Dask, поэтому пытаюсь использовать dask-deltatable. Поскольку под капотом используется deltalake, я предположил, что будет работать следующее:

ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data", storage_options=storage_options)

Однако, похоже, он все еще пытается подключиться к AWS:

OSError                                   Traceback (most recent call last)
Cell In[3], line 1
----> 1 ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data", storage_options=storage_options)

File ~/.local/lib/python3.10/site-packages/dask_deltatable/core.py:285, in read_deltalake(path, catalog, database_name, table_name, version, columns, storage_options, datetime, delta_storage_options, **kwargs)
    282         raise ValueError("Please Provide Delta Table path")
    284     delta_storage_options = utils.maybe_set_aws_credentials(path, delta_storage_options)  # type: ignore
--> 285     resultdf = _read_from_filesystem(
    286         path=path,
    287         version=version,
    288         columns=columns,
    289         storage_options=storage_options,
    290         datetime=datetime,
    291         delta_storage_options=delta_storage_options,
    292         **kwargs,
    293     )
    294 return resultdf

File ~/.local/lib/python3.10/site-packages/dask_deltatable/core.py:102, in _read_from_filesystem(path, version, columns, datetime, storage_options, delta_storage_options, **kwargs)
     99 delta_storage_options = utils.maybe_set_aws_credentials(path, delta_storage_options)  # type: ignore
    101 fs, fs_token, _ = get_fs_token_paths(path, storage_options=storage_options)
--> 102 dt = DeltaTable(
    103     table_uri=path, version=version, storage_options=delta_storage_options
    104 )
    105 if datetime is not None:
    106     dt.load_as_version(datetime)

File ~/.local/lib/python3.10/site-packages/deltalake/table.py:297, in DeltaTable.__init__(self, table_uri, version, storage_options, without_files, log_buffer_size)
    277 """
    278 Create the Delta Table from a path with an optional version.
    279 Multiple StorageBackends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS) and local URI.
   (...)
    294 
    295 """
    296 self._storage_options = storage_options
--> 297 self._table = RawDeltaTable(
    298     str(table_uri),
    299     version=version,
    300     storage_options=storage_options,
    301     without_files=without_files,
    302     log_buffer_size=log_buffer_size,
    303 )

OSError: Generic S3 error: Error after 10 retries in 13.6945151s, max_retries:10, retry_timeout:180s, source:error sending request for url (http://169.254.169.254/latest/api/token)

Кому-нибудь удалось успешно прочитать данные из Deltalake в фрейм данных Dask из MinIO, и если да, то как?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
50
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Во-первых: разумным обходным решением было бы установить эти значения как переменные среды. Таким образом, они должны быть подхвачены любыми работающими платформами s3.

В строке документации у нас есть

Storage_options: dict, по умолчанию Нет Пары ключ/значение, которые должны быть переданы в серверную часть fsspec, если таковые имеются.

delta_storage_options: dict, по умолчанию Нет Пары ключ/значение должны быть переданы в файловую систему delta-rs, если таковые имеются.

Я подозреваю, что здесь второй набор опций именно такой, как у вас.

Для fsspec см. эту документацию, в которой показано, как установить конечную точку, регион и ключ/секрет. Остальные значения будут частью client_kwargs , и вам нужно будет поискать это в документации botocore, но я подозреваю, что они не нужны.

Спасибо, мне помогло задание тех же параметров в качестве переменных среды, а не передача их в storage_options. ``` для k,v в Storage_options.items(): os.environ[k] = v ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data") ```

James Baker 05.07.2024 08:24

Другие вопросы по теме