Группировка задач в Airflow

У меня есть группа обеспечения доступности баз данных воздушного потока с зависимостями: start_task >> group1, start_task >> group2.

Задачи лежат в словаре «задачи» и используются для заполнения групп задач в DAG.

from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator
from python.etl.dag_param_generator import DAGParamGenerator
from airflow.utils.task_group import TaskGroup

dag_param_generator = DAGParamGenerator('TEST_DINAMIC_DAG')
tasks = {
    "task_1": EmptyOperator(task_id = "task_1"),
    "task_2": EmptyOperator(task_id = "task_2"),
    "task_3": EmptyOperator(task_id = "task_3"),
    "task_4": EmptyOperator(task_id = "task_4")
}

with DAG(
    dag_id=dag_param_generator.get_dag_params('dag_id'),
    schedule=dag_param_generator.get_dag_params('schedule'),
    start_date=dag_param_generator.get_dag_params('start_date'),
    catchup=dag_param_generator.get_dag_params('catchup')
) as dag:
    start_task = EmptyOperator(task_id = "start_task")

    with TaskGroup("group1") as group1:
        for key, value in tasks.items():
            if key in ['task_1', 'task_2']:
                print(value)
                globals()[key] = value
    with TaskGroup("group2") as group2:
        for key, value in tasks.items():
            if key in ['task_3', 'task_4']:
                print(value)
                globals()[key] = value
                
    start_task >> group1
    start_task >> group2

Задачи не помещаются в группы, и я получаю следующий результат (группы пусты):

Как я могу решить эту проблему?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
54
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я понятия не имею, что вы подразумеваете под globals() и почему вы выполняете двойной цикл. Вместо этого добавьте задачу в группу.

with TaskGroup("group1") as group1:
    for key in ['task_1', 'task_2']:
        group1.add(tasks[key])
with TaskGroup("group2") as group2:
    for key in ['task_3', 'task_4']:
        group2.add(tasks[key])

Что касается globals(): создание переменной с именем task_1 в global не создает задачу. Запуск конструктора задачи, т.е. EmptyOperator() создаёт задачу. У вас может быть массив лямбда-выражений, если вы хотите позднего выполнения. Как в:

tasks = {
    "task_1": lambda:  EmptyOperator(task_id = "task_1"),
    "task_2": lambda: EmptyOperator(task_id = "task_2"),
    "task_3": lambda: EmptyOperator(task_id = "task_3"),
    "task_4": lambda: EmptyOperator(task_id = "task_4")
}

   with TaskGroup("group1") as group1:
        for key in ['task_1', 'task_2']:
            tasks[key]()
    with TaskGroup("group2") as group2:
        for key in ['task_3', 'task_4']:
            tasks[key]()

Когда я попытался запустить код с вашим исправлением, я получил ошибку: AttributeError: объект «TaskGroup» не имеет атрибута «append».

Oleksandr Zakharchenko 29.03.2024 14:27
github.com/apache/airflow/blob/main/airflow/utils/… что происходит при записи из памяти
KamilCuk 29.03.2024 14:28

Другие вопросы по теме