Я столкнулся с проблемой в одном из моих скриптов Apache Beam. На данный момент это все еще черновик, и я использую его на DirectRunner. Но я продолжаю получать ошибку, которая не имеет для меня смысла. Это сокращенная версия моего скрипта:
options = PipelineOptions()
p = TestPipeline(options = options)
bill_info = (p
| 'Create Pcollection' >> beam.Create('a')
| 'Reformatting' >> beam.ParDo(ReformatAttributesFn())
| 'Scrub First Name' >> beam.ParDo(ScrubFnameFn())
| 'Scrub Last Name' >> beam.ParDo(ScrubLnameFn())
| 'Fix Nickname' >> beam.Map(add_nickname, n_tbl=beam.pvalue.AsList(nickname_tbl_ex), return_n=False)
| 'Check Sponsor' >> beam.Map(check_pol, p_tbl=beam.pvalue.AsList(pol_tbl_ex))
| 'Check Bill' >> beam.Map(check_bill, b_tbl=beam.pvalue.AsList(bill_tbl_ex))
| 'Final Formatting' >> beam.ParDo(FinalFormatFn())
| 'Write To Text' >> beam.io.WriteToText('C:/Users/cmatt/Downloads/test_bill_votes',
file_name_suffix='.csv'))
p.run()
Я получаю сообщение об ошибке «TypeError: object () не принимает параметров» в функции p.run (). Но даже с самым простым конвейером, таким как этот:
options = PipelineOptions()
p = TestPipeline(options = options)
bill_info = (p
| 'Create Pcollection' >> beam.Create('a'))
p.run()
Я по-прежнему получаю ту же ошибку. Теперь объект конвейера не должен принимать никаких входных данных; это просто способ запустить все PCollections через конвейер. Но какая часть моего скрипта вызывает ошибку? Может быть, некоторые модули в моей виртуальной среде неправильно выстраиваются в линию? Или есть отдельная проблема, которую я не вижу?
Спасибо всем за помощь!
Недостаточно контекста для выполнения вашего скрипта python. Тот факт, что вы также получаете ту же ошибку с базовой инструкцией Dataflow, заставляет меня думать, что в вашем скрипте есть ошибка, связанная с самим python, например «смешанные табуляции и пробелы», у вас есть что-то подобное?
Я просто провожу быстрый тест из командной строки python, и он сработал (шаги, взятые из Быстрый старт с использованием Python):
>>> import apache_beam as beam
>>> from apache_beam.pipeline import PipelineOptions
>>> options = PipelineOptions()
>>> p = beam.Pipeline(options=options)
>>> bill_info = (p | 'Create Pcollection' >> beam.Create(['a','b']))
>>> p.run()
<apache_beam.runners.portability.fn_api_runner.RunnerResult object at 0x7fb57856d290>
>>>
Если приведенная выше информация не помогает, не могли бы вы поделиться всем базовым скриптом, который вы выполняете, и тем, как вы настраиваете свою виртуальную среду? Поможет воспроизвести.
Мне удалось решить проблему. Это определенно имело какое-то отношение к тому, как загружались пакеты, и был некоторый конфликт в номенклатуре. Я открыл чистую виртуальную среду в PyCharm, и управление пакетами стало намного чище. Тогда все работало. Цените помощь!