dbt Incremental Cookbook
Incremental models are one of the most powerful features of dbt. Let’s review common patterns for implementing incrementals.
Overview
Incremental models are a dbt feature that allows us to manage large tables by adding subsets of data.
dbt Query Pattern
- In dbt, we write templates that generate SQL.
- We use Jinja macros to declaratively generate our SQL queries. These macros output text and are identified by {{ }} or {% %}.
- We write queries to be idempotent and deterministic; we get the same result every time we run our SQL. We can execute dbt build at any time and our model will behave as expected. This may be as part of a scheduled run or a manually triggered ad-hoc build.
Large Tables
- Big source tables can be expensive to query against.
- We can achieve faster throughput in our data pipelines by decreasing the quantity of data processed on each run.
Subsets of Data
- Isolate new or changed data in the upstream table.
- This subset is transformed and then added to our existing table.
The following examples are based on dbt 1.4.3 using Snowflake. Query lifecycle or configurations may differ for other databases; please consult your connector’s documentation for details.
This document seeks to provide a practical guide to working with incremental models. This is not intended to be documentation for dbt or an in-depth discussion of each configuration; there are plenty of great sources for this information (and some are linked below). Our focus here is to provide a conceptual framework for working with incremental models; some technical details may be ignored to better communicate core concepts.
To identify common use cases, I will abstract source data into two categories: Verbs and Nouns. Most systems implicitly embrace this categorization.
- Verbs: Represent an action performed, often implemented as web events. The action is discrete and may be performed by a user or a system. Data is immutable. Often used as facts.
- Nouns: Things that may perform an action, or be the subject of a performed action. Data is mutable. Often used as dimensions; cannot be used as facts.
- Snapshot and Rollup tables are common concepts within data pipelines.
Example: I added a product to my cart.
- Verb: product_added
- Noun: user; the person performing the action.
- Noun: product
- Noun: cart
Incremental Modes
Incremental models run in two different modes. The challenge is writing our model to be idempotent and deterministic for both modes.
Full Refresh (one query generated)
- This mode is used if the destination table does not exist (first time building).
- Behaves like a table, wrapping your SQL with create table as ({{ sql }}).
- It can also be set by adding the --full-refresh or -f flag to our dbt command.
dbt build -s target_model --full-refresh
Incremental (two queries generated)
- The first query runs your SQL to get a new subset of data.
- The second query adds this subset to the destination table.
- Configuration options can be used to achieve different use cases.
Configuration
There are two configurations needed to implement incremental models:
Model Configuration
We can declare our model to be incremental within a config block in our .sql file.
{{ config(materialized = 'incremental') }}
Config blocks can also be defined in dbt_project.yml using push-down semantics. Using the project configuration for this is uncommon; most models begin as a view or table and are refactored to become incremental.
is_incremental() macro
is_incremental() is a flag that identifies the execution mode; false if we are in Full Refresh mode and true in Incremental mode. This macro is used within your model’s .sql definition. The most common use case is to add a filter to the where clause to limit the quantity of data pulled from the upstream models.
{% if is_incremental() %}
and created_at > (select max(created_at) from {{ this }})
{% endif %}
Translated:
- If is_incremental() is true, then include the lines between {% if is_incremental() %} and {% endif %} in our SQL query.
- {{ this }} resolves to the existing table.
A model that is materialized as incremental but has no is_incremental() macro will append the results of our .sql into the destination table on every execution, breaking idempotency.
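To make the two modes concrete, here is a sketch of what a model using the filter above might compile to in each mode. The select list, the upstream relation analytics.staging.verb, and the destination analytics.marts.events are hypothetical names assumed for illustration; the real compiled files in your project will use your own relations.

```sql
-- Full Refresh mode: is_incremental() is false, so the filter is omitted.
select
    created_at,
    payload
from analytics.staging.verb
where true;

-- Incremental mode: is_incremental() is true, so the filter is rendered
-- and the reference to the current model is replaced by the destination table.
select
    created_at,
    payload
from analytics.staging.verb
where true
    and created_at > (select max(created_at) from analytics.marts.events);
```

The "where true" line is what lets the optional "and ..." clause be added or dropped without breaking the query.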
Development Tactics
Verify Generated Queries
dbt is performing multiple queries against the database; our configurations determine how these queries behave. Reviewing generated queries is a great skill to develop when working with dbt and very helpful for debugging incremental models. There are multiple ways to gain insight into dbt’s generated queries:
Query Log: Monitor generated queries using the Snowflake Query History page.
Compiled Queries: Find generated queries in the target/compiled and target/run directories.
- target/compiled holds the query generated for our upstream data. This file will have the is_incremental() clauses included in Incremental mode, but not included in Full Refresh mode.
- target/run holds the quer(ies) used for adding our subset into the destination table. If we are in Full Refresh mode or we did not set materialized = 'incremental', our model’s SQL will be wrapped in a create table as ... statement. We may also find delete, insert, or merge queries as determined by our configuration.
dbt Logs: See all dbt actions in logs/dbt.log. Each execution appends to this file; it’s easiest to delete the file first so that it only contains your current execution.
Configs MUST be spelled correctly; bad config keys will fail silently
dbt will not alert you if your configuration key is incorrect. Misspelling the key will result in the configuration not being applied.
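As a hypothetical illustration of this failure mode, the config block below misspells incremental_strategy. Assuming the silent behavior described above (for the dbt version this guide is based on), the model would still build, but the delete+insert strategy would not be applied. The model body and column names are placeholders.

```sql
-- Hypothetical model. The key "incremental_stategy" below is deliberately
-- misspelled (missing an "r"), so per the behavior described above it is
-- not applied and no error is raised.
{{
    config(
        materialized = 'incremental',
        incremental_stategy = 'delete+insert',
        unique_key = 'date'
    )
}}

select
    date,
    payload
from {{ ref('verb') }}
where true
{% if is_incremental() %}
    and date >= (select max(date) from {{ this }})
{% endif %}
```

Checking the statements in target/run is one way to confirm which strategy was actually used.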
Incremental Models MUST be Rebuilt After Adding Columns
By default, dbt WILL NOT populate new columns without a --full-refresh. The new column will be included in the first query against the source model, but not in the second query updating the destination model.
dbt provides an on_schema_change config.
- append_new_columns: Add any new columns; populate with most recent subset.
- sync_all_columns: Adds new columns, drops removed columns. Populate with most recent subset.
- To populate new columns with historical data, a --full-refresh is required.
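A minimal sketch of the on_schema_change option in a model config. The column new_attribute is a hypothetical addition used only to show where a new column would appear.

```sql
-- Hypothetical model: new_attribute was added after the table was first built.
-- With append_new_columns, the column is added to the destination table and
-- filled only for rows in the incremental subset; older rows stay null
-- until a full refresh is run.
{{
    config(
        materialized = 'incremental',
        on_schema_change = 'append_new_columns'
    )
}}

select
    created_at,
    payload,
    new_attribute
from {{ ref('verb') }}
where true
{% if is_incremental() %}
    and created_at > (select max(created_at) from {{ this }})
{% endif %}
```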
Incremental clauses MUST be applied to subqueries
/** GOOD USE OF IS_INCREMENTAL()
Both subqueries will only scan for new data
*/
with noun_snapshot as (
select
date,
count(*) as total_records
from {{ ref('noun_snapshot') }}
where true
{% if is_incremental() %}
and date >= (select max(date) from {{ this }})
{% endif %}
group by date
),
daily_rollup as (
select
date,
count(*) as total_records
from {{ ref('daily_rollup') }}
where true
{% if is_incremental() %}
and date >= (select max(date) from {{ this }})
{% endif %}
group by date
)
select *
from noun_snapshot
left join daily_rollup
using (date)
/** BAD USE OF IS_INCREMENTAL()
Both subqueries will scan their whole source table
After the whole data set is joined together, we filter down to the new data
*/
with noun_snapshot as (
select
date,
count(*) as ttl_users
from {{ ref('noun_snapshot') }}
group by date
),
daily_rollup as (
select
date,
count(*) as ttl_users
from {{ ref('daily_rollup') }}
group by date
)
select *
from noun_snapshot
left join daily_rollup
using (date)
where true
{% if is_incremental() %}
and date >= (select max(date) from {{ this }})
{% endif %}
Incremental Clause MUST Match any Windowing Logic
The incremental clause limits the data retrieved from the source table. If your model looks back to previous data, then that previous data must be included in the incremental subset.
When developing incremental models, avoid using look back windows and window functions like lead() and lag() if possible.
Here is an example calculating a 30-day rolling sum. Note the date range used in line 13 and the additional incremental clause used in line 22. This ensures we have the correct window for a given date.
with daily_rollup as (
select
date,
sum(number_of_events) as number_of_events,
sum(sum(number_of_events)) over (
order by date
rows between 29 preceding and current row
) as number_of_events_last_30_days
from {{ ref('daily_rollup') }}
where true
-- Our window function needs 30 records
{% if is_incremental() %}
and date >= (select max(date) - 31 from {{ this }})
{% endif %}
group by date
)
select * from daily_rollup
where true
-- Only insert the most recent dates,
-- they are the only records with full windows
{% if is_incremental() %}
and date >= (select max(date) from {{ this }})
{% endif %}
Common Use Cases
Verbs Append Only
Use when: Source data is append-only with a reliable created_at (or equivalent) timestamp. Most effective with Verbs.
Downside: Not suited to Snapshot or Rollup tables with discrete date spines. Inherits uniqueness from source data.
- Missing Data: If your ETL runs on a daily schedule, your model will be built part-way through the day (after midnight). Configured as append-only, the daily run will only have the data present when your query was executed.
Scenario: We set our is_incremental() clause to use >. The model is built at Jan 1 0600. It currently has all data up to the run time. On Jan 2 0600, we have an incremental run. Since max(date) = Jan 1, we only append data for Jan 2 0000 -- Jan 2 0600. We are missing the data for Jan 1 0601 -- Jan 1 2359.
- Duplicate Data: To guard against missing windows of data, we might use >= instead of >. This creates duplicate data.
Scenario: We set our is_incremental() clause to use >=. The model is built at Jan 1 0600. It currently has all data up to the run time. On Jan 2 0600, we have an incremental run. Since max(date) = Jan 1, we pull data for Jan 1 0000 -- Jan 1 2359 and Jan 2 0000 -- Jan 2 0600. We now have duplicated the data for Jan 1 0000 -- Jan 1 0600.
Critical Configs: Using materialized = 'incremental' and no other configurations.
-- append_only.sql
{{ config(materialized = 'incremental') }}
select
created_at,
payload
from {{ ref('verb') }}
where true
{% if is_incremental() %}
and created_at > (select max(created_at) from {{ this }})
{% endif %}
Verbs Append Overlapping Windows
Use when: Add new rows every run; table has a date or timestamp column. Great for events or daily tables. Runs fast for large tables. Most effective when source is Snapshot or Rollup tables with discrete (not continuous; e.g. timestamp) date spines.
Downside: Does not enforce uniqueness.
Critical Configs:
- incremental_strategy='delete+insert': First, create a temporary table to stage data. Next, delete overlapping unique_key values in the destination table. Last, insert staged data into the destination table.
- unique_key = 'date': The column used to identify what is being added to the destination table. This is a poorly named variable; the column’s values do not need to be unique. If the column used IS THE UNIQUE KEY, then the delete query must perform a full table scan of the destination table. This can be VERY COSTLY for large tables. Using date means we act against partitions and can be VERY FAST for large tables. It’s much easier to drop and add partitions.
- >= in the incremental clause: This ensures that we do not have missing data. Because the table is built mid-morning, it does not have all data for the current day. At Jan 1 0900, we only have data for the hours between 0000-0900. Our table has a max(date) = Jan 1. We run the query on Jan 2 0900; using > means we only add data for Jan 2 0000-0900. By using >=, we add data for Jan 1 0000-Jan 2 0900. We are duplicating the work for the previous day, but we are not duplicating the data.
- Optional: cluster_by: This configuration is adapter-specific; the behavior described here is for the Snowflake connector. By default, Snowflake partitions your table automatically. Sometimes, it does a poor job at defining the clustering key; this configuration forces Snowflake to cluster on the provided columns. Use with caution, as this can decrease performance for other access patterns.
-- append_with_overlapping.sql
{{
config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'date',
cluster_by = ['date']
)
}}
select
date_trunc('day', created_at)::date as date,
payload
from {{ ref('verb') }}
where true
-- Using >= ensures we get the same result on every run
-- Note that the column name is date in the destination table
{% if is_incremental() %}
and created_at >= (select max(date) from {{ this }})
{% endif %}
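The three delete+insert steps described above can be sketched as plain SQL. This is an approximation for illustration, not the exact statements dbt emits: the relation names (analytics.staging.verb, analytics.marts.append_with_overlapping and its __dbt_tmp staging table) are assumptions, and the generated SQL can differ by dbt version and adapter. Check target/run for the real statements.

```sql
-- 1. Stage the new subset (the compiled model SQL, incremental clause included).
create or replace temporary table analytics.marts.append_with_overlapping__dbt_tmp as (
    select
        date_trunc('day', created_at)::date as date,
        payload
    from analytics.staging.verb
    where created_at >= (select max(date) from analytics.marts.append_with_overlapping)
);

-- 2. Delete every destination row whose date appears in the staged subset.
--    Whole days are removed, which is why the values of "date" need not be unique.
delete from analytics.marts.append_with_overlapping
where date in (
    select date
    from analytics.marts.append_with_overlapping__dbt_tmp
);

-- 3. Insert the staged rows, replacing the days that were just deleted.
insert into analytics.marts.append_with_overlapping (date, payload)
select
    date,
    payload
from analytics.marts.append_with_overlapping__dbt_tmp;
```

Because step 2 removes the previous day before step 3 re-inserts it, re-processing the overlap repeats work without duplicating rows.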
Stateful Nouns
Use When: An object has attributes that change over time; table should show the most recent values for the attributes based on a reliable updated_at or modified_at timestamp. Uses a strictly enforced unique key. In its simplest form, this is a type 1 SCD.
Downside: Very slow for large tables. Incoming data must have a unique key. Modified date must be reliable.
Critical Configs:
- Optional: incremental_strategy = 'merge': Use a MERGE query to add new data to the destination table. If you configure a unique_key, then merge will be used by default. Merge provides “upsert” (update + insert) functionality in Snowflake; if the object_id does not exist, create a new row. If it does exist, update the row.
- unique_key = 'object_id': The primary key for the table.
- Incremental clause must refer to updated_at or an equivalent value.
-- stateful_noun.sql
{{
config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'object_id'
)
}}
select
object_id,
modified_at,
attribute
from {{ ref('noun') }}
where true
{% if is_incremental() %}
and modified_at >= (select max(modified_at) from {{ this }})
{% endif %}
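The upsert behavior described above can be sketched as a MERGE statement. This is an approximation of what the merge strategy produces, not dbt's exact output: the relation names are hypothetical, the DBT_INTERNAL_SOURCE alias is assumed by analogy with the DBT_INTERNAL_DEST alias used elsewhere in this guide, and the real statement may update every column.

```sql
-- Assumes the incremental subset has already been staged in a temporary
-- table named analytics.marts.stateful_noun__dbt_tmp (hypothetical name).
merge into analytics.marts.stateful_noun as DBT_INTERNAL_DEST
using analytics.marts.stateful_noun__dbt_tmp as DBT_INTERNAL_SOURCE
    on DBT_INTERNAL_SOURCE.object_id = DBT_INTERNAL_DEST.object_id
-- object_id already exists: overwrite with the latest values
when matched then update set
    modified_at = DBT_INTERNAL_SOURCE.modified_at,
    attribute = DBT_INTERNAL_SOURCE.attribute
-- object_id is new: create the row
when not matched then insert (object_id, modified_at, attribute)
values (
    DBT_INTERNAL_SOURCE.object_id,
    DBT_INTERNAL_SOURCE.modified_at,
    DBT_INTERNAL_SOURCE.attribute
);
```

The join on object_id has to be evaluated against the whole destination table, which is where the cost on large tables comes from.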
Semi-Unique Verbs
Similar to Stateful Noun, but we only update a subset of the destination table.
Use When: Verb where upstream uniqueness is not guaranteed.
Downside: Incoming subset must be de-duplicated. Resulting data set not guaranteed to be unique (see below).
Critical Configs:
- Optional: incremental_strategy = 'merge': See above.
- unique_key = 'message_id': Used in the merge query.
- incremental_predicates = [ "DBT_INTERNAL_DEST.created_at >= current_date - 3"]: The subset of data that we MERGE into. Without this configuration, we compare the message_ids in the staged data against the entire destination table. This keeps the query performant, but also means that we can duplicate message_ids.
  - message_id=1 is received on Jan 1 and added to destination table.
  - message_id=1 is received again on Jan 2, the existing record in the destination table is updated.
  - message_id=1 is received again on Jan 5, the existing record is not included in our incremental_predicates subset and we create a new record in the destination table. We now have 2 records for our message; Jan 2 and Jan 5.
-- events_with_semi_unique_id.sql
{{
config(
materialized = 'incremental',
unique_key = 'message_id',
incremental_predicates = [ "DBT_INTERNAL_DEST.created_at >= current_date - 3"]
)
}}
select
message_id,
created_at,
payload
from {{ ref('verb') }}
where true
{% if is_incremental() %}
and created_at >= (select max(created_at) from {{ this }})
{% endif %}
-- Explicitly dedup message_id to avoid conflicts during MERGE
qualify row_number() over(
partition by message_id order by created_at asc
) = 1
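A sketch of how the incremental predicate might appear in the generated MERGE, to show why a message_id can be duplicated. The relation names and the DBT_INTERNAL_SOURCE alias are assumptions, and the exact SQL dbt emits may differ; the point is only that the predicate becomes part of the match condition.

```sql
-- Assumes the de-duplicated subset is staged in a temporary table named
-- analytics.marts.events_with_semi_unique_id__dbt_tmp (hypothetical name).
merge into analytics.marts.events_with_semi_unique_id as DBT_INTERNAL_DEST
using analytics.marts.events_with_semi_unique_id__dbt_tmp as DBT_INTERNAL_SOURCE
    -- Only destination rows from the last 3 days are candidates for a match
    on DBT_INTERNAL_DEST.created_at >= current_date - 3
    and DBT_INTERNAL_SOURCE.message_id = DBT_INTERNAL_DEST.message_id
when matched then update set
    created_at = DBT_INTERNAL_SOURCE.created_at,
    payload = DBT_INTERNAL_SOURCE.payload
-- A message_id whose existing row is older than 3 days fails the first
-- condition, counts as "not matched", and is inserted as a second row
when not matched then insert (message_id, created_at, payload)
values (
    DBT_INTERNAL_SOURCE.message_id,
    DBT_INTERNAL_SOURCE.created_at,
    DBT_INTERNAL_SOURCE.payload
);
```

In the Jan 5 scenario above, the Jan 2 row falls outside the 3-day predicate, so the incoming record takes the "not matched" branch.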
First Value
Use When: For a given noun, we want to persist an attribute’s first value to prevent full table scans for downstream queries. Source data is a verb or snapshot with a date spine. Note: If the underlying data is a mutable noun, it is better to first create a snapshot table with a first value table downstream. This ensures the model is idempotent.
Downside: Narrow use case. Subset data must be unique. Source data with a continuous timestamp (created_at) or date spine preferred.
Critical Configs:
- Everything from Stateful Noun (see above)
- merge_update_columns = ['object_id']: If the record is in the destination table, only update the configured columns. All other columns are set when the record is created.
-- first_value.sql
{{
config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'object_id',
merge_update_columns = ['object_id']
)
}}
select
object_id,
created_at,
attribute
from {{ ref('noun') }}
where true
{% if is_incremental() %}
and created_at >= (select max(created_at) from {{ this }})
{% endif %}
Incremental Rollups
Use When: Rollup tables with accumulating measures where the source data is too big to re-aggregate to the destination grain on every run.
Downside: Narrow use case. Hard to debug and implement.
Critical Configs:
- Everything from Stateful Noun (see above)
- Conditional self-join for incremental mode
-- incremental_rollup.sql
{{
    config(
        materialized = 'incremental',
        unique_key = 'date'
    )
}}
with
verb as (select * from {{ ref('verb') }}),
new as (
    select
        date_trunc('day', created_at)::date as date,
        count(distinct message_id) as event_count,
        max(created_at) as last_created_at_time
    from verb
    where true
    -- In incremental mode, only aggregate events newer than the last run
    {% if is_incremental() %}
        and created_at > (select max(last_created_at_time) from {{ this }})
    {% endif %}
    group by 1
)
-- If we are in incremental mode, join into existing table to increment totals
{% if is_incremental() %}
, reaggregate as (
    select
        new.date,
        sum(coalesce(existing.event_count, 0) + new.event_count) as event_count,
        max(new.last_created_at_time) as last_created_at_time
    from new
    left join {{ this }} as existing
        on new.date = existing.date
    group by new.date
)
select * from reaggregate
{% else %}
-- Just use the results from new in full refresh mode
select * from new
{% endif %}
Additional Links
dbt Incremental Models In-Depth
Bruno Souza de Lima series on Medium