How to log all dbt test results into a centralized Snowflake table

Mohammad Rohan
BI3 Technologies
Apr 12, 2023

dbt → Python → Snowflake

dbt has many built-in features for automating time-consuming work. One of them is dbt tests.

dbt generates generic tests from the tests declared in a source .yml file. Suppose we want to add a not_null check to a column in a source table; we would simply add the following:

version: 2

sources:
  - name: source_name
    database: db_name
    schema: schema_name
    tables:
      - name: table_name
        columns:
          - name: column_name
            tests:
              - not_null

dbt will automatically compile this into a test query that looks like:

select column_name
from db_name.schema_name.table_name
where column_name is null

The result of this query is displayed in the console where dbt test is invoked. To save the results in a database instead, we can add the --store-failures flag to the dbt test command, and dbt will create a table in the database that we can query later.

The problem with this approach is that the number of tables created is directly proportional to the number of models with tests: each failing test gets its own table.

To overcome this problem, we used Python. With Python, it is possible to consolidate everything into a single table with the following columns: model name, test name, status of the test, number of failures, and so on.

Each time a dbt test command runs, dbt writes a run_results.json file to the target directory. Every entry in its results array looks like this:

{
  "status": "error",
  "timing": [],
  "thread_id": "Thread-3 (worker)",
  "execution_time": 3.470108985900879,
  "adapter_response": {},
  "message": "",
  "failures": null,
  "unique_id": "test.project.test_name_model_name__column_names"
}
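For orientation, here is a minimal sketch of pulling these fields out of a parsed run_results.json payload. The values below are hypothetical, mimicking a single failing not_null test:

```python
from json import loads

# Hypothetical single-entry run_results.json payload for illustration
raw = """{
  "results": [{
    "status": "fail",
    "execution_time": 1.23,
    "message": "Got 4 results, configured to fail if != 0",
    "failures": 4,
    "unique_id": "test.project.not_null_orders_order_id"
  }]
}"""

run_results = loads(raw)
for result in run_results["results"]:
    # These are exactly the fields the parsing logic below relies on
    print(result["unique_id"], result["status"], result["failures"])
```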

Implementation

This run_results.json file contains all the information we need. We can parse it with Python and post the results to a database such as Snowflake using the snowflake-connector-python package.

The dependencies for this Python script are:

  1. snowflake-connector-python==2.8.2
  2. PyYAML==6.0

First, load the run_results.json and source.yml files:

from json import loads
from yaml import load
from yaml.loader import SafeLoader
import snowflake.connector as sf

# target_path points at dbt's target directory; dbt_path at the project root
with open(rf'{target_path}/run_results.json', 'r') as input_file:
    my_json = input_file.read()

new_json = loads(my_json)

# Collect every source table name declared in source.yml
my_tables = []
with open(rf'{dbt_path}/models/source/source.yml') as f:
    my_source = load(f, Loader=SafeLoader)
for i in range(len(my_source['sources'])):
    for j in range(len(my_source['sources'][i]['tables'])):
        table_name = my_source['sources'][i]['tables'][j]['name']
        my_tables.append(table_name)

And then add the names of the tests. (It is possible to fetch this information from the source.yml file as well, but it would add unnecessary complexity to this particular solution.)

my_test_names = ['source_not_null','source_table_not_empty', 'not_null', 'relationships']

The parsing logic:

values = ''  # accumulates one "(...)" values tuple per test result
for i in range(len(new_json['results'])):
    unique_id = new_json['results'][i]['unique_id']
    test_name_model_name_column_name = ''
    test_name = ''
    ref_test_name = ''
    model_name = ''
    column_name = ''
    severity = 'error'  # parsed for reference; not stored in this version
    if unique_id.split('.')[0] == 'test':
        test_name_model_name_column_name = unique_id.split('.')[2]
        for test in my_test_names:
            if test in test_name_model_name_column_name:
                test_name = test
        if test_name == 'relationships':
            test_name_model_name_column_name = test_name_model_name_column_name.replace('relationships', '')
            ref_test_name = test_name_model_name_column_name.lstrip('_').replace('__', ',').split(',')[-1]
            test_name_model_name_column_name = test_name_model_name_column_name.replace(ref_test_name, '')
        test_name_model_name_column_name = test_name_model_name_column_name.replace(test_name, '')

        for model in my_tables:
            if model in test_name_model_name_column_name:
                model_name = model
        if '__warn' in test_name_model_name_column_name:
            severity = 'warn'
            test_name_model_name_column_name = test_name_model_name_column_name.replace('__warn', '')
        else:
            test_name_model_name_column_name = test_name_model_name_column_name.replace('__error', '')

        if test_name != 'relationships':
            # The replacements have to run in this order
            column_name = (test_name_model_name_column_name.replace(model_name, '')
                           .replace('_amis_', '').lstrip('_').replace('__', ',')
                           .replace('sk', model_name + '_sk')
                           .replace('key', model_name + '_key'))
        else:
            column_name = (test_name_model_name_column_name.replace(model_name, '')
                           .lstrip('_').replace('__', ','))

        execution_time = new_json['results'][i]['execution_time']
        status = new_json['results'][i]['status']
        failures = new_json['results'][i]['failures']
        message = new_json['results'][i]['message']

        values += f"""
(
    '{'Relationship Integrity test' if ref_test_name != '' else test_name}',
    '{model_name}',
    '{column_name if column_name != '' else 'NULL'}',
    '{status}',
    '{0 if failures is None else failures}',
    '{'NULL' if message is None else message.replace("'", '`')}',
    '{execution_time}',
    current_timestamp::timestamp_ntz
),"""
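To make the string surgery above concrete, here is a worked example of how a unique_id decomposes, assuming a hypothetical not_null test on an orders model:

```python
# Hypothetical example: decompose a dbt test unique_id into its parts
unique_id = "test.project.not_null_orders_order_id"
my_test_names = ["not_null"]
my_tables = ["orders"]

part = unique_id.split(".")[2]                     # "not_null_orders_order_id"
test_name = next(t for t in my_test_names if t in part)
part = part.replace(test_name, "")                 # "_orders_order_id"
model_name = next(m for m in my_tables if m in part)
column_name = part.replace(model_name, "").lstrip("_")
print(test_name, model_name, column_name)          # not_null orders order_id
```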

And finally, post the results to a Snowflake table.

def insert_values_into_log():
    # Fill in your own Snowflake credentials here
    conn = sf.connect(user=user, password=password, account=account,
                      warehouse=warehouse, database=database,
                      authenticator=authenticator)

    def run_query(conn, query):
        cursor = conn.cursor()
        cursor.execute(query)
        cursor.close()

    insert_values = f"""insert into DATABASE_NAME.SCHEMA_NAME.TABLE_NAME(
        test_name,
        model_name,
        column_names,
        test_status,
        failures,
        message,
        test_execution_time,
        effective_timestamp
    )
    values {values[:-1]}"""  # [:-1] drops the trailing comma
    run_query(conn, insert_values)
    print('Success')

insert_values_into_log()

Note: Don't forget to add your own credentials in sf.connect, and change DATABASE_NAME.SCHEMA_NAME.TABLE_NAME in the insert statement to whatever is appropriate for your log table.
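As a safer variant, parameter binding avoids hand-escaping quotes in message and protects against malformed SQL. This is only a sketch: build_rows and the parsed input are hypothetical stand-ins for rows produced by the parsing loop above, and the executemany call (using the connector's default pyformat %s placeholders) is shown commented out since it needs a live connection:

```python
# Hypothetical helper: turn parsed test results into bind-parameter tuples
def build_rows(parsed):
    rows = []
    for r in parsed:
        rows.append((
            r["test_name"],
            r["model_name"],
            r["column_name"] or None,
            r["status"],
            r["failures"] or 0,
            r["message"] or None,
            r["execution_time"],
        ))
    return rows

rows = build_rows([{
    "test_name": "not_null", "model_name": "orders",
    "column_name": "order_id", "status": "fail",
    "failures": 4, "message": "Got 4 results", "execution_time": 1.23,
}])

# With a live connection, the insert would then look like:
# cursor.executemany(
#     "insert into DATABASE_NAME.SCHEMA_NAME.TABLE_NAME "
#     "(test_name, model_name, column_names, test_status, failures, message, "
#     "test_execution_time, effective_timestamp) "
#     "values (%s, %s, %s, %s, %s, %s, %s, current_timestamp::timestamp_ntz)",
#     rows)
print(rows[0])
```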

The resulting table will contain all the tests with their model names, column names, statuses, failures, and so on in one single table.

ABOUT US:

Bi3 has been recognized for being one of the fastest-growing companies in Australia. Our team has delivered substantial and complex projects for some of the largest organizations around the globe, and we’re quickly building a brand that is well-known for superior delivery.

Website: https://bi3technologies.com/

Follow us:
LinkedIn: https://www.linkedin.com/company/bi3technologies
Instagram: https://www.instagram.com/bi3technologies/
Twitter: https://twitter.com/Bi3Technologies
