У меня есть группа обеспечения доступности баз данных воздушного потока с зависимостями: start_task >> group1, start_task >> group2.
Задачи лежат в словаре «задачи» и используются для заполнения групп задач в DAG.
from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator
from python.etl.dag_param_generator import DAGParamGenerator
from airflow.utils.task_group import TaskGroup
dag_param_generator = DAGParamGenerator('TEST_DINAMIC_DAG')
tasks = {
"task_1": EmptyOperator(task_id = "task_1"),
"task_2": EmptyOperator(task_id = "task_2"),
"task_3": EmptyOperator(task_id = "task_3"),
"task_4": EmptyOperator(task_id = "task_4")
}
with DAG(
dag_id=dag_param_generator.get_dag_params('dag_id'),
schedule=dag_param_generator.get_dag_params('schedule'),
start_date=dag_param_generator.get_dag_params('start_date'),
catchup=dag_param_generator.get_dag_params('catchup')
) as dag:
start_task = EmptyOperator(task_id = "start_task")
with TaskGroup("group1") as group1:
for key, value in tasks.items():
if key in ['task_1', 'task_2']:
print(value)
globals()[key] = value
with TaskGroup("group2") as group2:
for key, value in tasks.items():
if key in ['task_3', 'task_4']:
print(value)
globals()[key] = value
start_task >> group1
start_task >> group2
Задачи не помещаются в группы, и я получаю следующий результат (группы пусты):
Как я могу решить эту проблему?






Я понятия не имею, что вы подразумеваете под globals() и почему вы выполняете двойной цикл. Вместо этого добавьте задачу в группу.
with TaskGroup("group1") as group1:
for key in ['task_1', 'task_2']:
group1.add(tasks[key])
with TaskGroup("group2") as group2:
for key in ['task_3', 'task_4']:
group2.add(tasks[key])
Что касается globals(): создание переменной с именем task_1 в global не создает задачу. Запуск конструктора задачи, т.е. EmptyOperator() создаёт задачу. У вас может быть массив лямбда-выражений, если вы хотите позднего выполнения. Как в:
tasks = {
"task_1": lambda: EmptyOperator(task_id = "task_1"),
"task_2": lambda: EmptyOperator(task_id = "task_2"),
"task_3": lambda: EmptyOperator(task_id = "task_3"),
"task_4": lambda: EmptyOperator(task_id = "task_4")
}
with TaskGroup("group1") as group1:
for key in ['task_1', 'task_2']:
tasks[key]()
with TaskGroup("group2") as group2:
for key in ['task_3', 'task_4']:
tasks[key]()
Когда я попытался запустить код с вашим исправлением, я получил ошибку: AttributeError: объект «TaskGroup» не имеет атрибута «append».