Как я могу запустить конвейер потока данных с установочным файлом с помощью cloud composer / apache airflow?

У меня есть рабочий конвейер потока данных, в котором сначала запускается setup.py для установки некоторых локальных вспомогательных модулей. Теперь я хочу использовать Cloud Composer / Apache Airflow для планирования конвейера. Я создал свой файл DAG и поместил его в назначенную папку Google Storage DAG вместе с моим конвейерным проектом. Структура папок выглядит так:

{Composer-Bucket}/
    dags/
       --DAG.py
       Pipeline-Project/
           --Pipeline.py
           --setup.py
           Module1/
              --__init__.py
           Module2/
              --__init__.py
           Module3/
              --__init__.py

Часть моей DAG, которая определяет файл setup.py, выглядит так:

resumeparserop = dataflow_operator.DataFlowPythonOperator(
    task_id="resumeparsertask",
    py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
    dataflow_default_options={
        "project": {PROJECT-NAME},    
        "setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})

Однако когда я смотрю журналы в веб-интерфейсе Airflow, я получаю сообщение об ошибке:

RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.

Я не уверен, почему не удается найти установочный файл. Как я могу запустить конвейер потока данных с установочным файлом / модулями?

2
0
1 563
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы запускаете Composer и Dataflow с одной и той же учетной записью службы или они разные? В последнем случае вы проверили, имеет ли учетная запись службы Dataflow доступ для чтения к корзине и объекту?

У них есть собственные учетные записи служб по умолчанию. Учетная запись службы Dataflow по умолчанию имеет доступ для чтения и записи к корзине и папке Composer dag, поскольку обе они работают в одном проекте.

Melissa Guo 14.09.2018 16:16
Ответ принят как подходящий

Если вы посмотрите на код для Поток данных, похоже, что основной py_file может быть файлом внутри корзины GCS и локализуется оператором до выполнения конвейера. Однако я не вижу ничего подобного для dataflow_default_options. Похоже, что параметры просто копируются и форматируются.

Поскольку dag-папка GCS монтируется на экземплярах Airflow с помощью Предохранитель облачного хранилища, у вас должна быть возможность получить доступ к файлу локально с помощью env var "dags_folder". то есть вы могли бы сделать что-то вроде этого:

from airflow import configuration
....
LOCAL_SETUP_FILE = os.path.join(
configuration.get('core', 'dags_folder'), 'Pipeline-Project', 'setup.py')

Затем вы можете использовать переменную LOCAL_SETUP_FILE для свойства setup_file в dataflow_default_options.

Другие вопросы по теме