Подписка на облачное хранилище Google Pubsub для объединения сообщений в один файл avro

  • У меня есть тема Google Pubsub без принудительного применения схемы (и я не хотел бы применять принудительное применение схемы)
  • У меня есть подписка на облачное хранилище Google Pubsub, позволяющая:
    • сбрасывать сообщения в GCS (Google Cloud Storage) в виде файлов Avro.
    • имя файла/шаблон пути
    • некоторые ограничения на размер файла или требования ко времени, как я думаю, объединение нескольких сообщений в один файл Avro

Проблема в том, что независимо от того, как я устанавливаю эту подписку на облачное хранилище, я продолжаю видеть 1 файл Avro для каждого сообщения, опубликованного в теме Pubsub. Я хотел бы как-то это изменить, объединить сообщения в один файл Avro или, возможно, продолжать добавлять их в один и тот же файл Avro в зависимости от ограничений времени/пространства.

# cloud storage subscription
gcloud pubsub subscriptions create projects/my-project/subscriptions/my-subscription \
  --topic=projects/my-project/topics/my-topic \
  --cloud-storage-bucket=my-bucket \
  --cloud-storage-file-prefix=my-prefix/ \
  --cloud-storage-file-suffix=_my-suffix.avro \
  --cloud-storage-max-bytes=2GB \
  --cloud-storage-max-duration=1m \
  --cloud-storage-output-format=avro \
  --cloud-storage-write-metadata \
  --dead-letter-topic=projects/my-project/topics/my-dlt \
  --max-delivery-attempts=5 \
  --project=my-project

Затем я отправляю 2 сообщения в тему Pubsub.

Затем жду 1 минуту (см. --cloud-storage-max-duration=1m выше).

Затем я проверяю содержимое сегмента GCS:

$ gsutil ls gs://my-bucket/my-prefix/
gs://my-bucket/my-prefix/2024-06-14T14:22:45+00:00_6db102_my-suffix.avro
gs://my-bucket/my-prefix/2024-06-14T14:22:46+00:00_0bc008_my-suffix.avro

Но здесь я пытаюсь объединить сообщения Pubsub в 1 файл avro. Я ожидал, что --cloud-storage-max-bytes=2GB и --cloud-storage-max-duration=1m сделают это, но это не так.

Я также пытался удалить чувствительность имен файлов к дате и времени с помощью --cloud-storage-file-datetime-format=YYYY-MM-DD и --cloud-storage-file-datetime-format=YYYY-MM-DD_hh, но когда я пытаюсь создать подписку на облачное хранилище, это не удается, поскольку для этого требуется полный формат определения даты и времени.

Документы, описывающие вышеизложенное, см. здесь: https://cloud.google.com/pubsub/docs/create-cloudstorage-subscription#file_names

Я хотел бы иметь структуру корзины GCS, например:

gs://my-bucket/my-prefix/YYYY-MM-DD_<uuid_1>_my-suffix.avro
gs://my-bucket/my-prefix/YYYY-MM-DD_<uuid_2>_my-suffix.avro
...
gs://my-bucket/my-prefix/YYYY-MM-DD_<uuid_N>_my-suffix.avro

Или, возможно, иметь ./YYYY-DD-MM/ в качестве подпапки.

В идеале я бы хотел избегать Dataflow или Dataproc (Spark). Похоже, подписка на облачное хранилище может помочь сбросить данные в GCS.

Это вообще возможно? Как я могу это сделать?

Создание приборной панели для анализа данных на GCP - часть I
Создание приборной панели для анализа данных на GCP - часть I
Недавно я столкнулся с интересной бизнес-задачей - визуализацией сбоев в цепочке поставок лекарств, которую могут просматривать врачи и...
0
0
69
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

К сожалению, из-за того, как масштабирование Pub/Sub работает на бэкэндах, ожидается, что ваши сообщения будут распределяться по нескольким файлам. Это обсуждается в документации по устранению неполадок.

ваша подписка на облачное хранилище может обрабатываться несколькими серверами Pub/Sub. Каждый серверный модуль записывает данные в отдельный файл облачного хранилища, поэтому вы можете наблюдать, как ваше облачное хранилище создает больше файлов, чем ожидалось, особенно для рабочих нагрузок с низкой пропускной способностью.

Если вам нужна подписка на облачное хранилище для размещения сообщений в меньшем количестве одновременных файлов, вы можете подать запрос на функцию с описанием вашего варианта использования, и он будет рассмотрен.

Между тем, если вам нужно, чтобы ваши сообщения помещались в один и тот же файл, вам может потребоваться настроить отдельный процесс для объединения файлов после их записи. Один из способов сделать это — запустить периодический процесс, который компонует объекты Cloud Storage.

gcloud storage objects compose \
  gs://BUCKET_NAME/SOURCE_OBJECT_1 \
  gs://BUCKET_NAME/SOURCE_OBJECT_2 \
  gs://BUCKET_NAME/COMPOSITE_OBJECT_NAME

спасибо за контекст документации! эта композиция объектов работает и с файлами avro? Я не могу найти подробности в этих документах, боюсь, мне придется написать какой-то собственный скрипт, чтобы добиться этого при использовании более сложных типов файлов, чем только добавление ndjson

TPPZ 17.06.2024 10:15

Приносим извинения за задержку ответа; к сожалению, я не верю, что композиция объектов будет работать с файлами avro, потому что вам нужно будет написать объявление схемы для каждого составленного файла. Также могут возникнуть проблемы с файлами Avro в двоичном кодировании. Вероятно, вам потребуется написать собственный сценарий для объединения файлов Avro.

Lauren 08.07.2024 22:44

Другие вопросы по теме