Чтение переменных XCOM и Airflow, вероятно, замедляет Airflow (в Google Cloud Composer)

Мы пытаемся объединить файлы ежедневного извлечения (CSV) в наше хранилище данных.

В нашем случае код Python для DAG одинаков для всех наших DAG (~ 2000), поэтому мы генерируем их с помощью Логика генератора DAG из одного файла Python. В наших DAG у нас всего 15 задач (5 фиктивных задач, 2 задачи CloudDataFusionStartPipelineOperator, 8 задач Python).

В процессе создания DAG мы читаем переменные воздушного потока (~30-50), чтобы определить, какие DAG следует генерировать (это также определяет идентификаторы DAG и имена схем/таблиц, которые они должны обрабатывать). Мы называем эти переменные генератора.

В процессе генерации DAG группы DAG также считывают свою конфигурацию по своим идентификаторам (еще 2–3 переменных воздушного потока на сгенерированную DAG). Мы называем эти переменные конфигуратора.

К сожалению, в наших DAG нам приходится обрабатывать некоторые переданные аргументы (через REST API) и множество динамически вычисляемая информация между задачами, поэтому мы полагаемся на функциональность XCOM Airflow. Это означает огромное количество операций чтения в базе данных Airflow.

Там, где это возможно, мы используем пользовательские макросы для настройки задач для задержки выполнения чтения базы данных (выполнения XCOM pull) до тех пор, пока задача не будет выполнена, но это по-прежнему создает большую нагрузку на Airflow (Google Cloud Composer). Около 50 пулов из XCOM.

Вопросы:

  • База данных Airflow рассчитана на такое большое количество чтений (переменных Airflow и в основном значений из XCOM)?
  • Как мы должны изменить наш код, если между задачами нужно передавать большое количество динамически вычисляемых полей и метаданных?
  • Должны ли мы просто принять тот факт, что в этом случае использования БД большая нагрузка, и просто масштабировать БД по вертикали?

Пример вытягивания XCOM:

Metadata = PythonOperator(
    task_id         = TASK_NAME_PREFIX__METADATA + str(dag_id),
    python_callable = metadataManagment,
    op_kwargs       = {
        'dag_id'           : dag_id,
        'execution_case'   : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="execution_case_for_metadata") }}',
        'date'             : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date") }}',
        'enc_path'         : '{{ get_runtime_arg("RR", dag_run, "encryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
        'dec_path'         : '{{ get_runtime_arg("RR", dag_run, "decryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
        'aggr_project_name': ast.literal_eval(AIRFLOW_ENVIRONMENT_VARIABLES)['aggr_project_name'],
    },
    provide_context = True,
    trigger_rule    = TriggerRule.ALL_DONE
)

Пример переменных воздушного потока генератора:

key: STD_SCHEMA_NAMES
val: [('SCHEMA1', 'MAIN'), ('SCHEMA2', 'MAIN'), ('SCHEMA2', 'SECONDARY')]

key: STD_MAIN_SCHEMA1_INSERT_APPEND_TABLES
val: ['SCHEMA1_table_1', 'SCHEMA1_table_2', 'SCHEMA1_table_3', ... ]

key: STD_MAIN_SCHEMA1_SCD2_TABLES
val: ['SCHEMA1_table_i', 'SCHEMA1_table_j', 'SCHEMA1_table_k', ... ]

key: STD_MAIN_SCHEMA2_SCD2_TABLES
val: ['SCHEMA2_table_l', 'SCHEMA2_table_m', 'SCHEMA2_table_n', ... ]

key: STD_SECONDARY_SCHEMA2_TRUNCATE_LOAD_TABLES
val: ['SCHEMA2_table_x', 'SCHEMA2_table_y', 'SCHEMA2_table_z', ... ]

Пример генератора DAG:

# DAG_TYPE = STD
env_vars                                = Variable.get('environment_variables')

airflow_var_name__dag_typed_schema_name = '_'.join([x for x in [DAG_TYPE, 'SCHEMA_NAMES'] if x])
table_types                             = ['INSERT_APPEND', 'TRUNCATE_LOAD', 'SCD1', 'SCD2']

list_of_schemas_with_group              = ast.literal_eval(Variable.get(airflow_var_name__dag_typed_schema_name, '[]'))
tuples_of_var_names                     = [(x[0], x[1], y, '_'.join([z for z in [DAG_TYPE, x[1], x[0], y, 'TABLES'] if z])) for x in list_of_schemas_with_group for y in table_types]
list_of_tables                          = [(x[0], x[1], x[2], ast.literal_eval(Variable.get(x[3], 'None'))) for x in tuples_of_var_names]
list_of_tables                          = [(x[0], x[1], x[2], x[3]) for x in list_of_tables if x[3] and len(x[3]) > 0]


for schema_name, namespace_group, table_type, table_names_with_schema_prefix in list_of_tables:
    for table_name in table_names_with_schema_prefix:

        dag_id = str(table_name)
        globals()[dag_id] = create_dag( dag_id,
                                        schedule,
                                        default_dag_args,
                                        schema_name,
                                        table_type,
                                        env_vars,
                                        tags )
