Auto Model Generating for DBT

Simple CLI for Unnesting Staging Data (Redshift)

Dec 26, 2022


In this post, we'll see how to automate our dbt model generation steps by using Python.

I’ve been studying microservices and Kubernetes for almost 2 weeks, so I couldn’t invent new things to write 🫠 Actually, I’ve implemented some new tech stacks in the projects I am working on. To get you excited for the future, I’d say that we’ll see how to integrate OpenSearch into our projects to create our own search services. 🥸


One of My Daily Tasks (Sometimes 😋)

I work at a consultancy company where we develop an IT infrastructure project for one of our customers. The customer’s data team needs to update the staging tables and data marts to improve their reports. To update the related tables, I have to check a nested data structure (in JSON form) and update the added or removed columns in my dbt models. It wasn’t hurting me until a table appeared with almost 75 columns. 🤯 That time, instead of spending the hours it would take to convert 75 columns row by row, I coded a simple Python CLI. I’ve published the repo on our VCS with the slogan below.

I created this script because I am too lazy to write row by row :&

DBT Model Generator

Our folder tree will be like the below.

├── helpers
│   ├── __init__.py
│   ├── connection.py
│   ├── parser.py
│   └── writer.py
├── main.py
├── requirements.txt

You can install the required packages by using the code below.

pip install black SQLAlchemy sqlalchemy-redshift psycopg2-binary

Connection Module

We can easily get a database connection by using the function below. It returns the database engine, which we’ll use to execute SQL commands.

# helpers/connection.py

from sqlalchemy import create_engine


def get_engine():
    """Returns Redshift engine.

    Returns:
        engine: Redshift connected engine
    """
    return create_engine(
        "redshift+psycopg2://<YOUR_URI>"  # of course you can use env variables
    )
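If you'd rather not hardcode the URI, it can be assembled from environment variables. A minimal sketch — the variable names here are my own invention, not from the original repo:

```python
import os


def build_redshift_uri() -> str:
    # Hypothetical env variable names; adjust them to your own setup.
    user = os.environ["REDSHIFT_USER"]
    password = os.environ["REDSHIFT_PASSWORD"]
    host = os.environ["REDSHIFT_HOST"]
    port = os.environ.get("REDSHIFT_PORT", "5439")  # Redshift's default port
    database = os.environ["REDSHIFT_DB"]
    return f"redshift+psycopg2://{user}:{password}@{host}:{port}/{database}"
```

You'd then pass `build_redshift_uri()` to `create_engine` instead of the literal string.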

Parser Module

The parser module helps us parse each column and map its value to a data type (for Redshift).

# helpers/parser.py

import json
from datetime import datetime


class TypeChecker:
    @staticmethod
    def is_timestamp(val: str) -> bool:
        try:
            datetime.fromisoformat(str(val))
            return True
        except (TypeError, ValueError):
            return False

    @staticmethod
    def is_integer(val: str) -> bool:
        try:
            int(str(val))
            return True
        except (TypeError, ValueError):
            return False

    @staticmethod
    def is_decimal(val: str) -> bool:
        try:
            float(str(val))
            return True
        except (TypeError, ValueError):
            return False

    @staticmethod
    def is_varchar(val: str) -> bool:
        return isinstance(val, str)


def get_data_type(col_value: str) -> str:
    if TypeChecker.is_timestamp(col_value):
        return "timestamp"
    elif TypeChecker.is_integer(col_value):
        return "integer"
    elif TypeChecker.is_decimal(col_value):
        return "decimal(16,2)"
    elif not TypeChecker.is_varchar(col_value):
        return "UNDEFINED"  # if it's not varchar, it has to be examined
    return "varchar"


def get_parsed_data(res: str) -> dict:
    """Return passed data columns' types.

    Args:
        res (str): JSON format data in STR!

    Returns:
        column_types (dict): columns are keys and data types are values
    """
    res = json.loads(res)

    column_types = {}

    for col in res:
        column_types[col] = get_data_type(res[col])

    return column_types
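To see the type inference in action, here's a self-contained snippet that mirrors the parser's logic on a made-up row (the column names and values are invented for illustration):

```python
import json
from datetime import datetime


def infer_type(val) -> str:
    # Minimal re-implementation of the parser's inference, inlined so it runs standalone.
    try:
        datetime.fromisoformat(str(val))
        return "timestamp"
    except (TypeError, ValueError):
        pass
    try:
        int(str(val))
        return "integer"
    except (TypeError, ValueError):
        pass
    try:
        float(str(val))
        return "decimal(16,2)"
    except (TypeError, ValueError):
        pass
    return "varchar" if isinstance(val, str) else "UNDEFINED"


# Invented sample row, serialized the way the table stores it
row = json.dumps(
    {
        "created_at": "2022-12-26T10:00:00",
        "order_id": "42",
        "amount": "19.99",
        "status": "shipped",
    }
)
parsed = {col: infer_type(val) for col, val in json.loads(row).items()}
print(parsed)
# {'created_at': 'timestamp', 'order_id': 'integer', 'amount': 'decimal(16,2)', 'status': 'varchar'}
```

Note that the check order matters: every integer string is also a valid decimal, so timestamps are tried first, then integers, then decimals.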

Writer Module

This module helps us generate our dbt model structure (a string) with lines shaped like the template below.

json_identifier."col_name"::col_type as col_name

# helpers/writer.py


def get_dbt_formatted_sql(data: dict, json_col_id: str) -> str:
    rows = [
        f'{json_col_id}."{col}"::{data[col]} as {col}'
        for col in data
        if col != "_extract_timestamp"
    ]
    output = ",\n".join(rows)
    output += f',\n{json_col_id}."_extract_timestamp"::timestamp as _extract_timestamp'
    return output
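A quick sanity check, with the writer function inlined so the snippet runs standalone (the sample columns are invented):

```python
def get_dbt_formatted_sql(data: dict, json_col_id: str) -> str:
    # Same logic as helpers/writer.py, inlined for a standalone demo.
    rows = [
        f'{json_col_id}."{col}"::{data[col]} as {col}'
        for col in data
        if col != "_extract_timestamp"
    ]
    output = ",\n".join(rows)
    output += f',\n{json_col_id}."_extract_timestamp"::timestamp as _extract_timestamp'
    return output


# Invented sample column/type mapping
columns = {"order_id": "integer", "status": "varchar", "_extract_timestamp": "timestamp"}
print(get_dbt_formatted_sql(columns, "json_identifier"))
# json_identifier."order_id"::integer as order_id,
# json_identifier."status"::varchar as status,
# json_identifier."_extract_timestamp"::timestamp as _extract_timestamp
```

Notice that `_extract_timestamp` is filtered out of the comprehension and appended last, so it always closes the column list with a fixed `timestamp` cast.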

Main Script

We can run this script directly to generate dbt models for our nested data tables. The script does the following.

  • Select first row from the table
  • Parse its column names and data types
  • Generate dbt model output
  • Write the output into a text file
# ./main.py

import json
import sys

from helpers.connection import get_engine
from helpers.parser import get_parsed_data
from helpers.writer import get_dbt_formatted_sql

SCHEMA_PREFIX = "my_aws_schema"
JSON_PREFIX = "json_identifier"


def main():
    global SCHEMA_PREFIX, JSON_PREFIX

    table = sys.argv[1]

    if len(sys.argv) > 2:
        SCHEMA_PREFIX = sys.argv[2]  # if 2nd argument is passed, it is schema!
    print(f"pre-defined schema is => {SCHEMA_PREFIX}")

    if len(sys.argv) > 3:
        JSON_PREFIX = sys.argv[3]  # if 3rd argument is passed, it is JSON column identifier!
    print(f"pre-defined JSON identifier is => {JSON_PREFIX}")

    engine = get_engine()

    res = engine.execute(f"select * from {SCHEMA_PREFIX}.{table} limit 1;").fetchone()[
        0
    ]  # assumes the JSON payload is the first column

    column_types = get_parsed_data(res)

    with open("raw_data.json", "w+", encoding="utf-8") as file:
        file.write(res)

    with open("raw_data_column_types.json", "w+", encoding="utf-8") as file:
        json.dump(column_types, file, indent=4)

    with open("dbt_model.txt", "w+", encoding="utf-8") as file:
        model_txt = get_dbt_formatted_sql(column_types, JSON_PREFIX)
        file.write(model_txt)

    print("Successfully completed!")


if __name__ == "__main__":
    main()

We can run the script like below.

python main.py table_name schema_name nested_identifier

It’ll generate an output file named dbt_model.txt.

nested_identifier."col1"::timestamp as col1,
nested_identifier."col2"::varchar as col2,
nested_identifier."col3"::timestamp as col3,
nested_identifier."_extract_timestamp"::timestamp as _extract_timestamp # col4

Now, you just need to copy and paste the output into your select statements. Obviously, you can improve the script to also generate full select statements. I leave it to you 🥲
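As a starting point for that improvement, a hypothetical helper (name and signature are mine, not part of the repo) could wrap the generated column list in a complete statement:

```python
def get_full_select(model_body: str, schema: str, table: str) -> str:
    # Hypothetical extension: wrap the generated column list in a full SELECT.
    return f"select\n{model_body}\nfrom {schema}.{table}"


print(get_full_select('json_identifier."col1"::timestamp as col1', "my_aws_schema", "my_table"))
# select
# json_identifier."col1"::timestamp as col1
# from my_aws_schema.my_table
```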


Hopefully, this gives you an idea for auto-generating dbt models and saves you from unnesting your data row by row.

Kind regards


