Я пытаюсь подключиться к таблице DeltaLake, которая хранится на MinIO, а не на S3. Я могу сделать это непосредственно с помощью пакета deltalake
Python следующим образом:
storage_options = {
"AWS_ENDPOINT_URL": "http://localhost:9000",
"AWS_REGION": "local",
"AWS_ACCESS_KEY_ID": access_key,
"AWS_SECRET_ACCESS_KEY": secret_key,
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
"AWS_ALLOW_HTTP": "true"
}
dt = DeltaTable("s3a://my_bucket/data", storage_options=storage_options)
df = dt.to_pandas()
Однако вместо этого я хочу прочитать фрейм данных Dask, поэтому пытаюсь использовать dask-deltatable
. Поскольку под капотом используется deltalake
, я предположил, что будет работать следующее:
ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data", storage_options=storage_options)
Однако, похоже, он все еще пытается подключиться к AWS:
OSError Traceback (most recent call last)
Cell In[3], line 1
----> 1 ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data", storage_options=storage_options)
File ~/.local/lib/python3.10/site-packages/dask_deltatable/core.py:285, in read_deltalake(path, catalog, database_name, table_name, version, columns, storage_options, datetime, delta_storage_options, **kwargs)
282 raise ValueError("Please Provide Delta Table path")
284 delta_storage_options = utils.maybe_set_aws_credentials(path, delta_storage_options) # type: ignore
--> 285 resultdf = _read_from_filesystem(
286 path=path,
287 version=version,
288 columns=columns,
289 storage_options=storage_options,
290 datetime=datetime,
291 delta_storage_options=delta_storage_options,
292 **kwargs,
293 )
294 return resultdf
File ~/.local/lib/python3.10/site-packages/dask_deltatable/core.py:102, in _read_from_filesystem(path, version, columns, datetime, storage_options, delta_storage_options, **kwargs)
99 delta_storage_options = utils.maybe_set_aws_credentials(path, delta_storage_options) # type: ignore
101 fs, fs_token, _ = get_fs_token_paths(path, storage_options=storage_options)
--> 102 dt = DeltaTable(
103 table_uri=path, version=version, storage_options=delta_storage_options
104 )
105 if datetime is not None:
106 dt.load_as_version(datetime)
File ~/.local/lib/python3.10/site-packages/deltalake/table.py:297, in DeltaTable.__init__(self, table_uri, version, storage_options, without_files, log_buffer_size)
277 """
278 Create the Delta Table from a path with an optional version.
279 Multiple StorageBackends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS) and local URI.
(...)
294
295 """
296 self._storage_options = storage_options
--> 297 self._table = RawDeltaTable(
298 str(table_uri),
299 version=version,
300 storage_options=storage_options,
301 without_files=without_files,
302 log_buffer_size=log_buffer_size,
303 )
OSError: Generic S3 error: Error after 10 retries in 13.6945151s, max_retries:10, retry_timeout:180s, source:error sending request for url (http://169.254.169.254/latest/api/token)
Кому-нибудь удалось успешно прочитать данные из Deltalake в фрейм данных Dask из MinIO, и если да, то как?
Во-первых: разумным обходным решением было бы установить эти значения как переменные среды. Таким образом, они должны быть подхвачены любыми работающими платформами s3.
В строке документации у нас есть
Storage_options: dict, по умолчанию Нет Пары ключ/значение, которые должны быть переданы в серверную часть fsspec, если таковые имеются.
delta_storage_options: dict, по умолчанию Нет Пары ключ/значение должны быть переданы в файловую систему delta-rs, если таковые имеются.
Я подозреваю, что здесь второй набор опций именно такой, как у вас.
Для fsspec
см. эту документацию, в которой показано, как установить конечную точку, регион и ключ/секрет. Остальные значения будут частью client_kwargs , и вам нужно будет поискать это в документации botocore, но я подозреваю, что они не нужны.
Спасибо, мне помогло задание тех же параметров в качестве переменных среды, а не передача их в
storage_options
. ``` для k,v в Storage_options.items(): os.environ[k] = v ddf = dask_deltatable.read_deltalake("s3a://my_bucket/data") ```