Google cloud dataflow с python

Попытка реализовать более простую форму примера это, который у меня есть, и ошибка при вставке данных в BigQuery

Это код

from __future__ import absolute_import
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class DataIngestion:
    def parse_method(self, string_input):
        values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(zip('Mensaje',values))
        return row



def run(argv=None):
    """The main function which creates the pipeline and runs it."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', dest='input', required=False,
        help='Input file to read.  This can be a local file or '
             'a file in a Google Storage Bucket.',
        default='C:\XXXX\prueba.csv')

    parser.add_argument('--output', dest='output', required=False,
                        help='Output BQ table to write results to.',
                        default='PruebasIoT.TablaIoT')

    known_args, pipeline_args = parser.parse_known_args(argv)

    data_ingestion = DataIngestion()

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p
     | 'Read from a File' >> beam.io.ReadFromText(known_args.input,
                                                  skip_header_lines=1)

     | 'String To BigQuery Row' >> beam.Map(lambda s:
                                            data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
                beam.io.BigQuerySink
                    (
                    known_args.output,
                    schema='Mensaje:STRING'
                 )
            )
     )
    p.run().wait_until_finish()


if __name__ == '__main__':
    #  logging.getLogger().setLevel(logging.INFO)
    run()

И это ошибка:

RuntimeError: Could not successfully insert rows to BigQuery table [XXX]. Errors: [<InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: u''
 location: u'm'
 message: u'no such field.'
 reason: u'invalid'>]
 index: 0>, <InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: u''
 location: u'm'
 message: u'no such field.'
 reason: u'invalid'>]
 index: 1>]

Я новичок в python, и, возможно, решения довольно простые, но как я могу это сделать?

Можно было бы передать одну строку в Строка в строку BigQuery вместо

'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))

Это был бы более простой способ начать лучше, чем использовать файлы csv и переводить файл

0
0
155
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Чтобы решить проблему с использованием файла CSV с одним значением в строке, я должен использовать это:

    values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
    row = dict(zip(('Name',),values))

Я не знаю, почему я должен ставить "," после "Имя", но если я этого не сделаю, dict (zip (... не работает должным образом

Ответ принят как подходящий

Насколько я понимаю, у вас есть входной CSV-файл с одним столбцом вида:

Message
This is a message
This is another message
I am writing to BQ

Если я правильно понял, вам не нужен метод parse_method(), потому что, как объяснено в образец, которым вы поделились, это просто вспомогательный метод, который сопоставляет значения CSV со словарями (которые принимаются beam.io.BigQuerySink).

Затем вы можете просто сделать что-то вроде:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
 | 'String To BigQuery Row' >> beam.Map(lambda s: dict(Message = s))
 | 'Write to BigQuery' >> beam.io.Write(
    beam.io.BigQuerySink(known_args.output, schema='Message:STRING')))

p.run().wait_until_finish()

Обратите внимание, что единственное существенное отличие состоит в том, что сопоставление «Строка в строку BigQuery» больше не требует сложного метода, и все, что оно делает, - это создание словаря Python, такого как {Message: "This is a message"}, где Message - это имя столбца в вашей таблице BQ. В этом отображении s - это каждый из элементов String, считываемых в преобразовании beam.io.ReadFromText, и мы применяем лямбда-функция.

Другие вопросы по теме