Я хочу использовать библиотеку pyiceberg с облачным хранилищем Google.
У меня есть каталог, созданный в хранилище Google Cloud с использованием Pyspark, и я хотел бы прочитать оттуда эти таблицы.
Я вижу эту документацию по созданию объекта каталога для GSC, но я действительно не понимаю, как к нему подключиться или как создать объект конфигурации для облака Google.
Я пытался:
catalog = load_catalog(
uri = "gs://catalog",
type = "gcsfs"
)
но я получаю ошибку:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
Cell In[4], line 1
----> 1 catalog = load_catalog(
2 name = "gcsfs",
File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/site-packages/pyiceberg/catalog/__init__.py:212, in load_catalog(name, **properties)
210 catalog_type = None
211 if provided_catalog_type and isinstance(provided_catalog_type, str):
--> 212 catalog_type = CatalogType[provided_catalog_type.upper()]
213 elif not provided_catalog_type:
214 catalog_type = infer_catalog_type(name, conf)
File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/enum.py:792, in EnumType.__getitem__(cls, name)
788 def __getitem__(cls, name):
789 """
790 Return the member matching `name`.
791 """
--> 792 return cls._member_map_[name]
KeyError: 'GCSFS'
Я установил пакет pypiceberg[gcsfs].
Я вижу в репозитории PYICEBERG на GitHub
AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
CatalogType.REST: load_rest,
CatalogType.HIVE: load_hive,
CatalogType.GLUE: load_glue,
CatalogType.DYNAMODB: load_dynamodb,
CatalogType.SQL: load_sql,
}
Да: gcsfs==2023.12.2.post1
Где вы собираетесь писать каталог, то есть имена таблиц/метаданные? Таблицы можно записать в облачную корзину, но каталогами необходимо управлять либо в метахранилище Hive, либо на стороне сервера через Rest API, либо в базе данных.
Я совершенно новичок в этом. Какой из них мне следует использовать для хранения таблиц и их совместного чтения в облачном хранилище Google?
Я написал ответ в развернутой форме как ответ на ваш вопрос.
Pyiceberg — библиотека Python для работы с таблицами Iceberg.
Сначала получите токен OAuth2, используя файл учетной записи службы. Я запускаю это в совместной работе, поэтому мне нужно было сделать это таким образом. Вы можете сделать это по-другому, если работаете в контейнере.
import google.auth
from google.auth.transport.requests import Request
from pyiceberg import catalog
def get_access_token(service_account_file, scopes):
"""
Retrieves an access token from Google Cloud Platform using service account credentials.
Args:
service_account_file: Path to the service account JSON key file.
scopes: List of OAuth scopes required for your application.
Returns:
The access token as a string.
"""
credentials, name = google.auth.load_credentials_from_file(
service_account_file, scopes=scopes)
request = Request()
credentials.refresh(request) # Forces token refresh if needed
return credentials
# Example usage
service_account_file = "/path-to-service-account-file.json" # Replace with your path
scopes = ["https://www.googleapis.com/auth/cloud-platform"] # Adjust scopes as needed
access_token = get_access_token(service_account_file, scopes)
Далее загружается каталог. Мы используем учетные данные OAUTH2, полученные с помощью ключа нашей сервисной учетной записи.
Я отредактировал функцию datetime_to_unix_ms
, чтобы сосредоточиться на основной задаче.
Поскольку вы только начинаете, я предлагаю упростить вашу реализацию, используя базу данных для вашего реестра.
Если у вас уже есть кластер EMR, вам следует вместо этого рассмотреть возможность использования метахранилища Hive.
В этом примере мы будем использовать базу данных sqlite для нашего центрального реестра. Вы можете заменить его любым из параметров базы данных SQL, поддерживаемых библиотекой SQLalchemy.
REGISTRY_DATABASE_URI = "sqlite:///catalog.db" # replace this with your database URI
catalog_inst = catalog.load_catalog(
"default",
**{
"uri": REGISTRY_DATABASE_URI,
"py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
"gcs.oauth2.token-expires-at": datetime_to_unix_ms(access_token.expiry),
"gcs.project-id": "project-id", # replace with your gcp project id
"gcs.oauth2.token": access_token.token,
"gcs.default-bucket-location": "gs://bucket/", # replace with your gcs bucket
"warehouse": "gs://bucket/" # replace with your gcs bucket
}
)
Наконец, мы создаем пример таблицы с некоторыми данными, используя Pyarrow:
import pyarrow as pa
catalog_inst.create_namespace("default") # Replace this with your namespace
# Define the schema for the book table
schema = pa.schema([
('title', pa.string())
])
catalog_inst.drop_table("default.books") # Replace this with your table
table = catalog_inst.create_table("default.books", schema=schema)
# Create some sample data
titles = ["The Lord of the Rings", "Pride and Prejudice", "Moby Dick"]
# Create Arrow arrays from the data
title_array = pa.array(titles, type=pa.string())
table_data = pa.Table.from_arrays([title_array], names=schema.names)
table.append(table_data)
Он хранит метаданные в базе данных sqlite, но не может читать или записывать метаданные в корзину облачного хранилища Google? как это делает Pyspark при выборе каталога Hadoop?
Я могу записывать файлы в корзину. У вас есть набор опций gcs.default-bucket-location
, gcs.oauth2.token
, gcs.oauth2.token-expires-at
и warehouse
.
Да, я понимаю, я имею в виду, что я не хочу использовать отдельную базу данных для хранения метаданных, я просто хочу хранить метаданные в корзине так же, как когда я использую pyspark с типом каталога Hadoop. Я хочу знать, возможно ли это с pyiceberg
Это возможно, но не поддерживается. Вам потребуется реализовать собственный каталог для записи каталога в корзину gcs. Вы можете начать с создания подкласса класса MetastoreCatalog
Установлен ли gcsfs в вашей среде?