Экспорт таблицы BigQuery

Я ищу лучший шаблон, чтобы иметь возможность выполнять и экспортировать результат запроса BigQuery в корзину облачного хранилища. Я хотел бы, чтобы это выполнялось, когда таблица BigQuery записывается или изменяется.

Я думаю, что я бы традиционно настроил тему pubsub, в которую будет записываться при изменении таблицы, что вызовет функцию GCP, отвечающую за выполнение запроса и запись результата в корзину GCP. Я просто не слишком уверен, что нет лучшего подхода (более прямолинейного), чтобы сделать это в GCP.

Заранее спасибо.

Создание приборной панели для анализа данных на GCP - часть I
Создание приборной панели для анализа данных на GCP - часть I
Недавно я столкнулся с интересной бизнес-задачей - визуализацией сбоев в цепочке поставок лекарств, которую могут просматривать врачи и...
0
0
79
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я предлагаю вам подход, основанный на Eventarc.

Цель состоит в том, чтобы запустить действие Cloud Function или Cloud Run, когда данные вставляются или обновляются в BigQuery таблице, пример с Cloud Run:

SERVICE=bq-cloud-run
PROJECT=$(gcloud config get-value project)
CONTAINER = "gcr.io/${PROJECT}/${SERVICE}"
gcloud builds submit --tag ${CONTAINER}
gcloud run deploy ${SERVICE} --image $CONTAINER --platform managed

gcloud eventarc triggers create ${SERVICE}-trigger \
  --location ${REGION} --service-account ${SVC_ACCOUNT} \
  --destination-run-service ${SERVICE}  \
  --event-filters type=google.cloud.audit.log.v1.written \
  --event-filters methodName=google.cloud.bigquery.v2.JobService.InsertJob \
  --event-filters serviceName=bigquery.googleapis.com

Когда задание BigQuery будет выполнено, будет запущено действие Cloud Run.

Пример действия Cloud Run:

@app.route('/', methods=['POST'])
def index():
    # Gets the Payload data from the Audit Log
    content = request.json
    try:
        ds = content['resource']['labels']['dataset_id']
        proj = content['resource']['labels']['project_id']
        tbl = content['protoPayload']['resourceName']
        rows = int(content['protoPayload']['metadata']
                   ['tableDataChange']['insertedRowsCount'])
        if ds == 'cloud_run_tmp' and \
           tbl.endswith('tables/cloud_run_trigger') and rows > 0:
            query = create_agg()
            return "table created", 200
    except:
        # if these fields are not in the JSON, ignore
        pass
    return "ok", 200

Вы можете применить логику на основе текущего набора данных, таблицы или других элементов, существующих в текущей полезной нагрузке.

Другие вопросы по теме