Composer выдает ошибку при установке setup.py с BeamRunPythonPipelineOperator

У меня возникают проблемы при передаче аргумента «setup_file» моему BeamRunPythonPipelineOperator. Вот трассировка в журналах Composer.

[2022-11-16, 05:03:19 UTC] {beam.py:127} WARNING - error: [Errno 2] No such file or directory: 'csv_converter-0.0.1/csv_converter.egg-info/PKG-INFO'
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - Traceback (most recent call last):
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -   File "/opt/python3.8/lib/python3.8/site-packages/apache_beam/utils/processes.py", line 89, in check_output
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -     out = subprocess.check_output(*args, **kwargs)
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -   File "/opt/python3.8/lib/python3.8/subprocess.py", line 415, in check_output
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -     return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -   File "/opt/python3.8/lib/python3.8/subprocess.py", line 516, in run
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -     raise CalledProcessError(retcode, process.args,
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - subprocess.CalledProcessError: Command '['/usr/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpifl6ty8k']' returned non-zero exit status 1.

Я понятия не имею, почему это [Errno 2] Нет такого файла или каталога. Некоторые DAG работают нормально, а некоторые сообщают об этой ошибке. Иногда я получаю различные ошибки, такие как другой файл из setup.py не может быть найден или [Errno 5] Ошибка ввода/вывода

Это мой оператор:

BeamRunPythonPipelineOperator(
    task_id='xxxx',
    runner = "DataflowRunner",
    py_file=f'/home/airflow/gcs/data/csv_converter/main.py',
    pipeline_options = {
        'project_id': project_id,
        'input_path': input_path,
        'output_path': output_path,
        'schema_path': schema_path,
        'service_account': service_account,     
        'no_use_public_ips': True,
        'subnetwork': subnetwork,      
        'staging_location': staging_location,
        'temp_location': temp_location,
        "setup_file": f'/home/airflow/gcs/data/csv_converter/setup.py',
        "machine_type": "n1-standard-4",
        "num_workers": 5,
        "max_num_workers": 10,
    },
    py_options=[],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}',
        location=gce_region,
        wait_until_finished=False,
        gcp_conn_id = "dataflow_conn"
    ),
)

Эта ошибка очень расстраивает, так как я понятия не имею, как ее исправить, и не нашел никого, кто сталкивался с такой же проблемой.

Некоторый контекст: наш процесс состоит из запуска DAG, когда файлы .CSV попадают в корзину. Сначала я подумал, что проблема в планировщиках и параллелизме, так как у нас были задачи-зомби. Я заметил, что с 2 планировщиками с 2 виртуальными ЦП мы замечаем загрузку ЦП около ~80% (всегда застревает на >3/4 виртуальных ЦП, даже несмотря на то, что группы обеспечения доступности баз данных запускаются в пакетном режиме при приземлении нескольких .CSV). Я попытался увеличить планировщики до 4 и 4 виртуальных ЦП, но проблема не устранена. Я ожидаю, что процесс установит мой пакет правильно.

  • Версия композитора: 2.0.31
  • Версия воздушного потока: 2.3.3
  • версия apache-airflow-providers-google: 8.1.0
  • версия апач-луча: 2.41.0
Создание приборной панели для анализа данных на GCP - часть I
Создание приборной панели для анализа данных на GCP - часть I
Недавно я столкнулся с интересной бизнес-задачей - визуализацией сбоев в цепочке поставок лекарств, которую могут просматривать врачи и...
0
0
76
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

У меня была эта проблема ранее, я думаю, что если вы используете setup.py в корне из папки ComposerDAG, это решит вашу проблему.

Я также рекомендую вам развернуть папку с заданиями Beam в dags вместо data.

data больше используется для Airflow переменных.

Пример :

# Get DAG folder via an env var
dag_folder = os.getenv("DAGS_FOLDER")

BeamRunPythonPipelineOperator(
    task_id='xxxx',
    runner = "DataflowRunner",
    py_file=f'/home/airflow/gcs/dags/csv_converter/main.py',
    pipeline_options = {
        'project_id': project_id,
        'input_path': input_path,
        'output_path': output_path,
        'schema_path': schema_path,
        'service_account': service_account,     
        'no_use_public_ips': True,
        'subnetwork': subnetwork,      
        'staging_location': staging_location,
        'temp_location': temp_location,
        'setup_file': f"{dag_folder}/setup.py",
        "machine_type": "n1-standard-4",
        "num_workers": 5,
        "max_num_workers": 10,
    },
    py_options=[],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}',
        location=gce_region,
        wait_until_finished=False,
        gcp_conn_id = "dataflow_conn"
    ),
)

Некоторые пояснения:

  • setup.py используется в корне папки DAG: {composer_bucket}/dags/setup.py
  • dag_folder извлекается из Composer env var
  • Настройка задается как опция в операторе Beam следующим образом: 'setup_file': f"{dag_folder}/setup.py"

Не могли бы вы переместить main.py в {composer_bucket}/dags вместо {composer_bucket}/data, пожалуйста?

Mazlum Tosun 16.11.2022 19:14

Здравствуйте, Мазлум. Спасибо, что так быстро ответили. Я оставил свой main.py в {composer_bucket}/data/main.py и переместил свой setup.py в папку dags. Я получаю ModuleNotFound. в main.py я пытаюсь относительно импортировать файл, содержащий код конвейера. from pipeline import csv_to_avro Вот где происходит сбой DAG. Раньше работало. Редактировать: я только что увидел ваш комментарий. Перемещение моего кода потока данных в dags/

nano 16.11.2022 19:24

Пожалуйста, я отредактировал свой ответ, чтобы порекомендовать вам переместить папку main.py и job в папку dags вместо data.

Mazlum Tosun 16.11.2022 19:31

Перемещение моего кода потока данных в папку dags действительно сработало! Я также больше не получаю эти странные ошибки (ошибка: [Errno 2] Нет такого файла или каталога: 'csv_converter-0.0.1/csv_converter.egg-info/PKG-INFO'). Не уверен, как вы это поняли или что вызывает это, но спасибо!!

nano 16.11.2022 21:31

Всегда пожалуйста :), можете ли вы проголосовать за мой ответ, чтобы сделать его более заметным и помочь другим?

Mazlum Tosun 16.11.2022 21:37

Я бы с радостью это сделал, но у меня нет 15 репутации

nano 16.11.2022 21:54

Другие вопросы по теме