Каталог Pyiceberg в GCS: я не могу использовать pyceberg с облачным хранилищем Google

Я хочу использовать библиотеку pyiceberg с облачным хранилищем Google.

У меня есть каталог, созданный в хранилище Google Cloud с использованием Pyspark, и я хотел бы прочитать оттуда эти таблицы.

Я вижу эту документацию по созданию объекта каталога для GSC, но я действительно не понимаю, как к нему подключиться или как создать объект конфигурации для облака Google.

Я пытался:

catalog = load_catalog(
    uri = "gs://catalog",
    type = "gcsfs"
)

но я получаю ошибку:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[4], line 1
----> 1 catalog = load_catalog(
      2     name = "gcsfs", 
      
File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/site-packages/pyiceberg/catalog/__init__.py:212, in load_catalog(name, **properties)
    210 catalog_type = None
    211 if provided_catalog_type and isinstance(provided_catalog_type, str):
--> 212     catalog_type = CatalogType[provided_catalog_type.upper()]
    213 elif not provided_catalog_type:
    214     catalog_type = infer_catalog_type(name, conf)

File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/enum.py:792, in EnumType.__getitem__(cls, name)
    788 def __getitem__(cls, name):
    789     """
    790     Return the member matching `name`.
    791     """
--> 792     return cls._member_map_[name]

KeyError: 'GCSFS'

Я установил пакет pypiceberg[gcsfs].

Я вижу в репозитории PYICEBERG на GitHub

AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
    CatalogType.REST: load_rest,
    CatalogType.HIVE: load_hive,
    CatalogType.GLUE: load_glue,
    CatalogType.DYNAMODB: load_dynamodb,
    CatalogType.SQL: load_sql,
}

Установлен ли gcsfs в вашей среде?

Oluwafemi Sule 06.05.2024 18:10

Да: gcsfs==2023.12.2.post1

J.C Guzman 06.05.2024 18:28

Где вы собираетесь писать каталог, то есть имена таблиц/метаданные? Таблицы можно записать в облачную корзину, но каталогами необходимо управлять либо в метахранилище Hive, либо на стороне сервера через Rest API, либо в базе данных.

Oluwafemi Sule 06.05.2024 19:11

Я совершенно новичок в этом. Какой из них мне следует использовать для хранения таблиц и их совместного чтения в облачном хранилище Google?

J.C Guzman 07.05.2024 01:09

Я написал ответ в развернутой форме как ответ на ваш вопрос.

Oluwafemi Sule 07.05.2024 21:48
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
5
156
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Pyiceberg — библиотека Python для работы с таблицами Iceberg.

Сначала получите токен OAuth2, используя файл учетной записи службы. Я запускаю это в совместной работе, поэтому мне нужно было сделать это таким образом. Вы можете сделать это по-другому, если работаете в контейнере.

import google.auth
from google.auth.transport.requests import Request
from pyiceberg import catalog


def get_access_token(service_account_file, scopes):
  """
  Retrieves an access token from Google Cloud Platform using service account credentials.

  Args:
      service_account_file: Path to the service account JSON key file.
      scopes: List of OAuth scopes required for your application.

  Returns:
      The access token as a string.
  """

  credentials, name = google.auth.load_credentials_from_file(
      service_account_file, scopes=scopes)

  request = Request()
  credentials.refresh(request)  # Forces token refresh if needed
  return credentials

# Example usage
service_account_file = "/path-to-service-account-file.json"  # Replace with your path
scopes = ["https://www.googleapis.com/auth/cloud-platform"]  # Adjust scopes as needed

access_token = get_access_token(service_account_file, scopes)

Далее загружается каталог. Мы используем учетные данные OAUTH2, полученные с помощью ключа нашей сервисной учетной записи.

Я отредактировал функцию datetime_to_unix_ms, чтобы сосредоточиться на основной задаче.

Поскольку вы только начинаете, я предлагаю упростить вашу реализацию, используя базу данных для вашего реестра.

Если у вас уже есть кластер EMR, вам следует вместо этого рассмотреть возможность использования метахранилища Hive.

В этом примере мы будем использовать базу данных sqlite для нашего центрального реестра. Вы можете заменить его любым из параметров базы данных SQL, поддерживаемых библиотекой SQLalchemy.

REGISTRY_DATABASE_URI = "sqlite:///catalog.db" # replace this with your database URI

catalog_inst = catalog.load_catalog(
    "default",
    **{
        "uri": REGISTRY_DATABASE_URI, 
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "gcs.oauth2.token-expires-at": datetime_to_unix_ms(access_token.expiry),
        "gcs.project-id": "project-id", # replace with your gcp project id
        "gcs.oauth2.token": access_token.token,
        "gcs.default-bucket-location": "gs://bucket/", # replace with your gcs bucket
        "warehouse": "gs://bucket/" # replace with your gcs bucket
    }
)

Наконец, мы создаем пример таблицы с некоторыми данными, используя Pyarrow:

import pyarrow as pa

catalog_inst.create_namespace("default") # Replace this with your namespace

# Define the schema for the book table
schema = pa.schema([
    ('title', pa.string())
])

catalog_inst.drop_table("default.books") # Replace this with your table
table = catalog_inst.create_table("default.books", schema=schema)

# Create some sample data
titles = ["The Lord of the Rings", "Pride and Prejudice", "Moby Dick"]

# Create Arrow arrays from the data
title_array = pa.array(titles, type=pa.string())
table_data = pa.Table.from_arrays([title_array], names=schema.names)

table.append(table_data)

Он хранит метаданные в базе данных sqlite, но не может читать или записывать метаданные в корзину облачного хранилища Google? как это делает Pyspark при выборе каталога Hadoop?

J.C Guzman 07.05.2024 23:47

Я могу записывать файлы в корзину. У вас есть набор опций gcs.default-bucket-location, gcs.oauth2.token, gcs.oauth2.token-expires-at и warehouse.

Oluwafemi Sule 08.05.2024 09:01

Да, я понимаю, я имею в виду, что я не хочу использовать отдельную базу данных для хранения метаданных, я просто хочу хранить метаданные в корзине так же, как когда я использую pyspark с типом каталога Hadoop. Я хочу знать, возможно ли это с pyiceberg

J.C Guzman 08.05.2024 10:58

Это возможно, но не поддерживается. Вам потребуется реализовать собственный каталог для записи каталога в корзину gcs. Вы можете начать с создания подкласса класса MetastoreCatalog

Oluwafemi Sule 08.05.2024 13:40

Другие вопросы по теме