Auto Model Generation for DBT

Simple CLI for Unnesting Staging Data (Redshift)

Baysan
CodeX
Dec 26, 2022

In this post, we'll see how we can automate our dbt model generation steps by using Python.

I’ve been studying microservices and Kubernetes for almost two weeks, so I couldn’t come up with anything new to write about 🫠 Actually, I’ve implemented some new tech stacks in the projects I am working on. To get you excited about what’s coming, I’ll say that we’ll see how to implement OpenSearch in our projects to create our own search services. 🥸


One of My Daily Tasks (Sometimes 😋)

I work at a consultancy company where we develop an IT infrastructure project for one of our customers. The customer’s data team needs to update the staging tables and data marts to improve their reports. To update the related tables, I need to check the nested data structure (in JSON form) and add or remove the corresponding columns in my dbt models. That wasn’t hurting me until a table appeared with almost 75 columns. 🤯 At that point I coded a simple Python CLI instead of spending the time it would probably take to convert 75 columns row by row. I’ve published the repo on our VCS with the slogan below.

I created this script because I am too lazy to write row by row :&

DBT Model Generator

Our folder tree will look like the one below.

├── helpers
│ ├── connection.py
│ ├── __init__.py
│ ├── parser.py
│ └── writer.py
├── README.md
├── requirements.txt
└── script.py

You can install the required packages by running the command below.

pip install black SQLAlchemy sqlalchemy-redshift psycopg2-binary

Connection Module

We can easily get a database connection by using the function below. It returns a SQLAlchemy engine, which we’ll use to execute SQL commands.

# helpers/connection.py

from sqlalchemy import create_engine


def get_engine():
    """Returns Redshift engine

    Returns:
        engine: Redshift-connected SQLAlchemy engine
    """
    return create_engine(
        "redshift+psycopg2://<YOUR_URI>"  # of course, you can use env variables here
    )
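
To sanity-check the connection, you can run a trivial query. This snippet is only an illustration and assumes SQLAlchemy 1.x, which provides engine.execute (the same call the main script below relies on):

from helpers.connection import get_engine

engine = get_engine()
print(engine.execute("select 1;").fetchone())  # should print (1,)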

Parser Module

The parser module helps us infer each column's Redshift data type from a sample value.

# helpers/parser.py

import json
from datetime import datetime


class TypeChecker(object):
    @staticmethod
    def is_timestamp(val: str) -> bool:
        try:
            datetime.fromisoformat(val)
            return True
        except (TypeError, ValueError):
            return False

    @staticmethod
    def is_integer(val: str) -> bool:
        try:
            int(val)
            return True
        except (TypeError, ValueError):
            return False

    @staticmethod
    def is_decimal(val: str) -> bool:
        try:
            float(val)
            return True
        except (TypeError, ValueError):
            return False

    @staticmethod
    def is_varchar(val: str) -> bool:
        try:
            str(val)
            return True
        except (TypeError, ValueError):
            return False


def get_data_type(col_value: str) -> str:
    if TypeChecker.is_timestamp(col_value):
        return "timestamp"
    elif TypeChecker.is_integer(col_value):
        return "integer"
    elif TypeChecker.is_decimal(col_value):
        return "decimal(16,2)"
    elif not TypeChecker.is_varchar(col_value):
        # if it's not even a varchar, it has to be examined manually
        return "UNDEFINED"
    else:
        return "varchar"


def get_parsed_data(res: str) -> dict:
    """Return the passed data's column types

    Args:
        res (str): JSON-formatted data as a string

    Returns:
        column_types (dict): columns are keys and data types are values
    """
    res = json.loads(res)

    column_types = {}

    for col in res:
        column_types[col] = get_data_type(res[col])

    return column_types
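
For illustration, here is what the parser returns for a made-up sample payload (the column names and values below are hypothetical):

from helpers.parser import get_parsed_data

sample = '{"id": "42", "price": "9.99", "created_at": "2022-12-26T10:00:00", "name": "abc"}'
print(get_parsed_data(sample))
# {'id': 'integer', 'price': 'decimal(16,2)', 'created_at': 'timestamp', 'name': 'varchar'}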

Writer Module

This module helps us generate our dbt model structure (as a string), producing rows like the one below.

json_identifier."col_name"::col_type as col_name

# helpers/writer.py


def get_dbt_formatted_sql(data: dict, json_col_id: str) -> str:
    rows = [
        f'{json_col_id}."{col}"::{data[col]} as {col}'
        for col in data
        if col != "_extract_timestamp"
    ]
    output = ",\n".join(rows)
    output += f',\n{json_col_id}."_extract_timestamp"::timestamp as _extract_timestamp'
    return output
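
For example, with a hypothetical two-column dict like the one the parser produces:

from helpers.writer import get_dbt_formatted_sql

print(get_dbt_formatted_sql({"id": "integer", "_extract_timestamp": "timestamp"}, "json_identifier"))
# json_identifier."id"::integer as id,
# json_identifier."_extract_timestamp"::timestamp as _extract_timestamp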

Main Script

We can run this script directly to generate dbt models for our nested data tables. The script does the following:

  • Select the first row from the table
  • Parse its column names and data types
  • Generate the dbt model output
  • Write the output into a text file

# ./script.py

import sys

from helpers.connection import get_engine
from helpers.parser import get_parsed_data
from helpers.writer import get_dbt_formatted_sql

SCHEMA_PREFIX = "my_aws_schema"
JSON_PREFIX = "json_identifier"


def main():
    global SCHEMA_PREFIX
    global JSON_PREFIX

    table = sys.argv[1]

    try:
        SCHEMA_PREFIX = sys.argv[2]  # if a 2nd argument is passed, it is the schema
    except IndexError:
        print(f"pre-defined schema is => {SCHEMA_PREFIX}")

    try:
        JSON_PREFIX = sys.argv[3]  # if a 3rd argument is passed, it is the JSON column identifier
    except IndexError:
        print(f"pre-defined JSON identifier is => {JSON_PREFIX}")

    engine = get_engine()

    res = engine.execute(f"select * from {SCHEMA_PREFIX}.{table} limit 1;").fetchone()[0]

    column_types = get_parsed_data(res)

    with open("raw_data.json", "w+", encoding="utf-8") as file:
        file.write(str(res))

    with open("raw_data_column_types.json", "w+", encoding="utf-8") as file:
        file.write(str(column_types))

    with open("dbt_model.txt", "w+", encoding="utf-8") as file:
        model_txt = get_dbt_formatted_sql(column_types, JSON_PREFIX)
        file.write(model_txt)

    print("Successfully completed!")


if __name__ == "__main__":
    main()

We can run the script as shown below.

python script.py table_name schema_name nested_identifier

It’ll generate an output file named dbt_model.txt.

nested_identifier."col1"::timestamp as col1,
nested_identifier."col2"::varchar as col2,
nested_identifier."col3"::timestamp as col3,
nested_identifier."_extract_timestamp"::timestamp as _extract_timestamp

Now, you just need to copy and paste the output into your select statements. Obviously, you can also improve the script to generate complete select statements. I leave that to you 🥲
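
As a starting point, here's a minimal sketch of such an extension. The get_dbt_model_sql helper below is hypothetical (it isn't part of the original repo); it simply wraps the generated column list in a full select statement for a given source table:

from helpers.writer import get_dbt_formatted_sql


def get_dbt_model_sql(data: dict, json_col_id: str, source_ref: str) -> str:
    """Wrap the generated column list in a complete select statement."""
    columns = get_dbt_formatted_sql(data, json_col_id)
    return f"select\n{columns}\nfrom {source_ref}"


# e.g. print(get_dbt_model_sql(column_types, "json_identifier", "my_aws_schema.table_name"))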

Finally

Hopefully, this gives you an idea of how to auto-generate dbt models and saves you from unnesting your data row by row.

Kind regards
