У нас есть проблемы. Когда мы перезапускаем airflow - все наши задачи запускаются, в это время большая нагрузка на наши сервера. Как предотвратить запуск всех задач при перезапуске воздушного потока?
Пример задачи:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 5, 2),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'catchup': False
}
dag = DAG(
'start_data_collect', default_args=default_args, schedule_interval=timedelta(minutes=10))
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
Мы используем сельдерей с настройками по умолчанию.
Тогда вы можете изменить настройку celeryd_concurrency
.
Если ваши серверы сильно загружены, это связано с тем, что вашим сотрудникам легко подключиться и запросить / использовать их. Я рекомендую, чтобы в следующий раз, прежде чем перезапустить Airflow, вы просмотрели все даги, использующие подключения к серверам, которые могут быть перегружены, и настроить их задачи так, чтобы каждый из них использовал пул, возможно, по одному на сервер или соединение. Вам также необходимо создать пул в пользовательском интерфейсе со слотами. Но в v1.9 и более поздних версиях есть патч, который автоматически создает пулы с 0 слотами, поэтому вы можете использовать пользовательский интерфейс, чтобы войти и добавить несколько слотов к каждому из них. Таким образом, даже если планировщик может запланировать много доступных задач, он не будет планировать больше, чем количество задач может поместиться в слоты.
Если вы переключите статус на «ВЫКЛ» в пользовательском интерфейсе, это должно остановить вашу DAG. Если вы хотите, чтобы новые группы DAG были отключены по умолчанию, вы можете обновить набор dags_are_paused_at_creation = True в разделе [core] в вашей конфигурации воздушного потока.
Вы что-нибудь меняли в airflow.cfg для
max_threads
? Или вы используете сельдерей или что-то подобное?