Google-cloud-dataflow: не удалось вставить данные json в bigquery через writetobigquery / bigquerysink с помощью bigquerydisposition.write_truncate.

Учитывая набор данных, как показано ниже

{"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544}
{"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545}
...more type json data here

Я пытаюсь отфильтровать эти данные json и вставить их в bigquery с помощью python sdk следующим образом

ba_schema = 'slot:STRING,result:INTEGER,play_type:STRING,level:INTEGER'

class ParseJsonDoFn(beam.DoFn):
    B_TYPE = 'tag_B'
    def process(self, element):
        text_line = element.trip()
        data = json.loads(text_line)

        if data['type'] == 'ba':
            ba = {'slot': data['slot'], 'result': data['result'], 'p_type': data['p_type'], 'level': data['level']}
            yield pvalue.TaggedOutput(self.B_TYPE, ba)

def run():
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                      dest='input',
                      default='data/path/data',
                      help='Input file to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
      '--runner=DirectRunner',
      '--project=project-id',
      '--job_name=data-job',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(known_args.input)

        multiple_lines = (
            lines
            | 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
                                      ParseJsonDoFn.B_TYPE)))

        b_line = multiple_lines.tag_B
        (b_line
            | "output_b" >> beam.io.WriteToBigQuery(
                                          'temp.ba',
                                          schema = B_schema,
                                          write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                                          create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                                        ))

И журналы отладки показывают

INFO:root:finish <DoOperation output_b/WriteToBigQuery output_tags=['out'], receivers=[ConsumerSet[output_b/WriteToBigQuery.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
DEBUG:root:Successfully wrote 2 rows.

Кажется, эти два данных с type:ba были вставлены в таблицу bigquery temp.ba. Однако я бегу

select * from `temp.ba`  limit 100;

Нет данных в этой таблице temp.ba.

Что-то не так с моими кодами или что-то мне не хватает?


Обновлять:

Спасибо, ответ @Eric Schmidt, я знаю, что для исходных данных может быть некоторое отставание. Однако через 5 минут после запуска вышеуказанного сценария в таблице все еще есть нет данных.

enter image description here

Когда я пытаюсь удалить write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE в BigQuerySink

    | "output_b" >> beam.io.Write(
                      beam.io.BigQuerySink(
                          table = 'ba',
                          dataset = 'temp',
                          project = 'project-id',
                          schema = ba_schema,
                          #write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                          create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                        )
                    ))

Эти две записи можно было найти немедленно.

И информация в таблице

enter image description here

Может я еще не уловил смысл задержка доступности исходных данных. Может кто-нибудь дать мне дополнительную информацию?

2
0
487
1

Ответы 1

Следует учитывать две вещи:

1) Прямой (локальный) раннер использует потоковые вставки. Имеется начальная задержка доступности данных посмотреть этот пост.

2) Убедитесь, что вы полностью соответствуете требованиям для проекта, в который транслируете. С BigQuerySink () project = "foo", dataset = "bar", table = "biz".

Я подозреваю, что ваша проблема №1.

Когда я комментирую эту строку `# write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,` с BigQuerySink, данные могут быть успешно вставлены в bigquery.

zangw 11.09.2018 07:39

Также я столкнулся с другой проблемой конвейера, конвейер запускается дважды в моем коде? на мой вопрос ясно, еще один вопрос stackoverflow.com/questions/52270674/… - это сообщение

zangw 11.09.2018 09:24

Когда вы удаляете опцию усечения, таблица НЕ удаляется, поэтому данные отображаются немедленно. При записи в имена удаленных таблиц существует задержка в 150 секунд. Я посмотрю на другую вашу ветку сейчас.

Eric Schmidt 11.09.2018 18:11

Спасибо за вашего пациента, я снова обновил свой вопрос.

zangw 12.09.2018 06:37

Другие вопросы по теме