DBT имеет множество встроенных функций для автоматизации трудоемкой работы. Одной из таких функций является тест DBT.
DBT test будет генерировать единичные тесты на основе файла source.yaml. Предположим, мы хотим добавить проверку NOT NULL к столбцу в исходной таблице, мы просто добавим следующее:
version: 2 sources: - name: source_name database: db_name schema: schema_name tables: - name: table_name columns: - name: column_name tests: - not_null
DBT автоматически сгенерирует тест, который будет выглядеть как...
select column_name from db_name.schema_name.table_name where column_nameis null
Результат этого запроса отображается в консоли, на которой вызывается тест DBT. Чтобы сохранить журналы в базе данных, мы можем добавить флаг - store-failures к команде DBT test, и DBT создаст таблицу в базе данных, к которой мы сможем обратиться позже.
Проблема этого подхода заключается в том, что количество таблиц прямо пропорционально количеству моделей с тестами.
Чтобы преодолеть эту проблему, мы использовали Python. Используя Python, можно получить сингулярную таблицу со следующими колонками: имя модели, имя теста, статус теста, количество отказов и так далее.
Когда DBT выполняет команду DBT test, генерируется файл run_results.json.
{ "status": "error", "timing": [], "thread_id": "Thread-3 (worker)", "execution_time": 3.470108985900879, "adapter_response": {}, "message": "", "failures": null, "unique_id": "test.project.test_name_model_name__column_names" }
Файл run_results.json содержит всю необходимую информацию для анализа с помощью Python и отправки в базу данных, например Snowflake, с помощью пакета snowflake-connector-python.
Зависимости для этого сценария Python следующие:
Сначала загрузите файл run_results.json и файл source.yaml.
from json import loads import snowflake.connector as sf from yaml.loader import SafeLoader
with open(rf'{target_path}/run_results.json', 'r') as input_file: my_json = input_file.read() new_json = loads(my_json) my_tables = [] with open(rf'{dbt_path}/models/source/source.yml') as f: my_source = load(f, Loader=SafeLoader)
for i in (range(len(my_source['sources']))): for j in (range(len(my_source['sources'][i]['tables']))): schema_name = my_source['sources'][i]['name'] table_name = my_source['sources'][i]['tables'][j]['name'] result = table_name my_tables.append(result)
А затем добавить имена тестов. (Можно получить эту информацию из файла source.yaml, но это добавит ненужную сложность в данное конкретное решение).
my_test_names = ['source_not_null','source_table_not_empty', 'not_null', 'relationships']
Логика парсинга:
for i in range(len(new_json['results'])): unique_id = new_json['results'][i]['unique_id'] test_name_model_name_column_name = '' test_name = '' ref_test_name = '' model_name = '' column_name = '' severity = 'error' if unique_id.split('.')[0] == 'test': test_name_model_name_column_name = unique_id.split('.')[2] for test in my_test_names: if test in test_name_model_name_column_name: test_name = test if test_name == 'relationships': test_name_model_name_column_name = test_name_model_name_column_name.replace('relationships', '') ref_test_name = test_name_model_name_column_name.lstrip('_').replace('__', ',').split(',')[-1] test_name_model_name_column_name = test_name_model_name_column_name.replace(ref_test_name, '') test_name_model_name_column_name = test_name_model_name_column_name.replace( test_name, '') for model in my_tables: if model in test_name_model_name_column_name: model_name = model if '__warn' in test_name_model_name_column_name: severity = 'warn' test_name_model_name_column_name = test_name_model_name_column_name.replace( '__warn', '') else: test_name_model_name_column_name = test_name_model_name_column_name.replace( '__error', '') if test_name != 'relationships': column_name = test_name_model_name_column_name.replace(model, '').replace('_amis_', '').lstrip( '_').replace('__', ',').replace('sk', model+'_sk').replace('key', model+'_key') # has to be in this order else: column_name = test_name_model_name_column_name.replace(model, '').lstrip( '_').replace('__', ',') # print(model_name, '-->', column_name) # print(severity) execution_time = new_json['results'][i]['execution_time'] status = new_json['results'][i]['status'] failures = new_json['results'][i]['failures'] message = new_json['results'][i]['message'] values += f""" ( '{'Relationship Integrity test' if ref_test_name != '' else test_name}', '{model_name}', '{column_name if column_name != '' else 'NULL'}', '{status}', '{0 if failures == None else failures}', '{'NULL' if message == None else message.replace("'", '`')}', '{execution_time}', current_timestamp::timestamp_ntz ),"""
И наконец, публикуем результаты в таблицу снежинок.
def insert_values_into_log(): conn = sf.connect(user=user, password=password, account=account, warehouse=warehouse, database=database, authenticator=authenticator) def run_query(conn, query): cursor = conn.cursor() cursor.execute(query) cursor.close() insert_values = f"""insert into DATABASE_NAME.SCHEMA_NAME.TABLE_NAME( test_name, model_name, column_names, test_status, failures, message, test_execution_time, effective_timestamp ) values {values[:-1]}""" run_query(conn, insert_values) print('Success') insert_values_into_log()
Примечание: Не забудьте добавить собственные учетные данные в sf.connect. Также измените в команде insert into параметры DATABASE_NAME.SCHEMA_NAME.TABLE_NAME на те, которые подходят для вашей таблицы.
Полученная таблица будет содержать все тесты с их именами моделей, именами столбцов, статусом, неудачами и так далее в одной единственной таблице.
О НАС:
Bi3 была признана одной из самых быстрорастущих компаний в Австралии. Наша команда реализовала значительные и сложные проекты для крупнейших организаций по всему миру, и мы быстро создаем бренд, хорошо известный благодаря превосходному исполнению.
Веб-сайт: https://bi3technologies.com/
Следуйте за нами:
LinkedIn: https://www.linkedin.com/company/bi3technologies
Instagram: https://www.instagram.com/bi3technologies/
Twitter: https://twitter.com/Bi3Technologies
20.08.2023 18:21
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в 2023-2024 годах? Или это полная лажа?".
20.08.2023 17:46
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
19.08.2023 18:39
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в частности, магию поплавков и гибкость flexbox.
19.08.2023 17:22
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для чтения благодаря своей простоте. Кроме того, мы всегда хотим проверить самые последние возможности в наших проектах!
18.08.2023 20:33
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий их языку и культуре.
14.08.2023 14:49
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип предназначен для представления неделимого значения.