CodeX
Published in

CodeX

Automatic Model Generation for dbt

Simple CLI for Unnesting Staging Data (Redshift)

Photo by Lukas on Unsplash

One of My Daily Tasks (Sometimes 😋)

DBT Model Generator

├── helpers
│ ├── connection.py
│ ├── __init__.py
│ ├── parser.py
│ └── writer.py
├── README.md
├── requirements.txt
└── script.py
pip install black SQLAlchemy sqlalchemy-redshift psycopg2-binary

Connection Module

# helpers/connection.py

from sqlalchemy import create_engine


def get_engine():
    """Build and return a SQLAlchemy engine bound to Redshift.

    Returns:
        Engine: a Redshift-connected SQLAlchemy engine.
    """
    # Of course the URI can come from environment variables instead
    # of being hard-coded here.
    connection_uri = "redshift+psycopg2://<YOUR_URI>"
    return create_engine(connection_uri)

Parser Module

# helpers/parser.py

import json
from datetime import datetime


class TypeChecker(object):
    """Heuristic type detection for raw column values.

    Each predicate attempts the corresponding Python conversion and
    reports whether it succeeded (EAFP style). The original bare
    ``except:`` clauses also swallowed ``SystemExit`` and
    ``KeyboardInterrupt``; they are narrowed to the two exceptions a
    failed conversion can actually raise: ``ValueError`` (bad value)
    and ``TypeError`` (non-convertible type, e.g. ``None``).
    """

    @staticmethod
    def is_timestamp(val: str) -> bool:
        """Return True if *val* parses as an ISO-8601 timestamp."""
        try:
            datetime.fromisoformat(val)
            return True
        except (ValueError, TypeError):
            return False

    @staticmethod
    def is_integer(val: str) -> bool:
        """Return True if *val* converts to int."""
        try:
            int(val)
            return True
        except (ValueError, TypeError):
            return False

    @staticmethod
    def is_decimal(val: str) -> bool:
        """Return True if *val* converts to float."""
        try:
            float(val)
            return True
        except (ValueError, TypeError):
            return False

    @staticmethod
    def is_varchar(val: str) -> bool:
        """Return True if *val* converts to str (true for nearly any value)."""
        try:
            str(val)
            return True
        except (ValueError, TypeError):
            return False


def get_data_type(col_value: str) -> str:
    """Map a raw column value to a Redshift column type name.

    Checks run in priority order: timestamp, integer, decimal, varchar.
    The first predicate that succeeds wins, so e.g. "42" is reported as
    integer even though it would also convert to float and str.

    Args:
        col_value (str): a single raw value taken from the sample row.

    Returns:
        str: one of "timestamp", "integer", "decimal(16,2)", "varchar",
        or "UNDEFINED" when the value survives none of the conversions
        and must be examined by hand.
    """
    # Guard-clause chain replaces the original `== False` comparison,
    # which inverted the readable order of the branches.
    if TypeChecker.is_timestamp(col_value):
        return "timestamp"
    if TypeChecker.is_integer(col_value):
        return "integer"
    if TypeChecker.is_decimal(col_value):
        return "decimal(16,2)"
    if TypeChecker.is_varchar(col_value):
        return "varchar"
    # str() conversion failed too — flag the value for manual inspection.
    return "UNDEFINED"


def get_parsed_data(res: str) -> dict:
    """Infer a data type for every column of a JSON record.

    Args:
        res (str): a single record serialized as a JSON object string.

    Returns:
        dict: column name -> inferred type name (see get_data_type).
    """
    record = json.loads(res)
    # Dict comprehension replaces the manual accumulate-in-a-loop pattern;
    # iterating items() avoids the second key lookup per column.
    return {col: get_data_type(value) for col, value in record.items()}

Writer Module

# helpers/writer.py

def get_dbt_formatted_sql(data: dict, json_col_id: str) -> str:
    """Render the column-cast lines of a dbt model body.

    Args:
        data (dict): column name -> type name mapping.
        json_col_id (str): identifier of the JSON column being unnested.

    Returns:
        str: one ``<id>."<col>"::<type> as <col>`` line per column,
        comma-separated, with ``_extract_timestamp`` always emitted last
        as a fixed timestamp cast.
    """
    select_lines = []
    for column, column_type in data.items():
        if column == "_extract_timestamp":
            continue  # appended last with a fixed cast, regardless of its inferred type
        select_lines.append(f'{json_col_id}."{column}"::{column_type} as {column}')

    rendered = ",\n".join(select_lines)
    rendered += (
        f',\n{json_col_id}."_extract_timestamp"::timestamp as _extract_timestamp'
    )
    return rendered

Main Script

  • Select first row from the table
  • Parse its column names and data types
  • Generate dbt model output
  • Write the output into a text file
# ./script.py

import sys
from helpers.connection import get_engine
from helpers.parser import get_parsed_data
from helpers.writer import get_dbt_formatted_sql

SCHEMA_PREFIX = "my_aws_schema"  # default schema when no 2nd CLI argument is given
JSON_PREFIX = "json_identifier"  # default JSON column identifier when no 3rd CLI argument is given


def main():
    """CLI entry point: sample one row of <schema>.<table> and emit a dbt model.

    Usage: script.py TABLE [SCHEMA [JSON_IDENTIFIER]]

    Side effects: writes raw_data.json, raw_data_column_types.json and
    dbt_model.txt into the current directory.
    """
    global SCHEMA_PREFIX
    global JSON_PREFIX

    table = sys.argv[1]

    # Explicit length checks replace the original bare `except:` blocks,
    # which used IndexError as control flow and hid any other error.
    if len(sys.argv) > 2:  # 2nd argument overrides the schema
        SCHEMA_PREFIX = sys.argv[2]
    else:
        print(f"pre-defined schema is => {SCHEMA_PREFIX}")

    if len(sys.argv) > 3:  # 3rd argument overrides the JSON identifier
        # BUG FIX: the original assigned sys.argv[3] to SCHEMA_PREFIX,
        # silently clobbering the schema instead of setting JSON_PREFIX
        # as its own comment said it should.
        JSON_PREFIX = sys.argv[3]
    else:
        print(f"pre-defined JSON identifier is => {JSON_PREFIX}")

    engine = get_engine()

    # Only the first column of the first row is needed: the JSON payload.
    # NOTE(review): schema/table come straight from argv and are
    # interpolated into the SQL string — acceptable for a local dev tool,
    # but do not expose this to untrusted input.
    res = engine.execute(f"select * from {SCHEMA_PREFIX}.{table} limit 1;").fetchone()[
        0
    ]

    column_types = get_parsed_data(res)

    with open("raw_data.json", "w+", encoding="utf-8") as file:
        file.write(str(res))

    with open("raw_data_column_types.json", "w+", encoding="utf-8") as file:
        file.write(str(column_types))

    with open("dbt_model.txt", "w+", encoding="utf-8") as file:
        model_txt = get_dbt_formatted_sql(column_types, JSON_PREFIX)
        file.write(model_txt)

    print("Successfully completed!")


if __name__ == "__main__":
    main()
nested_identifier."col1"::timestamp as col1,
nested_identifier."col2"::varchar as col2,
nested_identifier."col3"::timestamp as col3,
nested_identifier."_extract_timestamp"::timestamp as _extract_timestamp # col4

Finally

--

--

Everything connected with Tech & Code. Follow to join our 1M+ monthly readers

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Baysan

Lifelong learner & Freelancer. I use technology that helps me. I’m currently working as a Business Intelligence & Backend Developer. mebaysan.com