У меня есть DAG с 5 параллельными задачами. Я не хочу выполнять их все за один раз. Я хочу передать некоторый флаг/значение задаче, предположим, что если для флага установлено значение True, оно запускается, а если для флага установлено значение False, оно пропускается. Можем ли мы сделать это в Airflow2. Аргументы по умолчанию очень простые, и я не запланировал DAG.
Мой DAG Flow выглядит примерно так
starttask>>5paralleltask>>endtask
И эти 5 параллельных задач я создал с помощью цикла for
Огромное спасибо заранее
Я не уверен, что понимаю требование.
Если вы хотите выполнить все задачи, но ограничить параллелизм, используйте max_active_tasks
: количество экземпляров задачи, разрешенных для одновременного запуска.
Итак, в вашем случае вам нужно установить:
from airflow import DAG
with DAG(
dag_id='somedag',
...,
max_active_tasks=1,
) as dag:
...
Если вы хотите добавить логику пропуска, то в некоторых случаях задача будет выполняться, а в других нет, вам нужно добавить такие операторы, как: ShortCircuitOperator
, BranchPythonOperator
и т. д., которые будут решать, когда задачу следует пропустить.
5 параллельных задач, которые у меня есть, - это операторы bash.
Чтобы пропустить несколько задач и запустить несколько задач, я заставил выбранную задачу завершиться неудачей и показать, что она пропущена, я использовал exit 99
с моей командой bash, например:
'echo "running";exit 99'
Есть два шага:
Чтобы принудительно выполнить задачу, я добавил список из 5 флагов (True/False).
если флаг имеет значение True, задача выполняется, если значение false, она принудительно завершается неудачно, а затем пропускается. за пропуск задачи я использовал exit 99
.
И это работает, как я ожидал в моем случае.
Спасибо, Элад, за вашу помощь. Я больше интересовался запуском выбранной задачи за раз и пропуском нескольких задач одновременно. Прошу прощения, если вопрос не очень ясен. Я понял, как это сделать. Добавление ответа, чтобы другие могли воспользоваться