Чтение переменных XCOM и Airflow, вероятно, замедляет Airflow (в Google Cloud Composer)

Мы пытаемся объединить файлы ежедневного извлечения (CSV) в наше хранилище данных.

В нашем случае код Python для DAG одинаков для всех наших DAG (~ 2000), поэтому мы генерируем их с помощью Логика генератора DAG из одного файла Python. В наших DAG у нас всего 15 задач (5 фиктивных задач, 2 задачи CloudDataFusionStartPipelineOperator, 8 задач Python).

В процессе создания DAG мы читаем переменные воздушного потока (~30-50), чтобы определить, какие DAG следует генерировать (это также определяет идентификаторы DAG и имена схем/таблиц, которые они должны обрабатывать). Мы называем эти переменные генератора.

В процессе генерации DAG группы DAG также считывают свою конфигурацию по своим идентификаторам (еще 2–3 переменных воздушного потока на сгенерированную DAG). Мы называем эти переменные конфигуратора.

К сожалению, в наших DAG нам приходится обрабатывать некоторые переданные аргументы (через REST API) и множество динамически вычисляемая информация между задачами, поэтому мы полагаемся на функциональность XCOM Airflow. Это означает огромное количество операций чтения в базе данных Airflow.

Там, где это возможно, мы используем пользовательские макросы для настройки задач для задержки выполнения чтения базы данных (выполнения XCOM pull) до тех пор, пока задача не будет выполнена, но это по-прежнему создает большую нагрузку на Airflow (Google Cloud Composer). Около 50 пулов из XCOM.

Вопросы:

  • База данных Airflow рассчитана на такое большое количество чтений (переменных Airflow и в основном значений из XCOM)?
  • Как мы должны изменить наш код, если между задачами нужно передавать большое количество динамически вычисляемых полей и метаданных?
  • Должны ли мы просто принять тот факт, что в этом случае использования БД большая нагрузка, и просто масштабировать БД по вертикали?

Пример вытягивания XCOM:

Metadata = PythonOperator(
    task_id         = TASK_NAME_PREFIX__METADATA + str(dag_id),
    python_callable = metadataManagment,
    op_kwargs       = {
        'dag_id'           : dag_id,
        'execution_case'   : '{{ ti.xcom_pull(task_ids = "' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key = "execution_case_for_metadata") }}',
        'date'             : '{{ ti.xcom_pull(task_ids = "' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key = "folder_date") }}',
        'enc_path'         : '{{ get_runtime_arg("RR", dag_run, "encryptedfilepath", ti.xcom_pull(task_ids = "' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key = "folder_date")) }}',
        'dec_path'         : '{{ get_runtime_arg("RR", dag_run, "decryptedfilepath", ti.xcom_pull(task_ids = "' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key = "folder_date")) }}',
        'aggr_project_name': ast.literal_eval(AIRFLOW_ENVIRONMENT_VARIABLES)['aggr_project_name'],
    },
    provide_context = True,
    trigger_rule    = TriggerRule.ALL_DONE
)

Пример переменных воздушного потока генератора:

key: STD_SCHEMA_NAMES
val: [('SCHEMA1', 'MAIN'), ('SCHEMA2', 'MAIN'), ('SCHEMA2', 'SECONDARY')]

key: STD_MAIN_SCHEMA1_INSERT_APPEND_TABLES
val: ['SCHEMA1_table_1', 'SCHEMA1_table_2', 'SCHEMA1_table_3', ... ]

key: STD_MAIN_SCHEMA1_SCD2_TABLES
val: ['SCHEMA1_table_i', 'SCHEMA1_table_j', 'SCHEMA1_table_k', ... ]

key: STD_MAIN_SCHEMA2_SCD2_TABLES
val: ['SCHEMA2_table_l', 'SCHEMA2_table_m', 'SCHEMA2_table_n', ... ]

key: STD_SECONDARY_SCHEMA2_TRUNCATE_LOAD_TABLES
val: ['SCHEMA2_table_x', 'SCHEMA2_table_y', 'SCHEMA2_table_z', ... ]

Пример генератора DAG:

# DAG_TYPE = STD
env_vars                                = Variable.get('environment_variables')

airflow_var_name__dag_typed_schema_name = '_'.join([x for x in [DAG_TYPE, 'SCHEMA_NAMES'] if x])
table_types                             = ['INSERT_APPEND', 'TRUNCATE_LOAD', 'SCD1', 'SCD2']

