Converting binary md5 hashing into varchar md5 hashing in DBT

Sakthivel
BI3 Technologies
Published in
6 min readJan 24, 2023

This blog describes custom macros that change the behaviour of MD5 hashing. dbtvault hashes columns as MD5 binary by default. To alter this behaviour, the relevant dbtvault macros are overridden so that MD5 varchar is used in place of MD5 binary.

The macros are attached below :

Step 1: Place these three SQL files in the dbt macros folder.

  1. stage.sql

When the staging model runs, it uses these custom macros instead of the built-in ones. The stage macro obtains all the data for the staging model and then invokes the hash_columns.sql macro.

{#- stage: public entry point for building a staging model.
    Mirrors dbtvault.stage() but dispatches to the local default__stage below,
    so the custom (varchar MD5) hash_columns macro is used instead of dbtvault's.
    Params:
      include_source_columns - include raw source columns in the output (defaults to true)
      source_model           - ref name (string) or a {source_name: table_name} mapping
      hashed_columns / derived_columns / null_columns / ranked_columns
                             - column-generation configs; each may be none -#}
{%- macro stage(include_source_columns=none, source_model=none, hashed_columns=none, derived_columns=none, null_columns=none, ranked_columns=none) -%}

{#- Default include_source_columns to true when the caller passed none -#}
{%- if include_source_columns is none -%}
{%- set include_source_columns = true -%}
{%- endif -%}

{{- dbtvault.prepend_generated_by() }}

{#- adapter.dispatch resolves to <adapter>__stage when defined, else default__stage -#}
{{ adapter.dispatch('stage')(include_source_columns=include_source_columns,
source_model=source_model,
hashed_columns=hashed_columns,
derived_columns=derived_columns,
null_columns=null_columns,
ranked_columns=ranked_columns
) -}}
{%- endmacro -%}

{#- default__stage: renders the staging SELECT as a chain of CTEs:
    source_data -> derived_columns -> null_columns -> hashed_columns
    -> ranked_columns -> columns_to_select.
    Each optional CTE is only emitted when its config is present;
    last_cte tracks which CTE the next one should read FROM. -#}
{%- macro default__stage(include_source_columns, source_model, hashed_columns, derived_columns, null_columns, ranked_columns) -%}

{#- Fail compilation early with a usage hint when no source model was supplied -#}
{% if (source_model is none) and execute %}

{%- set error_message -%}
Staging error: Missing source_model configuration. A source model name must be provided.
e.g.
[REF STYLE]
source_model: model_name
OR
[SOURCES STYLE]
source_model:
source_name: source_table_name
{%- endset -%}

{{- exceptions.raise_compiler_error(error_message) -}}
{%- endif -%}

{#- Check for source format or ref format and create
relation object from source_model -#}
{% if source_model is mapping and source_model is not none -%}

{#- Mapping form: {source_name: source_table_name} resolved via source() -#}
{%- set source_name = source_model | first -%}
{%- set source_table_name = source_model[source_name] -%}

{%- set source_relation = source(source_name, source_table_name) -%}
{%- set all_source_columns = dbtvault.source_columns(source_relation=source_relation) -%}
{%- elif source_model is not mapping and source_model is not none -%}

{#- String form: treated as a model name and resolved via ref() -#}
{%- set source_relation = ref(source_model) -%}
{%- set all_source_columns = dbtvault.source_columns(source_relation=source_relation) -%}
{%- else -%}

{%- set all_source_columns = [] -%}
{%- endif -%}

{#- Upper-case all configured column names so later membership checks are case-insensitive -#}
{%- set derived_column_names = dbtvault.extract_column_names(derived_columns) | map('upper') | list -%}
{%- set null_column_names = dbtvault.extract_null_column_names(null_columns) | map('upper') | list -%}
{%- set hashed_column_names = dbtvault.extract_column_names(hashed_columns) | map('upper') | list -%}
{%- set ranked_column_names = dbtvault.extract_column_names(ranked_columns) | map('upper') | list -%}
{#- NOTE(review): due to Jinja filter precedence, "| map('upper')" here binds only to
    hashed_column_names, not to the whole concatenation — harmless, because all three
    lists were already upper-cased above, but worth confirming the intent -#}
{%- set exclude_column_names = derived_column_names + null_column_names + hashed_column_names | map('upper') | list -%}
{%- set source_and_derived_column_names = (all_source_columns + derived_column_names) | map('upper') | unique | list -%}

{#- Pre-compute the pass-through column lists for each CTE stage -#}
{%- set source_columns_to_select = dbtvault.process_columns_to_select(all_source_columns, exclude_column_names) -%}
{%- set derived_columns_to_select = dbtvault.process_columns_to_select(source_and_derived_column_names, null_column_names + hashed_column_names) | unique | list -%}
{%- set derived_and_null_columns_to_select = dbtvault.process_columns_to_select(source_and_derived_column_names + null_column_names, hashed_column_names) | unique | list -%}
{%- set final_columns_to_select = [] -%}

{#- Include source columns in final column selection if true -#}
{%- if include_source_columns -%}
{%- if dbtvault.is_nothing(derived_columns)
and dbtvault.is_nothing(null_columns)
and dbtvault.is_nothing(hashed_columns)
and dbtvault.is_nothing(ranked_columns) -%}
{%- set final_columns_to_select = final_columns_to_select + all_source_columns -%}
{%- else -%}
{#- Only include non-overriden columns if not just source columns -#}
{%- set final_columns_to_select = final_columns_to_select + source_columns_to_select -%}
{%- endif -%}
{%- endif %}

WITH source_data AS (

SELECT

{{- "\n\n " ~ dbtvault.print_list(dbtvault.escape_column_names(all_source_columns)) if all_source_columns else " *" }}

FROM {{ source_relation }}
{%- set last_cte = "source_data" %}
)

{#- Optional CTE: user-defined derived column expressions -#}
{%- if dbtvault.is_something(derived_columns) -%},

derived_columns AS (

SELECT

{{ dbtvault.derive_columns(source_relation=source_relation, columns=derived_columns) | indent(4) }}

FROM {{ last_cte }}
{%- set last_cte = "derived_columns" -%}
{%- set final_columns_to_select = final_columns_to_select + derived_column_names %}
)
{%- endif -%}

{#- Optional CTE: columns with NULL-handling applied -#}
{% if dbtvault.is_something(null_columns) -%},

null_columns AS (

SELECT

{{ dbtvault.print_list(dbtvault.escape_column_names(derived_columns_to_select)) }}{{"," if dbtvault.is_something(derived_columns_to_select) else ""}}

{{ dbtvault.null_columns(source_relation=none, columns=null_columns) | indent(4) }}

FROM {{ last_cte }}
{%- set last_cte = "null_columns" -%}
{%- set final_columns_to_select = final_columns_to_select + null_column_names %}
)
{%- endif -%}


{#- Optional CTE: hashed columns — calls the LOCAL hash_columns macro
    (not dbtvault.hash_columns), which is the point of this override -#}
{% if dbtvault.is_something(hashed_columns) -%},

hashed_columns AS (

SELECT

{{ dbtvault.print_list(dbtvault.escape_column_names(derived_and_null_columns_to_select)) }},

{% set processed_hash_columns = dbtvault.process_hash_column_excludes(hashed_columns, all_source_columns) -%}
{{ hash_columns(columns=processed_hash_columns) | indent(4) }}

FROM {{ last_cte }}
{%- set last_cte = "hashed_columns" -%}
{%- set final_columns_to_select = final_columns_to_select + hashed_column_names %}
)
{%- endif -%}

{#- Optional CTE: ranked columns appended via SELECT * -#}
{% if dbtvault.is_something(ranked_columns) -%},

ranked_columns AS (

SELECT *,

{{ dbtvault.rank_columns(columns=ranked_columns) | indent(4) if dbtvault.is_something(ranked_columns) }}

FROM {{ last_cte }}
{%- set last_cte = "ranked_columns" -%}
{%- set final_columns_to_select = final_columns_to_select + ranked_column_names %}
)
{%- endif -%}

,

{#- Final projection: de-duplicated list of everything accumulated above -#}
columns_to_select AS (

SELECT

{{ dbtvault.print_list(dbtvault.escape_column_names(final_columns_to_select | unique | list)) }}

FROM {{ last_cte }}
)

SELECT * FROM columns_to_select

{%- endmacro -%}

Step 2: Ensure the file and folder names are exactly as given.

2. hash_columns.sql

Data from stage.sql is sent to the hash_columns.sql macro, which splits the columns into a list that is then passed to the hash.sql macro.

{#- hash_columns: dispatch wrapper for rendering hashed-column expressions.
    Shadows dbtvault.hash_columns so the varchar-MD5 implementation below is
    used; resolves to <adapter>__hash_columns when defined, else default__hash_columns. -#}
{%- macro hash_columns(columns=none) -%}

{{- adapter.dispatch('hash_columns')(columns=columns) -}}

{%- endmacro %}

{#- default__hash_columns: renders one hashed-column expression per entry in the
    hashed_columns mapping, delegating each entry to hash_diff.
    Each entry is either a plain value/list (PK-style hash) or a mapping with
    a 'columns' list and an 'is_hashdiff' flag (hashdiff-style hash). -#}
{%- macro default__hash_columns(columns=none) -%}

{%- if columns is mapping and columns is not none -%}

{%- for col in columns -%}

{#- Mapping with is_hashdiff set: multi-column hashdiff (columns sorted before hashing) -#}
{% if columns[col] is mapping and columns[col].is_hashdiff -%}

{{- hash_diff(columns=columns[col]['columns'],
alias=col,
is_hashdiff=columns[col]['is_hashdiff']) -}}

{#- Plain value or list: treated as a primary-key style hash (no sorting) -#}
{%- elif columns[col] is not mapping -%}

{{- hash_diff(columns=columns[col],
alias=col,
is_hashdiff=false) -}}

{#- Mapping without the is_hashdiff flag: warn the user, then hash with the default flag -#}
{%- elif columns[col] is mapping and not columns[col].is_hashdiff -%}

{%- if execute -%}
{%- do exceptions.warn("[" ~ this ~ "] Warning: You provided a list of columns under a 'columns' key, but did not provide the 'is_hashdiff' flag. Use list syntax for PKs.") -%}
{% endif %}

{{- hash_diff(columns=columns[col]['columns'], alias=col) -}}

{%- endif -%}

{#- Comma-separate the generated expressions -#}
{{- ",\n" if not loop.last -}}
{%- endfor -%}

{%- endif %}
{%- endmacro -%}

3. hash.sql

This macro converts the column data into hashed values.

{#- hash_diff: dispatch wrapper for rendering a single hash expression.
    Normalises a none is_hashdiff to false, then resolves to
    <adapter>__hash_diff when defined, else default__hash_diff below. -#}
{%- macro hash_diff(columns=none, alias=none, is_hashdiff=false) -%}
{%- if is_hashdiff is none -%}
{%- set is_hashdiff = false -%}
{%- endif -%}
{{- adapter.dispatch('hash_diff')(columns=columns, alias=alias, is_hashdiff=is_hashdiff) -}}
{%- endmacro %}

{#- default__hash_diff: renders the hash expression for one alias.
    This is the core of the override: the hash result is CAST to VARCHAR
    instead of dbtvault's default BINARY.
    NOTE(review): generated SQL uses CONCAT_WS and IFNULL — assumes a warehouse
    that supports them (e.g. Snowflake); verify against your target adapter. -#}
{%- macro default__hash_diff(columns, alias, is_hashdiff) -%}
{#- Behaviour is tunable via project vars: hash, concat_string, null_placeholder_string -#}
{%- set hash = var('hash', 'MD5') -%}
{%- set concat_string = var('concat_string', '||') -%}
{%- set null_placeholder_string = var('null_placeholder_string', '^^') -%}
{#- Select hashing algorithm -#}
{%- if hash == 'MD5' -%}
{%- set hash_alg = 'MD5' -%}
{#- NOTE(review): MD5 hex output is 32 chars; VARCHAR(200) is deliberately generous
    per the stated requirement — confirm the size is intended -#}
{%- set hash_size = 200 -%}
{%- elif hash == 'SHA' -%}
{%- set hash_alg = 'SHA2' -%}
{%- set hash_size = 256 -%}
{%- else -%}
{%- set hash_alg = 'MD5' -%}
{%- set hash_size = 200 -%}
{%- endif -%}
{#- Template applied to every input: trim, upper-case, and turn empty strings into NULL -#}
{%- set standardise = "NULLIF(UPPER(TRIM(CAST([EXPRESSION] AS VARCHAR))), '')" %}
{#- Alpha sort columns before hashing if a hashdiff -#}
{%- if is_hashdiff and dbtvault.is_list(columns) -%}
{%- set columns = columns|sort -%}
{%- endif -%}
{#- If single column to hash -#}
{%- if columns is string -%}
{%- set column_str = dbtvault.as_constant(columns) -%}
{%- set escaped_column_str = dbtvault.escape_column_names(column_str) -%}
{{- "CAST(({}({})) AS VARCHAR({})) AS {}".format(hash_alg, standardise | replace('[EXPRESSION]', escaped_column_str), hash_size, dbtvault.escape_column_names(alias)) | indent(4) -}}
{#- Else a list of columns to hash -#}
{%- else -%}
{#- all_null accumulates placeholder~separator pieces; the joined string equals the
    concatenation produced when every column is NULL, so NULLIF(...) below makes the
    whole hash NULL in that case (non-hashdiff branch only) -#}
{%- set all_null = [] -%}
{%- if is_hashdiff -%}
{{- "CAST({}(CONCAT_WS('{}',".format(hash_alg, concat_string) | indent(4) -}}
{%- else -%}
{{- "CAST({}(NULLIF(CONCAT_WS('{}',".format(hash_alg, concat_string) | indent(4) -}}
{%- endif -%}
{%- for column in columns -%}
{%- do all_null.append(null_placeholder_string) -%}
{%- set column_str = dbtvault.as_constant(column) -%}
{%- set escaped_column_str = dbtvault.escape_column_names(column_str) -%}
{#- Each NULL input is replaced by the placeholder so column positions stay stable -#}
{{- "\nIFNULL({}, '{}')".format(standardise | replace('[EXPRESSION]', escaped_column_str), null_placeholder_string) | indent(4) -}}
{{- "," if not loop.last -}}
{%- if loop.last -%}
{% if is_hashdiff %}
{{- "\n)) AS VARCHAR({})) AS {}".format(hash_size, dbtvault.escape_column_names(alias)) -}}
{%- else -%}
{{- "\n), '{}')) AS VARCHAR({})) AS {}".format(all_null | join(""), hash_size, dbtvault.escape_column_names(alias)) -}}
{%- endif -%}
{%- else -%}
{%- do all_null.append(concat_string) -%}
{%- endif -%}
{%- endfor -%}
{%- endif -%}
{%- endmacro -%}

Step 3: Call this stage macro instead of the dbt vault function.

The sample is given below:

{#- Example staging model: hashes ID, NAME and AGE into a HUB_COLUMNS column
    using the custom stage macro defined above.
    NOTE(review): the YAML indentation below appears flattened by the blog
    formatting — HUB_COLUMNS should be nested under hashed_columns, and the
    list items under HUB_COLUMNS; verify before running. -#}
{%- set yaml_metadata -%}
source_model: source_stage
hashed_columns:
HUB_COLUMNS:
- "ID"
- "NAME"
- "AGE"

{%- endset -%}
{#- Parse the YAML string into a dict, then pass its parts to stage() -#}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{{ stage(include_source_columns=true,
source_model=metadata_dict['source_model'],
derived_columns=none,
null_columns=none,
hashed_columns=metadata_dict['hashed_columns'],
ranked_columns=none) }}
before
Before adding custom macros

Result:

After adding custom macros

As per the requirement, the hashed columns' data type was changed from BINARY(16) to VARCHAR(200).

About Us :

Bi3 has been recognized for being one of the fastest-growing companies in Australia. Our team has delivered substantial and complex projects for some of the largest organizations around the globe and we’re quickly building a brand that is well-known for superior delivery.

Website: https://bi3technologies.com/

Follow us on,
LinkedIn: https://www.linkedin.com/company/bi3technologies
Instagram:
https://www.instagram.com/bi3technologies/
Twitter:
https://twitter.com/Bi3Technologies

--

--