DBT on Lakehouse Design Patterns Part 1: Optimized ELT

Databricks SQL SME
DBSQL SME Engineering
Nov 27, 2023
Author Screenshot: DBT and Databricks Optimization Practices

Author: Cody Austin Davis

Intro:

DBT has exploded in popularity in the last few years as a highly extensible, simple, and scalable tool that lets analytics engineers and data warehousing teams manage complex transformations on their data warehouses with a simple open source library. With this trend has come an explosion of DBT models being created and managed. If those models are not built carefully, the ease of creating them brings an even larger explosion of runaway, hard-to-diagnose data processing costs. This cost is driven primarily by the data warehousing layer: DBT’s higher-level SQL abstraction pulls the engineer further away from the core of the warehouse engine, making it easy to stop thinking about HOW the queries are executed and HOW DBT is running things. For some cases (mainly small projects and simple pipelines), simplicity is the only variable that matters; for production-scale enterprise use cases, however, it is crucial to balance DBT’s simple abstraction with an understanding of how to use the underlying warehouse effectively. The Lakehouse Architecture allows for complete unification of all data workloads while also providing a more flexible and cost-effective data transformation platform.

This article focuses on how to set up your DBT models for success on the Lakehouse from the start, so you can avoid bloated costs and slow performance as you add more use cases to your DBT projects, and take advantage of both the unification AND the cost efficiency of the Lakehouse Architecture. We will follow a use case that any Databricks user can reproduce on their own Lakehouse (simply import the model in the Lakehouse Tackle-box here). In this article, we will cover the following strategies to run an optimized DBT project on Databricks at scale:

Optimized DBT Project Methods

  1. Materializing Tables as DDL statements first if needed (such as Identity columns)
  2. Liquid Clustering our Delta Tables
  3. Using the COPY INTO DBT macro efficiently
  4. Running table optimizations as part of your DBT model: OPTIMIZE, ZORDER, TBLPROPERTIES, ANALYZE TABLE.
  5. Setting global properties for all tables to make scaling these practices simple
  6. Running incremental models effectively in batch pipelines with watermarking and liquid clustering
  7. Using incremental predicates for efficient incremental models

Prerequisites: This article assumes you already have a DBT project set up with a schema location, and that you are able to connect to a DBSQL Warehouse.

Let’s start with our project, which can be found in the GitHub repo on our Lakehouse Tacklebox here.

We will create a Medallion Architecture as shown below with Bronze (Staging / Raw Layers), Silver (Clean Data Model Layer), and Gold (Aggregate Layers) stages. We will be re-creating the IoT Data Pipeline that we built in our SCD2 blog here, but this time we will convert it into an efficient DBT project using the best practices listed in the intro. In this article, we will cover efficiently building the SCD Type 1 version first, and we will continue with the SCD Type 2 version in the following article. The high-level workflow looks like this:

DBT Models/Macros in Red with associated Delta Table Results

In the above architecture, our DBT project consists of essentially 3 models:

  1. Bronze to Silver (silver_sensors_scd_1 model): This contains pre-hooks for loading data with COPY INTO, the MERGE logic, and post-hook statements to optimize our completed data model. This is where the majority of the data processing logic lives.
  2. Gold Hourly Aggregates (gold_hourly_summary_stats_7_day_rolling model): This model creates rolled-up hourly aggregates of the larger silver table. It is a “rolling” 7 day materialized table that is re-created on each run.
  3. Gold Smoothed Aggregates (gold_smoothed_sensors_3_day_rolling model): This is the final view that would be shown in reports, and it keeps the reporting system simple and effective. It also has the most expensive logic, so filtering the data early in the pipeline is key to making this table work well. It can be implemented as a VIEW or as a TABLE that is refreshed each time, depending on business requirements.

This seems simple, but if our IoT data is hundreds of millions, billions or even trillions of datapoints, then this becomes a critical optimization problem. Let’s break down each step by layer to implement this pipeline effectively.

Bronze Layer

Our first step is to set up an efficient and scalable bronze layer that incrementally ingests new data from a raw Data Lake source. The COPY INTO command on DBSQL Serverless is a highly effective way of incrementally ingesting data from raw data lake sources. Databricks has a native macro that makes it easy to run COPY INTO statements, so we will use it and show a few customizations that add extra control and performance gains.

We will use the bronze layer as the place where new IDENTITY keys are generated, using the GENERATED BY DEFAULT AS IDENTITY capability on Delta tables. We also want to create an “ingest_timestamp” column that we cluster on with Liquid Clustering, so that downstream processes can read new data without scanning all historical data loads. To use COPY INTO with these capabilities, we write a macro that first creates the table with a DDL statement. We will then reference this macro as a “pre-hook” in our silver layer. There are many ways to create empty tables in DBT, such as writing a “shell” SQL SELECT statement to force DBT to create a table, and this works ok (it feels a bit hacky coming from classical data warehousing :)), but in our case we will simply use a macro. The macro looks like the following:

Step 1 — DDL Macro for COPY INTO with IDENTITY key generation

First, we see that we parameterized our target catalog and location with our project. Next we do 2 things here:

  1. Create an Id column with GENERATED BY DEFAULT AS IDENTITY. This lets us easily generate surrogate keys in the staging layer for downstream merge statements.
  2. Enable Liquid Clustering by ingest_timestamp. This is crucial for running the project incrementally and efficiently over time, because it ensures that downstream incremental loads do not scan old data they do not need. Skipping this step is a very common and expensive oversight.
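As a rough illustration of those two steps, a DDL macro along these lines could work. This is a minimal sketch, not the author's exact macro: the column names and types are assumptions inferred from the expression_list passed to databricks_copy_into later in this article, and any LOCATION clause is left out.

```sql
{% macro create_bronze_sensors_identity_table() %}

-- Sketch only: column list is inferred from the COPY INTO expression_list (assumption)
CREATE TABLE IF NOT EXISTS {{ target.catalog }}.{{ target.schema }}.bronze_sensors
(
    Id BIGINT GENERATED BY DEFAULT AS IDENTITY,  -- surrogate key, generated when no value is supplied
    device_id INT,
    user_id INT,
    calories_burnt DECIMAL(10,2),
    miles_walked DECIMAL(10,2),
    num_steps DECIMAL(10,2),
    timestamp TIMESTAMP,
    value STRING,
    ingest_timestamp TIMESTAMP                   -- populated by now() during COPY INTO
)
USING DELTA
CLUSTER BY (ingest_timestamp)                    -- liquid clustering on the incremental watermark column

{% endmacro %}
```

Because the statement uses IF NOT EXISTS, calling the macro from a pre-hook on every run is safe: it only creates the table the first time.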

Next, we will use the COPY INTO macro that ships with the dbt-databricks package. We now move into the “pre-hook” section of our silver_sensors model, which has the following steps:

  1. Run our macro create_bronze_sensors_identity_table()
  2. Run databricks_copy_into with a custom SELECT expression that adds an ‘ingest_timestamp’ column using the expression “now() AS ingest_timestamp”. For our example, and only for demonstration purposes, we add the ‘force’ = ‘true’ copy option. With force enabled, COPY INTO reloads every file in the directory it is pointed to on each run, whether or not the file was loaded before. In production, this would be false so that data is loaded incrementally. Either way, the pipeline should still be idempotent because the MERGE handles updates downstream.
  3. Run OPTIMIZE bronze_sensors. This will ensure liquid clustering is complete and files are compacted for efficient reading downstream.
  4. Run ANALYZE TABLE bronze_sensors. This is optional and more important when doing downstream joins on tables. In the case of ingestion, it is not as crucial as long as you CLUSTER BY your ingest_timestamp.

The final result to complete our “Raw to Bronze” layer looks like the following:

Pre Hook — Full Raw to Bronze Layer with COPY INTO using liquid clustering

Silver Layer

We have set up our Bronze table by creating our DDL statement with a custom macro and running the databricks_copy_into macro with our custom sql expression creating an ingest_timestamp. Now let’s look at the rest of the silver sensors model.

The full model config for our silver layer looks like the following:

{{ 
config(
materialized='incremental',
unique_key='Id',
incremental_strategy='merge',
tblproperties={'delta.tuneFileSizesForRewrites': 'true', 'delta.feature.allowColumnDefaults': 'supported', 'delta.columnMapping.mode' : 'name'},
liquid_clustered_by = 'timestamp, id, device_id',
incremental_predicates= ["DBT_INTERNAL_DEST.timestamp > dateadd(day, -7, now())"],
pre_hook=["{{ create_bronze_sensors_identity_table() }}",

"{{ databricks_copy_into(target_table='bronze_sensors',
source='/databricks-datasets/iot-stream/data-device/',
file_format='json',
expression_list = 'id::bigint AS Id, device_id::integer AS device_id, user_id::integer AS user_id, calories_burnt::decimal(10,2) AS calories_burnt, miles_walked::decimal(10,2) AS miles_walked, num_steps::decimal(10,2) AS num_steps, timestamp::timestamp AS timestamp, value AS value, now() AS ingest_timestamp',
copy_options={'force': 'true'}
) }}",

"OPTIMIZE {{target.catalog}}.{{target.schema}}.bronze_sensors",

"ANALYZE TABLE {{target.catalog}}.{{target.schema}}.bronze_sensors COMPUTE STATISTICS FOR ALL COLUMNS"
],
post_hook=[
"OPTIMIZE {{ this }}",
"ANALYZE TABLE {{ this }} COMPUTE STATISTICS FOR ALL COLUMNS;"
]
)
}}

And the actual Bronze → Silver SQL logic for SCD Type 1 looks like the following:



WITH de_dup AS (
    SELECT
        Id::bigint,                      -- matches the bigint Id loaded into bronze
        device_id::integer,
        user_id::integer,
        calories_burnt::decimal(10,2),   -- a bare ::decimal defaults to decimal(10,0) and drops the fraction
        miles_walked::decimal(10,2),
        num_steps::decimal(10,2),
        timestamp::timestamp,
        value::string,
        ingest_timestamp,
        ROW_NUMBER() OVER (PARTITION BY device_id, user_id, timestamp ORDER BY ingest_timestamp DESC, timestamp DESC) AS DupRank
    FROM {{ target.catalog }}.{{ target.schema }}.bronze_sensors
    -- Incremental watermark: only read bronze rows newer than what silver already holds
    {% if is_incremental() %}
    WHERE ingest_timestamp > (SELECT MAX(ingest_timestamp) FROM {{ this }})
    {% endif %}
)

SELECT Id, device_id, user_id, calories_burnt, miles_walked, num_steps, timestamp, value, ingest_timestamp
-- Optional: hash a composite natural key and use it as the unique_key
/*
, sha2(CONCAT(COALESCE(Id::string, ''), '|', COALESCE(device_id::string, '')), 256) AS composite_key
*/
FROM de_dup
WHERE DupRank = 1

In our config, notice that we set materialized = ‘incremental’, incremental_strategy = ‘merge’, and unique_key = ‘Id’. We also implemented liquid clustering by setting liquid_clustered_by = ‘timestamp, id, device_id’, which clusters the target silver table so that the downstream Gold aggregates can read it efficiently. The unique_key param should be the concatenated and hashed version of all columns needed for the ON clause of a DBSQL MERGE command, as this is what DBT builds behind the scenes. In our case, we keep it simple and use the Id column. For real-world cases, we also include a commented-out example of how to create a composite natural key for the MERGE command.

In addition, we can add extra predicates to the incremental model (the MERGE command) with the incremental_predicates config. This is huge for large tables with frequent merges, which is the case with large IoT datasets. In our example above, we add a predicate that only allows updates on the large sensors table (which could hold billions or trillions of rows) when the target rows fall within the last 7 days. This can save huge amounts of time and money, so it is a crucial parameter.
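To make the effect of incremental_predicates concrete, the statement DBT issues for this model has roughly the following shape. This is a hedged sketch: the exact SQL varies by dbt-databricks version, and the catalog, schema, and temporary view names used here (main.iot and silver_sensors_scd_1__dbt_tmp) are hypothetical placeholders.

```sql
-- Approximate shape of the generated MERGE (illustrative only, names are placeholders)
MERGE INTO main.iot.silver_sensors_scd_1 AS DBT_INTERNAL_DEST
USING silver_sensors_scd_1__dbt_tmp AS DBT_INTERNAL_SOURCE
ON  DBT_INTERNAL_DEST.timestamp > dateadd(day, -7, now())   -- from incremental_predicates
AND DBT_INTERNAL_SOURCE.Id = DBT_INTERNAL_DEST.Id           -- from unique_key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```

Since the target table is clustered on timestamp, the extra predicate lets the engine skip files older than 7 days when looking for matches. One consequence to keep in mind: an incoming row whose Id only matches a target row older than 7 days is treated as not matched and gets inserted, so the window should be at least as wide as the latest update you expect to receive.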

Lastly, we create the pre-hook commands and then implement 2 post-hook commands for OPTIMIZE and ANALYZE on the silver table once it is built. This will ensure the table stays healthy and optimal for performant reads downstream using Liquid Clustering. Most operations will cluster synchronously on write, but this post-hook ensures clustering for any operations at any scale (you do not always want to cluster synchronously if you have huge updates and do not have time to wait).

For our silver layer we also added a tblproperties config. This is optional but can be very important to utilize when doing data warehousing on Delta tables. In this model, we added a parameter to our config to tune file sizes to show an example, but this is actually already automated in DBSQL. Next in the project, we will show how to apply tblproperties and other general configs to all models by default. This is key for creating unified Delta tables with a unified and clean config strategy. For data warehousing, the following tblproperties are the most common for performance of incremental models that are highly flexible:

  1. ‘delta.feature.allowColumnDefaults’: ‘supported’ — This property allows us to use generated and default columns on our delta tables.
  2. ‘delta.columnMapping.mode’ : ‘name’ — This property allows users to alter, rename, and reorder columns in existing Delta tables.
  3. ‘delta.enableDeletionVectors’: ‘true’ — This property allows for up to 10x merge performance improvement by writing deletion vectors instead of re-writing entire files under the hood.

In newer Databricks runtimes (and in DBSQL), these are usually enabled by default.

Lastly, in the SQL logic itself, you will notice the is_incremental section that performs the watermarking needed to read from the bronze table incrementally. Since we clustered our bronze table on ingest_timestamp, this incremental upsert pipeline does not need to scan the entire history of updates each time it loads new data. This is very often overlooked and can waste a TON of money if not implemented well, no matter which system you build DBT models on. You always want your pipelines to scan only the minimal amount of data they need for the logic. To learn how to check for this in DBSQL, check out the “Query Profile” blog here. All these steps combined make up the final code block:

{{ 
config(
materialized='incremental',
unique_key='Id',
incremental_strategy='merge',
tblproperties={'delta.tuneFileSizesForRewrites': 'true', 'delta.feature.allowColumnDefaults': 'supported', 'delta.columnMapping.mode' : 'name'},
liquid_clustered_by = 'timestamp, id, device_id',
incremental_predicates= ["DBT_INTERNAL_DEST.timestamp > dateadd(day, -7, now())"],
pre_hook=["{{ create_bronze_sensors_identity_table() }}",

"{{ databricks_copy_into(target_table='bronze_sensors',
source='/databricks-datasets/iot-stream/data-device/',
file_format='json',
expression_list = 'id::bigint AS Id, device_id::integer AS device_id, user_id::integer AS user_id, calories_burnt::decimal(10,2) AS calories_burnt, miles_walked::decimal(10,2) AS miles_walked, num_steps::decimal(10,2) AS num_steps, timestamp::timestamp AS timestamp, value AS value, now() AS ingest_timestamp',
copy_options={'force': 'true'}
) }}",

"OPTIMIZE {{target.catalog}}.{{target.schema}}.bronze_sensors",

"ANALYZE TABLE {{target.catalog}}.{{target.schema}}.bronze_sensors COMPUTE STATISTICS FOR ALL COLUMNS"
],
post_hook=[
"OPTIMIZE {{ this }}",
"ANALYZE TABLE {{ this }} COMPUTE STATISTICS FOR ALL COLUMNS;"
]
)
}}


WITH de_dup AS (
    SELECT
        Id::bigint,                      -- matches the bigint Id loaded into bronze
        device_id::integer,
        user_id::integer,
        calories_burnt::decimal(10,2),   -- a bare ::decimal defaults to decimal(10,0) and drops the fraction
        miles_walked::decimal(10,2),
        num_steps::decimal(10,2),
        timestamp::timestamp,
        value::string,
        ingest_timestamp,
        ROW_NUMBER() OVER (PARTITION BY device_id, user_id, timestamp ORDER BY ingest_timestamp DESC, timestamp DESC) AS DupRank
    FROM {{ target.catalog }}.{{ target.schema }}.bronze_sensors
    -- Incremental watermark: only read bronze rows newer than what silver already holds
    {% if is_incremental() %}
    WHERE ingest_timestamp > (SELECT MAX(ingest_timestamp) FROM {{ this }})
    {% endif %}
)

SELECT Id, device_id, user_id, calories_burnt, miles_walked, num_steps, timestamp, value, ingest_timestamp
-- Optional: hash a composite natural key and use it as the unique_key
/*
, sha2(CONCAT(COALESCE(Id::string, ''), '|', COALESCE(device_id::string, '')), 256) AS composite_key
*/
FROM de_dup
WHERE DupRank = 1
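One edge case in the watermark filter is worth a hedged note: if the silver table exists but is empty, MAX(ingest_timestamp) returns NULL and the comparison filters out every row. A defensive variation, sketched here under the assumption that a fixed fallback timestamp older than any real data is acceptable, wraps the watermark in COALESCE:

```sql
SELECT *
FROM {{ target.catalog }}.{{ target.schema }}.bronze_sensors
{% if is_incremental() %}
-- Fall back to an early timestamp when the target has no rows yet (the fallback value is an assumption)
WHERE ingest_timestamp > (
    SELECT COALESCE(MAX(ingest_timestamp), TIMESTAMP '1900-01-01 00:00:00')
    FROM {{ this }}
)
{% endif %}
```

The filter still compares directly against the clustered ingest_timestamp column, so file skipping on the bronze table should behave the same way as in the original model.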

Now that we have an efficient Bronze and Silver layer, let’s create efficient gold tables for our application / BI layer to use with ease.

Gold Layer

Now that we have a great incremental pipeline going, we can now build our application facing views/tables without worry. In our example we are going to define 2 aggregate tables that attempt to show a smoother view of rough IoT data. First we will create hourly smoothed aggregations by device_id to reduce the noise for BI and alerting applications when analyzing trends. Next, we will create smoothed moving average aggregates that make visualizing trends very simple for downstream applications. The resulting data would look like this in a chart:

Simple Performant Visual on the final Gold Tables

To build these tables, we implement the following models:

{{ 
config(
materialized='table',
liquid_clustered_by='device_id, HourBucket'
)
}}

-- Get hourly aggregates for last 7 days
SELECT device_id,
date_trunc('hour', timestamp) AS HourBucket,
AVG(num_steps)::float AS AvgNumStepsAcrossDevices,
AVG(calories_burnt)::float AS AvgCaloriesBurnedAcrossDevices,
AVG(miles_walked)::float AS AvgMilesWalkedAcrossDevices
FROM {{ ref('silver_sensors_scd_1') }}
WHERE timestamp >= ((SELECT MAX(timestamp) FROM {{ ref('silver_sensors_scd_1') }}) - INTERVAL '7 DAYS')
GROUP BY device_id, date_trunc('hour', timestamp)
ORDER BY HourBucket

Notice 2 key things about this model to make this performant:

  1. Since IoT data can get VERY large, we create a fresh rolling 7 day view to ensure the BI layer only needs to refresh and calculate these complex aggregates for the last 7 days of data. This is a business choice and will depend on what the analytics needs are, but in general it will be important to CLUSTER or ZORDER your silver tables by the same event_timestamp or other columns that will serve as common filters for your gold level tables.
  2. Next, we cluster this aggregate table on device_id and HourBucket. If we have millions or billions of devices, then these 2 are key to produce efficient rolling aggregate smoothed views of data.

Lastly, we can create our final rolling smoothed aggregate view that produces our pretty visual like so:

{{ 
config(
materialized='table',
liquid_clustered_by='device_id, HourBucket'
)
}}

SELECT
device_id, HourBucket,
-- Number of Steps
(avg(`AvgNumStepsAcrossDevices`) OVER (
ORDER BY `HourBucket`
ROWS BETWEEN
4 PRECEDING AND
CURRENT ROW
)) ::float AS SmoothedNumSteps4HourMA, -- 4 hour moving average

(avg(`AvgNumStepsAcrossDevices`) OVER (
ORDER BY `HourBucket`
ROWS BETWEEN
24 PRECEDING AND
CURRENT ROW
))::float AS SmoothedNumSteps24HourMA --24 hour moving average
,
-- Calories Burned
(avg(`AvgCaloriesBurnedAcrossDevices`) OVER (
ORDER BY `HourBucket`
ROWS BETWEEN
4 PRECEDING AND
CURRENT ROW
))::float AS SmoothedCalsBurned4HourMA, -- 4 hour moving average

(avg(`AvgCaloriesBurnedAcrossDevices`) OVER (
ORDER BY `HourBucket`
ROWS BETWEEN
24 PRECEDING AND
CURRENT ROW
))::float AS SmoothedCalsBurned24HourMA --24 hour moving average,
,
-- Miles Walked
(avg(`AvgMilesWalkedAcrossDevices`) OVER (
ORDER BY `HourBucket`
ROWS BETWEEN
4 PRECEDING AND
CURRENT ROW
))::float AS SmoothedMilesWalked4HourMA, -- 4 hour moving average

(avg(`AvgMilesWalkedAcrossDevices`) OVER (
ORDER BY `HourBucket`
ROWS BETWEEN
24 PRECEDING AND
CURRENT ROW
))::float AS SmoothedMilesWalked24HourMA --24 hour moving average
FROM {{ ref('gold_hourly_summary_stats_7_day_rolling') }}
WHERE HourBucket >= ((SELECT MAX(HourBucket) FROM {{ ref('gold_hourly_summary_stats_7_day_rolling') }}) - INTERVAL '3 DAYS')

Note that we do the same thing here: always filter the views on our clustered columns for our rolling aggregates. In this example, we pick an even smaller window because the aggregation logic is much more complex and expensive to calculate. These filters should correspond to real business use cases.
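Two details of the window frames above are worth checking against your own requirements. The windows order by HourBucket without a PARTITION BY, so each average spans rows from all devices, and a frame of 4 PRECEDING AND CURRENT ROW covers 5 rows rather than 4. If the intent is a per-device moving average over exactly 4 hourly rows, a variation could look like the following sketch (shown for one metric only, and assuming each device has one row per hour with no gaps):

```sql
SELECT
    device_id,
    HourBucket,
    (AVG(AvgNumStepsAcrossDevices) OVER (
        PARTITION BY device_id                    -- keep each device's trend separate
        ORDER BY HourBucket
        ROWS BETWEEN 3 PRECEDING AND CURRENT ROW  -- 4 hourly rows including the current one
    ))::float AS SmoothedNumSteps4HourMA
FROM {{ ref('gold_hourly_summary_stats_7_day_rolling') }}
WHERE HourBucket >= (
    (SELECT MAX(HourBucket) FROM {{ ref('gold_hourly_summary_stats_7_day_rolling') }}) - INTERVAL '3 DAYS'
)
```

Partitioning by device_id also lines up with the liquid clustering columns of the hourly table. Whether the cross-device or the per-device average is the right one depends on what the report is meant to show.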

Finally, notice that in our gold table, we did not include any tblproperties in our config section of our models, but we did for our silver table. TBLPROPERTIES are crucial to understand and utilize in Delta tables, so to support these properties at scale, we can actually create a model or even project level config that will automatically propagate to all our relevant models. This allows us to manage and customize delta tblproperties all via 1 project-level config instead of needing to remember to add this for each model. This pattern can drive standardization and scale for healthy Delta tables long term. For example, if we want to implement the 3 popular table properties for data warehousing for all of our models, we can simply add a +tblproperties line to your dbt_project.yml file like so:

models:
  optimized_dbt:
    +materialized: table
    +tblproperties: {'delta.feature.allowColumnDefaults': 'supported', 'delta.columnMapping.mode': 'name', 'delta.enableDeletionVectors': 'true'}

Now we have a complete, end-to-end, efficient pipeline in DBT that uses some of the tips and tricks Delta and Databricks have to offer to control your models for cost and performance at scale!

Download and customize the repo to test it out!

If there is something you would like to read about from the DBSQL SME team, reach out or comment with questions or asks!

Next, we will continue this use case but instead use DBT Snapshot to process data incrementally using the SCD2 data processing style.