list_of_schemas_with_group              = ast.literal_eval(Variable.get(airflow_var_name__dag_typed_schema_name, '[]'))
tuples_of_var_names                     = [(x[0], x[1], y, '_'.join([z for z in [DAG_TYPE, x[1], x[0], y, 'TABLES'] if z])) for x in list_of_schemas_with_group for y in table_types]
list_of_tables                          = [(x[0], x[1], x[2], ast.literal_eval(Variable.get(x[3], 'None'))) for x in tuples_of_var_names]
list_of_tables                          = [(x[0], x[1], x[2], x[3]) for x in list_of_tables if x[3] and len(x[3]) > 0]


for schema_name, namespace_group, table_type, table_names_with_schema_prefix in list_of_tables:
    for table_name in table_names_with_schema_prefix:

        dag_id = str(table_name)
        globals()[dag_id] = create_dag( dag_id,
                                        schedule,
                                        default_dag_args,
                                        schema_name,
                                        table_type,
                                        env_vars,
                                        tags )
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
21
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Is Airflow's Database designed for this high number of reads (of Airflow Variables and mainly values from XCOM)?

Да, но код, которым вы поделились, является оскорбительным. Вы используете Variable.get() в коде верхнего уровня. Это означает, что каждый раз, когда файл .py анализируется, Airflow выполняет Variable.get(), который открывает сеанс для БД. Предполагая, что вы не изменили значения по умолчанию (мин_файл_процесс_интервал), это означает, что каждые 30 секунд вы выполняете Variable.get() для каждой DAG.

Чтобы представить это в цифрах, вы упомянули, что у вас есть 2000 DAG, каждая из которых делает ~ 30-50 вызовов Variable.get(), это означает, что у вас есть диапазон от 6000 до 10000 вызовов в базу данных каждые 30 секунд. Это очень оскорбительно.

Если вы хотите использовать переменные в коде верхнего уровня, вы должны использовать переменные среды, а не переменные Airflow. Это объясняется в документе Динамические DAG с переменными среды.

Отмечая, что Airflow предлагает возможность определения пользовательского Секретный сервер.

How should we redesign our code if there is a high number of dynamically calculated fields and metadata we have to pass between the tasks?

Воздушный поток может обрабатывать большие объемы. Проблема больше связана с тем, как вы написали DAG. Если есть опасения по поводу таблицы Xcom или вы предпочитаете хранить ее где-то еще, Airflow поддерживает собственный бэкенд Xcom.

Should we simply accept the fact that there is a heavy load on DB in this type of use case and simply scale the DB up vertically?

Судя по вашему описанию, вы можете что-то сделать, чтобы улучшить ситуацию. Воздушный поток тестируется на больших объемах дагов и задач (вертикальный масштаб и горизонтальный масштаб). Если вы обнаружили признаки проблем с производительностью, вы можете сообщить об этом, открыв Проблема с гитхабом для проекта. я

Большое спасибо за ваш комментарий! Я хотел бы немного исправить ваш ответ, потому что по моим расчетам цифры немного меньше. Чтение 30-50 переменных генерирует 2000 DAG, и на этапе генерации они считывают 2 другие переменные. В сумме получается ~4000 чтений, а не 6-10к. Хотя я тестировал это в среде, где было развернуто всего 400 DAG (с ~ 800 переменными конфигурации) и min_file_process_interval было установлено от 30 до 600, производительность по-прежнему была плохой. Из вашего ответа я предполагаю, что эти числа все еще злоупотребляют БД, верно?

elaspog 10.04.2022 09:07

И я проверю переменные среды для конфигурации DAG, спасибо за идею!

elaspog 10.04.2022 09:09

Ах, хорошо, это дополнительная информация, которой вы не поделились в начале, но тем не менее. В коде верхнего уровня не должно быть использования Variable.get(). Это действительно плохая практика. Как объяснено... если после удаления вызовов Variable.get() возникнет проблема с производительностью, пожалуйста, откройте проблему на github. Кстати, рассмотрите возможность чтения конфигурации из файла конфигурации, хранящегося на вашем диске. Я храню свои конфиги вместе с кодом DAG. Это также дает ценность использования git для их обновления, поэтому я знаю о любых изменениях, внесенных в конфигурации.

Elad Kalif 10.04.2022 09:12

Другие вопросы по теме