Как регистрировать все результаты тестов DBT в централизованной таблице snowflake

RedDeveloper
12.04.2023 11:51
Как регистрировать все результаты тестов DBT в централизованной таблице snowflake

Пояснение

DBT имеет множество встроенных функций для автоматизации трудоемкой работы. Одной из таких функций является тест DBT.

DBT test будет генерировать единичные тесты на основе файла source.yaml. Предположим, мы хотим добавить проверку NOT NULL к столбцу в исходной таблице, мы просто добавим следующее:

version: 2
sources:  
  - name: source_name
      database: db_name
      schema: schema_name
      tables:
        - name: table_name
          columns:
            - name: column_name
              tests:
                - not_null

DBT автоматически сгенерирует тест, который будет выглядеть как...

select column_name
from db_name.schema_name.table_name
where column_nameis null

Результат этого запроса отображается в консоли, на которой вызывается тест DBT. Чтобы сохранить журналы в базе данных, мы можем добавить флаг - store-failures к команде DBT test, и DBT создаст таблицу в базе данных, к которой мы сможем обратиться позже.

Проблема этого подхода заключается в том, что количество таблиц прямо пропорционально количеству моделей с тестами.

Чтобы преодолеть эту проблему, мы использовали Python. Используя Python, можно получить сингулярную таблицу со следующими колонками: имя модели, имя теста, статус теста, количество отказов и так далее.

Когда DBT выполняет команду DBT test, генерируется файл run_results.json.

{
    "status": "error",
    "timing": [],
    "thread_id": "Thread-3 (worker)",
    "execution_time": 3.470108985900879,
    "adapter_response": {},
    "message": "",
    "failures": null,
    "unique_id": "test.project.test_name_model_name__column_names"
}

Реализация

Файл run_results.json содержит всю необходимую информацию для анализа с помощью Python и отправки в базу данных, например Snowflake, с помощью пакета snowflake-connector-python.

Зависимости для этого сценария Python следующие:

  1. snowflake-connector-python==2.8.2
  2. PyYAML==6.0.0

Сначала загрузите файл run_results.json и файл source.yaml.

  from json import loads
  import snowflake.connector as sf
  from yaml.loader import SafeLoader
with open(rf'{target_path}/run_results.json', 'r') as input_file:
    my_json = input_file.read()

new_json = loads(my_json)

my_tables = []
with open(rf'{dbt_path}/models/source/source.yml') as f:
    my_source = load(f, Loader=SafeLoader)
for i in (range(len(my_source['sources']))):
    for j in (range(len(my_source['sources'][i]['tables']))):
        schema_name = my_source['sources'][i]['name']
        table_name = my_source['sources'][i]['tables'][j]['name']
        result = table_name
        my_tables.append(result)

А затем добавить имена тестов. (Можно получить эту информацию из файла source.yaml, но это добавит ненужную сложность в данное конкретное решение).

my_test_names = ['source_not_null','source_table_not_empty', 'not_null', 'relationships']

Логика парсинга:

for i in range(len(new_json['results'])):
    unique_id = new_json['results'][i]['unique_id']
    test_name_model_name_column_name = ''
    test_name = ''
    ref_test_name = ''
    model_name = ''
    column_name = ''
    severity = 'error'
    if unique_id.split('.')[0] == 'test':
        test_name_model_name_column_name = unique_id.split('.')[2]
        for test in my_test_names:
            if test in test_name_model_name_column_name:
                test_name = test
                if test_name == 'relationships':
                    test_name_model_name_column_name = test_name_model_name_column_name.replace('relationships', '')
                    ref_test_name = test_name_model_name_column_name.lstrip('_').replace('__', ',').split(',')[-1]
                    test_name_model_name_column_name = test_name_model_name_column_name.replace(ref_test_name, '')
            test_name_model_name_column_name = test_name_model_name_column_name.replace(
                test_name, '')

        for model in my_tables:
            if model in test_name_model_name_column_name:
                model_name = model
                if '__warn' in test_name_model_name_column_name:
                    severity = 'warn'
                    test_name_model_name_column_name = test_name_model_name_column_name.replace(
                        '__warn', '')
                else:
                    test_name_model_name_column_name = test_name_model_name_column_name.replace(
                        '__error', '')

                if test_name != 'relationships':
                    column_name = test_name_model_name_column_name.replace(model, '').replace('_amis_', '').lstrip(
                        '_').replace('__', ',').replace('sk', model+'_sk').replace('key', model+'_key')  # has to be in this order
                else:
                    column_name = test_name_model_name_column_name.replace(model, '').lstrip(
                        '_').replace('__', ',')

        # print(model_name, '-->', column_name)
        # print(severity)
        execution_time = new_json['results'][i]['execution_time']
        status = new_json['results'][i]['status']
        failures = new_json['results'][i]['failures']
        message = new_json['results'][i]['message']

        values += f"""
        (
            '{'Relationship Integrity test' if ref_test_name != '' else test_name}',
            '{model_name}',
            '{column_name if column_name != '' else 'NULL'}',
            '{status}',
            '{0 if failures == None else failures}',
            '{'NULL' if message == None else message.replace("'", '`')}',
            '{execution_time}', 
            current_timestamp::timestamp_ntz
        ),"""

И наконец, публикуем результаты в таблицу снежинок.

    def insert_values_into_log():
        conn = sf.connect(user=user, password=password, account=account,
                        warehouse=warehouse, database=database, authenticator=authenticator)

        def run_query(conn, query):
            cursor = conn.cursor()
            cursor.execute(query)
            cursor.close()
        insert_values = f"""insert into DATABASE_NAME.SCHEMA_NAME.TABLE_NAME(
                                test_name, 
                                model_name,
                                column_names, 
                                test_status, 
                                failures, 
                                message, 
                                test_execution_time, 
                                effective_timestamp
                            )
                            values {values[:-1]}"""
        run_query(conn, insert_values)
        print('Success')

    insert_values_into_log()

Примечание: Не забудьте добавить собственные учетные данные в sf.connect. Также измените в команде insert into параметры DATABASE_NAME.SCHEMA_NAME.TABLE_NAME на те, которые подходят для вашей таблицы.

Полученная таблица будет содержать все тесты с их именами моделей, именами столбцов, статусом, неудачами и так далее в одной единственной таблице.

О НАС:

Bi3 была признана одной из самых быстрорастущих компаний в Австралии. Наша команда реализовала значительные и сложные проекты для крупнейших организаций по всему миру, и мы быстро создаем бренд, хорошо известный благодаря превосходному исполнению.

Веб-сайт: https://bi3technologies.com/

Следуйте за нами:

LinkedIn: https://www.linkedin.com/company/bi3technologies

Instagram: https://www.instagram.com/bi3technologies/

Twitter: https://twitter.com/Bi3Technologies

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?

20.08.2023 18:21

Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в 2023-2024 годах? Или это полная лажа?".

Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией

20.08.2023 17:46

В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.

Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox

19.08.2023 18:39

Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в частности, магию поплавков и гибкость flexbox.

Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest

19.08.2023 17:22

В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для чтения благодаря своей простоте. Кроме того, мы всегда хотим проверить самые последние возможности в наших проектах!

Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️

18.08.2023 20:33

Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий их языку и культуре.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL

14.08.2023 14:49

Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип предназначен для представления неделимого значения.