Materialization & Custom Transformations using dbt

Rasiksuhail
7 min read · Apr 12, 2023
Data Build Tool

dbt (Data Build Tool) empowers data analysts and engineers to build reliable and maintainable data pipelines with ease. With its powerful features and ease of use, dbt is quickly becoming the go-to choice for modern data teams. If you are learning dbt and starting fresh, I recommend going through the introductory blog on dbt first, and it helps to have some knowledge of macros & ref.

In this blog, let's cover the following topics in dbt:

  • Incremental models: Learn how to use incremental models in dbt to make your data pipelines faster and more efficient.
  • Materialization: Learn how to use different types of materializations in dbt to store your data, including views, tables, incremental models, and ephemeral models.
  • Cross-database joins: Learn how to join tables from different databases in dbt.
  • Custom transformations: Learn how to create custom SQL transformations in dbt using Jinja templating.
  • Scheduling: Learn how to schedule your dbt models to run automatically using cron or other scheduling tools.
  • Alerting and monitoring: Learn how to monitor your models and send alerts in case of failures or data issues.

Cool. Let's start.

Incremental Models

Incremental models are used to process only the changed data in a table, reducing the time and resources required to run dbt models. This feature is especially useful for tables that receive frequent updates. To use incremental models, set materialized='incremental' in the model's config block. You can also provide a unique_key so dbt can identify existing rows, and an incremental_strategy to control how new or changed rows are applied to the destination table.

Incremental models in dbt allow you to only process data that has changed since the last time the model was run, rather than processing all of the data every time. This can make your data pipelines faster and more efficient.

Here’s an example of an incremental model in dbt:

{{
  config(
    materialized='incremental'
  )
}}

SELECT *
FROM {{ ref('orders') }}

{% if is_incremental() %}
  -- only select rows created since the last time this model was run
  WHERE created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}

In this example, the is_incremental() macro returns true when the model already exists in the warehouse and is not being run with the --full-refresh flag. On an incremental run, the where clause is added so that only rows from orders with a created_at later than the latest created_at already present in this model ({{ this }}) are selected; on the first run the filter is skipped and the full table is built.
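
To make the behaviour concrete, this is roughly what the compiled SQL looks like in each case (the schema and model names are illustrative):

-- first run (or dbt run --full-refresh): the is_incremental() block is skipped
SELECT * FROM analytics.orders

-- subsequent runs: only rows newer than what is already loaded are selected
SELECT * FROM analytics.orders
WHERE created_at > (SELECT MAX(created_at) FROM analytics.my_incremental_model)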

Here is an example of how to set up an incremental model in dbt using the incremental_strategy configuration parameter:

{{
  config(
    materialized='incremental',
    unique_key='id',
    incremental_strategy='merge'
  )
}}

select *
from {{ source('my_source', 'my_source_table') }}  -- 'my_source' is the source name defined in your sources .yml

{% if is_incremental() %}
  where created_at >= (select max(created_at) from {{ this }})
{% endif %}

In this example, the incremental_strategy parameter is set to 'merge', so new or changed rows are merged (upserted) into the destination table rather than the whole table being dropped and rebuilt. The unique_key parameter specifies the column that identifies a row, which is how dbt matches incoming rows against existing ones. The strategies available (such as merge, append, and delete+insert) vary by adapter.
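
On the very first run there is nothing to compare against, so dbt simply builds the table from scratch; you can also force a rebuild at any time with the --full-refresh flag. A typical invocation might look like this (the model name is illustrative):

dbt run --select my_incremental_model
dbt run --select my_incremental_model --full-refresh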

Materialization

Materialization in dbt refers to how your models are persisted in your database. There are different types of materializations available, including views, tables, incremental models, and ephemeral models. Views are virtual tables that do not store data but rather generate it dynamically from a predefined SQL statement each time they are queried. Tables are physical tables that store the query result in your database. Incremental models, covered above, are tables that are updated with only new or changed rows. Ephemeral models are not created in the database at all; dbt interpolates them into downstream models as common table expressions. You can choose the appropriate materialization based on your use case.

Here’s an example of a view materialization in dbt:

{{
  config(
    materialized='view'
  )
}}

SELECT *
FROM {{ ref('orders') }}
WHERE status = 'shipped'

In this example, the config block specifies the materialized option as 'view'. The query selects all rows from the orders model where the status is 'shipped'. The resulting view can then be referenced by other dbt models via ref().
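
You can also set materializations for whole folders of models in dbt_project.yml instead of repeating a config block in every file. A minimal sketch, assuming a project named my_project with staging and marts folders:

# dbt_project.yml (excerpt)
models:
  my_project:
    staging:
      +materialized: view   # lightweight transformations stay as views
    marts:
      +materialized: table  # final reporting models are persisted as tables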

Cross-database joins

Cross-database joins in dbt allow you to join tables that live in different databases. This feature is especially useful if you have data stored in multiple databases and need to combine it for analysis. Your connection itself is configured in profiles.yml; to reach the other database you declare its tables as dbt sources, setting the database property in the source's .yml file, and reference them with {{ source() }} in your SQL. This works as long as your warehouse supports querying across databases in a single statement.

Note that there is no dedicated cross-database join function in dbt or dbt_utils: once both tables are declared as sources, a plain SQL join does the work.
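
A sketch of what the source definitions might look like (the source, database, schema, and table names are all illustrative):

# models/sources.yml
version: 2

sources:
  - name: my_database_1      # name used in source() calls
    database: my_database_1  # database the tables actually live in
    schema: public
    tables:
      - name: my_table_1
  - name: my_database_2
    database: my_database_2
    schema: public
    tables:
      - name: my_table_2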

With those sources defined, here is an example of joining tables from the two databases:

{{
  config(
    materialized='table'
  )
}}

select
  a.*,
  b.*  -- in practice, list explicit columns to avoid clashes on shared names such as id
from {{ source('my_database_1', 'my_table_1') }} as a
join {{ source('my_database_2', 'my_table_2') }} as b
  on a.id = b.id

In this example, {{ source('my_database_1', 'my_table_1') }} and {{ source('my_database_2', 'my_table_2') }} compile to fully qualified names such as my_database_1.public.my_table_1, so the warehouse performs the join across the two databases (the join key id is illustrative).

Custom transformations

Custom transformations in dbt allow you to create complex SQL queries using Jinja templating. This feature is especially useful when you need to perform data transformations that cannot be done using dbt’s built-in macros or when you need to perform a sequence of transformations. To use custom transformations, you can create a new SQL file in the “models” directory and use Jinja templating to write the SQL query.

Here’s an example of a custom transformation in dbt:

-- models/customer_updates.sql

-- compute yesterday's date at compile time with the datetime module dbt exposes to Jinja
{% set yesterday_date = (modules.datetime.date.today() - modules.datetime.timedelta(days=1)).isoformat() %}

WITH customer_updates AS (
    SELECT *
    FROM {{ ref('customers') }}
    WHERE created_at > '{{ yesterday_date }}'
)

SELECT * FROM customer_updates
UNION ALL
SELECT * FROM new_customers  -- raw table holding newly ingested customers from an external source

In this example, Jinja's set tag computes yesterday's date at compile time using the datetime module that dbt exposes to templates. The query then uses a common table expression (CTE) to select customers created since yesterday from the customers model, and unions them with the rows in the external new_customers table.
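
The run_query() function is also useful in custom transformations, but note that it returns an agate table in Jinja rather than a relation you can reference in SQL, so the usual pattern is to fetch values at compile time and loop over them to generate SQL. A minimal sketch, assuming an orders model whose status column holds simple lowercase values:

{% set status_query %}
    select distinct status from {{ ref('orders') }}
{% endset %}

{% set results = run_query(status_query) %}

{# query results are only available at execution time, not while dbt parses the project #}
{% if execute %}
    {% set statuses = results.columns[0].values() %}
{% else %}
    {% set statuses = [] %}
{% endif %}

select
    customer_id
    {% for s in statuses %}
    , sum(case when status = '{{ s }}' then 1 else 0 end) as {{ s }}_orders
    {% endfor %}
from {{ ref('orders') }}
group by customer_id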

Scheduling

dbt itself does not ship with a scheduler. To run your models automatically at a specific time or interval, you wire dbt commands into cron, an orchestrator such as Airflow, or dbt Cloud jobs.

Here is an example of how to schedule a dbt run using cron:

# crontab entry: run the whole dbt project at the top of every hour
# (the project path and log location are illustrative)
0 * * * * cd /path/to/my_dbt_project && dbt run >> /var/log/dbt_run.log 2>&1

There are several scheduling tools available that integrate with dbt, such as Airflow and Dagster. These tools allow you to set up more complex scheduling workflows and dependencies between your dbt models.

There is no scheduling flag built into the dbt CLI itself; the scheduler is what invokes dbt at the right time. For example, to run the project every day at 3am you put the schedule in the cron expression and keep the dbt command simple:

0 3 * * * cd /path/to/my_dbt_project && dbt run

When setting up a schedule for your dbt models, it's important to consider the dependencies between them. Within a single dbt run, dbt already executes models in dependency order based on their ref() relationships, so one scheduled command is usually enough; dependencies need extra care when upstream and downstream models are scheduled as separate jobs, in which case the upstream job must finish before the downstream one starts.
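
If you do split the project into separately scheduled jobs, dbt's node selection syntax helps keep the ordering sane. For example, the + prefix includes a model's upstream dependencies in the same invocation (the model name is illustrative):

# build daily_orders together with everything it depends on
dbt run --select +daily_orders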

Alerting and Monitoring

Alerting and monitoring are crucial aspects of any data pipeline. In dbt, there are a few ways you can monitor and alert on your pipeline’s health and performance.

One of the most common ways to monitor dbt is through its built-in logs. dbt logs a wealth of information, including how long each model takes to run, any errors or warnings that occur during the build, and more. By monitoring the dbt logs, you can get a sense of how your pipeline is performing over time and catch any issues before they become significant problems.
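
Beyond the console output, every invocation also writes structured artifacts such as target/run_results.json, which records the status and timing of every model and test; these are handy inputs for a monitoring system. As a quick sketch, you could inspect the latest run with a one-liner like this (assuming jq is installed):

jq '.results[] | {unique_id, status, execution_time}' target/run_results.json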

Here’s an example of how you can use dbt to set up alerts and monitoring:

Let’s say we want to monitor the number of failed transactions in a particular table in our data warehouse. We can set up an alert to notify us if the number of failed transactions exceeds a certain threshold.

First, we’ll create a custom schema in our project for our alerts. In this case, let’s call it alert.

# models/alert/schema.yml

version: 2

models:
  - name: failed_transactions
    description: "Alert for failed transactions in my_table."
    columns:
      - name: num_failed_transactions
        tests:
          - not_null
          - dbt_utils.accepted_range:
              max_value: 10

In this schema file, we've described the failed_transactions model with a single column, num_failed_transactions, and attached two tests to it: the built-in not_null test and the accepted_range test from the dbt_utils package (installed via packages.yml). The accepted_range test checks that the value of num_failed_transactions does not exceed a maximum of 10.

Next, we’ll create a custom model that queries the data we want to monitor and outputs the result to the failed_transactions table in our alert schema.

-- models/alert/failed_transactions.sql
{{
  config(
    materialized='table',
    schema='alert'
  )
}}

SELECT COUNT(*) AS num_failed_transactions
FROM my_table
WHERE status = 'failed'

In this model, we’re counting the number of failed transactions in my_table and outputting the result to the num_failed_transactions column in our failed_transactions table in the alert schema.

Finally, we’ll create a custom test that checks the value of num_failed_transactions against our specified threshold.

-- tests/alert/failed_transactions_test.sql
-- a singular test: dbt marks it as failed if this query returns any rows
{% set max_failed_transactions = 10 %}

SELECT *
FROM {{ ref('failed_transactions') }}
WHERE num_failed_transactions > {{ max_failed_transactions }}
   OR num_failed_transactions IS NULL

In this test, we return any rows where num_failed_transactions is null or exceeds our threshold of 10; dbt treats the test as failed whenever the query returns one or more rows.
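
To exercise the model and its tests together, you can target them with dbt's selection syntax, for example:

dbt run --select failed_transactions
dbt test --select failed_transactions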

With these components in place, a failing test becomes the alert signal. dbt Core does not send notifications itself, so the final step is to wire the test result into a tool like Slack, PagerDuty, or email, whether through dbt Cloud job notifications, your orchestrator's on-failure hooks, or a small wrapper around the dbt test command.

# run_alerts.sh -- a minimal sketch: post to a Slack incoming webhook when the alert test fails
# (SLACK_WEBHOOK_URL is a placeholder for your webhook; dbt exits with a non-zero status when a test fails)
dbt test --select failed_transactions || curl -X POST \
  -H 'Content-type: application/json' \
  --data '{"text": "dbt alert: failed transactions exceeded the threshold of 10"}' \
  "$SLACK_WEBHOOK_URL"

In this setup, whenever the number of failed transactions exceeds 10, the failed_transactions test fails, dbt test exits with a non-zero status, and the wrapper sends a notification to the team's Slack channel. The same hook could just as easily trigger an email or a PagerDuty incident.

By combining dbt's testing features with simple notification hooks, you can proactively monitor your data pipelines and quickly identify and rectify issues.

Hurray!! We have learnt a few advanced topics related to dbt.

Let's dive more into dbt in the upcoming blogs. Watch out.

- Happy dbt!

Do check out my other blogs on dbt to get started:
https://medium.com/@rasiksuhail/introduction-to-dbt-step-by-step-instructions-for-beginners-a16d343c8826

https://medium.com/@rasiksuhail/exploring-about-macros-ref-in-dbt-11bce7448c92