Формы c голосовым вводом в React с помощью Speechly
Формы c голосовым вводом в React с помощью Speechly
Пытались ли вы когда-нибудь заполнить веб-форму в области электронной коммерции, которая требует много кликов и выбора? Вас попросят заполнить дату,...
Стилизация и валидация html-формы без использования JavaScript (только HTML/CSS)
Стилизация и валидация html-формы без использования JavaScript (только HTML/CSS)
Будучи разработчиком веб-приложений, легко впасть в заблуждение, считая, что приложение без JavaScript не имеет права на жизнь. Нам становится удобно...
Flatpickr: простой модуль календаря для вашего приложения на React
Flatpickr: простой модуль календаря для вашего приложения на React
Если вы ищете пакет для быстрой интеграции календаря с выбором даты в ваше приложения, то библиотека Flatpickr отлично справится с этой задачей....
В чем разница между Promise и Observable?
В чем разница между Promise и Observable?
Разберитесь в этом вопросе, и вы значительно повысите уровень своей компетенции.
Что такое cURL в PHP? Встроенные функции и пример GET запроса
Что такое cURL в PHP? Встроенные функции и пример GET запроса
Клиент для URL-адресов, cURL, позволяет взаимодействовать с множеством различных серверов по множеству различных протоколов с синтаксисом URL.
Четыре эффективных способа центрирования блочных элементов в CSS
Четыре эффективных способа центрирования блочных элементов в CSS
У каждого из нас бывали случаи, когда нам нужно отцентрировать блочный элемент, но мы не знаем, как это сделать. Даже если мы реализуем какой-то...
1
0
21
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Is Airflow's Database designed for this high number of reads (of Airflow Variables and mainly values from XCOM)?

Да, но код, которым вы поделились, является оскорбительным. Вы используете Variable.get() в коде верхнего уровня. Это означает, что каждый раз, когда файл .py анализируется, Airflow выполняет Variable.get(), который открывает сеанс для БД. Предполагая, что вы не изменили значения по умолчанию (мин_файл_процесс_интервал), это означает, что каждые 30 секунд вы выполняете Variable.get() для каждой DAG.

Чтобы представить это в цифрах, вы упомянули, что у вас есть 2000 DAG, каждая из которых делает ~ 30-50 вызовов Variable.get(), это означает, что у вас есть диапазон от 6000 до 10000 вызовов в базу данных каждые 30 секунд. Это очень оскорбительно.

Если вы хотите использовать переменные в коде верхнего уровня, вы должны использовать переменные среды, а не переменные Airflow. Это объясняется в документе Динамические DAG с переменными среды.

Отмечая, что Airflow предлагает возможность определения пользовательского Секретный сервер.

How should we redesign our code if there is a high number of dynamically calculated fields and metadata we have to pass between the tasks?

Воздушный поток может обрабатывать большие объемы. Проблема больше связана с тем, как вы написали DAG. Если есть опасения по поводу таблицы Xcom или вы предпочитаете хранить ее где-то еще, Airflow поддерживает собственный бэкенд Xcom.

Should we simply accept the fact that there is a heavy load on DB in this type of use case and simply scale the DB up vertically?

Судя по вашему описанию, вы можете что-то сделать, чтобы улучшить ситуацию. Воздушный поток тестируется на больших объемах дагов и задач (вертикальный масштаб и горизонтальный масштаб). Если вы обнаружили признаки проблем с производительностью, вы можете сообщить об этом, открыв Проблема с гитхабом для проекта. я

Большое спасибо за ваш комментарий! Я хотел бы немного исправить ваш ответ, потому что по моим расчетам цифры немного меньше. Чтение 30-50 переменных генерирует 2000 DAG, и на этапе генерации они считывают 2 другие переменные. В сумме получается ~4000 чтений, а не 6-10к. Хотя я тестировал это в среде, где было развернуто всего 400 DAG (с ~ 800 переменными конфигурации) и min_file_process_interval было установлено от 30 до 600, производительность по-прежнему была плохой. Из вашего ответа я предполагаю, что эти числа все еще злоупотребляют БД, верно?

elaspog 10.04.2022 09:07

И я проверю переменные среды для конфигурации DAG, спасибо за идею!

elaspog 10.04.2022 09:09

Ах, хорошо, это дополнительная информация, которой вы не поделились в начале, но тем не менее. В коде верхнего уровня не должно быть использования Variable.get(). Это действительно плохая практика. Как объяснено... если после удаления вызовов Variable.get() возникнет проблема с производительностью, пожалуйста, откройте проблему на github. Кстати, рассмотрите возможность чтения конфигурации из файла конфигурации, хранящегося на вашем диске. Я храню свои конфиги вместе с кодом DAG. Это также дает ценность использования git для их обновления, поэтому я знаю о любых изменениях, внесенных в конфигурации.

Elad Kalif 10.04.2022 09:12

Другие вопросы по теме