dbt Incremental Cookbook

Aaron Bannin
12 min read · Jul 1, 2023


Incremental models are one of the most powerful features of dbt. Let’s review common patterns for implementing incrementals.

Overview

Incremental models are a dbt feature that allows us to manage large tables by adding subsets of data.

dbt Query Pattern

  • In dbt, we write templates that generate SQL.
  • We use Jinja macros to declaratively generate our SQL queries. These macros output text and are identified by {{ }} or {% %}; a minimal example follows this list.
  • We write queries to be idempotent and deterministic; we get the same result every time we run our SQL. We can execute dbt build at any time and our model will behave as expected. This may be as part of a scheduled run or a manually triggered ad-hoc build.
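
For example, here is a trivial model template next to (roughly) the SQL it compiles to. The model and schema names are illustrative and depend on your project and profile configuration.

-- my_model.sql (template; names are illustrative)
select
id,
created_at
from {{ ref('upstream_model') }}

-- Roughly what dbt compiles: ref() resolves to a concrete relation
select
id,
created_at
from analytics.prod.upstream_model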

Large Tables

  • Big source tables can be expensive to query against.
  • We can achieve faster throughput in our data pipelines by decreasing the quantity of data processed on each run.

Subsets of Data

  • Isolate new or changed data in the upstream table.
  • This subset is transformed and then added to our existing table.

The following examples are based on dbt 1.4.3 using Snowflake. Query lifecycle or configurations may differ for other databases; please consult your connector’s documentation for details.

This document seeks to provide a practical guide to working with incremental models. It is not intended to be documentation for dbt or an in-depth discussion of each configuration; there are plenty of great sources for this information (and some are linked below). Our focus here is to provide a conceptual framework for working with incremental models; some technical details may be ignored to better communicate core concepts.

To identify common use cases, I will abstract source data into two categories: Verbs and Nouns. Most systems implicitly embrace this categorization.

  • Verbs: Representing an action performed, often implemented as web events. The action is discrete and may be performed by a user or a system. Data is immutable. Often used as facts.
  • Nouns: Things that may perform an action, or be the subject of a performed action. Data is mutable. Often used as dimensions; cannot be used as a fact.
  • Snapshot and Rollup tables are common concepts within data pipelines.

Example: I added a product to my cart.

  • Verb: product_added
  • Noun: user; the person performing the action.
  • Noun: product
  • Noun: cart

Incremental Modes

Incremental models run in two different modes. The challenge is writing our model to be idempotent and deterministic for both modes.

Full Refresh (one query generated)

  • This mode is used if the destination table does not exist (first time building)
  • Behaves like a table, wrapping your SQL with create table as ({{ sql }}); a sketch of the generated statement follows the command below.
  • It can also be set by adding the --full-refresh or -f flag to our dbt command.
dbt build -s target_model --full-refresh
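
As a rough sketch (relation and column names here are illustrative, and the exact DDL varies by adapter and dbt version), the Full Refresh run boils down to a single statement along these lines:

-- Roughly what dbt runs in Full Refresh mode (simplified)
create or replace table analytics.target_model as (
select
created_at,
payload
from analytics.verb
);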

Incremental (two queries generated)

  • The first query runs your SQL to get a new subset of data.
  • The second query adds this subset to the destination table.
  • Configuration options can be used to achieve different use cases; a rough sketch of the generated queries follows.
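
As a rough sketch of the default append behavior (relation names are illustrative, and the exact statements depend on your adapter and incremental_strategy), the two queries look something like this:

-- 1. Stage the new subset (dbt may use a temporary table or view)
create temporary table target_model__dbt_tmp as (
select
created_at,
payload
from analytics.verb
where created_at > (select max(created_at) from analytics.target_model)
);

-- 2. Add the staged subset to the destination table
insert into analytics.target_model (created_at, payload)
select created_at, payload
from target_model__dbt_tmp;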

Configuration

There are two configurations needed to implement incremental models:

Model Configuration

We can declare our model to be incremental within a config block in our .sql file.

{{ config(materialized = 'incremental') }}

Config blocks can also be defined in dbt_project.yml using push-down semantics. Using the project configuration is uncommon; most models begin as a view or table and are refactored to become incremental.

is_incremental() macro

is_incremental() is a flag that identifies the execution mode; false if we are in Full Refresh mode and true in Incremental Mode. This macro is used within your model’s .sql definition. The most common use case is to add a filter to the where clause to limit the quantity of data pulled from the upstream models.

{% if is_incremental() %}
and created_at > (select max(created_at) from {{ this }})
{% endif %}

Translated:

  • If is_incremental() returns true, then all lines before {% endif %} are included in our generated SQL query.
  • {{ this }} resolves to the existing destination table.

A model that is materialized as incremental but has no is_incremental() macro will append the results of our .sql into the destination table on every execution, breaking idempotency.

Development Tactics

Verify Generated Queries

dbt is performing multiple queries against the database; our configurations determine how these queries behave. Reviewing generated queries is a great skill to develop when working with dbt and very helpful for debugging incremental models. There are multiple ways to gain insight into dbt’s generated queries:

Query Log: Monitor generated queries using the Snowflake Query History page.

Compiled Queries: Find generated queries in the target/compiled and target/run directories.

  • target/compiled holds the query generated for our upstream data. This file will have the is_incremental() clauses included in Incremental mode, but not in Full Refresh mode.
  • target/run holds the query (or queries) used to add our subset to the destination table. If we are in Full Refresh mode or we did not set materialized = 'incremental', our model’s SQL will be wrapped in a create table as ... statement. We may also find delete, insert, or merge queries as determined by our configuration.

dbt Logs: See all dbt actions in logs/dbt.log. Each execution appends to this file; it’s easiest to delete the file first so the log contains only your current execution.

Configs MUST be spelled correctly; bad config keys will fail silently

dbt will not alert you if your configuration key is incorrect. Misspelling the key will result in the configuration not being applied.
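
For example, the config block below contains a deliberately misspelled key (incremental_stratgy). dbt will not raise an error; the unknown key is ignored and the model silently falls back to the default strategy:

-- The misspelled key below is silently ignored;
-- the model falls back to the default incremental strategy.
{{
config(
materialized = 'incremental',
incremental_stratgy = 'delete+insert',
unique_key = 'date'
)
}}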

Incremental Models MUST be Rebuilt After Adding Columns

By default, dbt WILL NOT populate new columns without a --full-refresh. The new column will be included in the first query against the source model, but not in the second query updating the destination model.

dbt provides an on_schema_change config to control this; a minimal config sketch follows the list below.

  • append_new_columns: Adds any new columns; populates them with the most recent subset.
  • sync_all_columns: Adds new columns and drops removed columns; populates new columns with the most recent subset.
  • To populate new columns with historical data, a --full-refresh is required.
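
Here is a minimal sketch of a config block using this option, assuming append_new_columns is the behavior you want:

{{
config(
materialized = 'incremental',
on_schema_change = 'append_new_columns'
)
}}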

Incremental clauses MUST be applied to subqueries

/** GOOD USE OF IS_INCREMENTAL()
Both subqueries will only scan for new data
*/
with noun_snapshot as (
select
date,
count(*) as total_records
from {{ ref('noun_snapshot') }}
where true
{% if is_incremental() %}
and date >= (select max(date) from {{ this }})
{% endif %}
group by date
),

daily_rollup as (
select
date,
count(*) as total_records
from {{ ref('daily_rollup') }}
where true
{% if is_incremental() %}
and date >= (select max(date) from {{ this }})
{% endif %}
group by date
)

select *
from noun_snapshot
left join daily_rollup
using (date)
/** BAD USE OF IS_INCREMENTAL()
Both subqueries will scan their whole source table
After the whole data set is joined together, we filter down to the new data
*/
with noun_snapshot as (
select
date,
count(*) as ttl_users
from {{ ref('noun_snapshot') }}
group by date
),

daily_rollup as (
select
date,
count(*) as ttl_users
from {{ ref('daily_rollup') }}
group by date
)

select *
from noun_snapshot
left join daily_rollup
using (date)
where true
{% if is_incremental() %}
and date >= (select max(date) from {{ this }})
{% endif %}

Incremental Clause MUST Match any Windowing Logic

The incremental clause limits the data retrieved from the source table. If your model looks back to previous data, then it must be included in the incremental subset.

When developing incremental models, avoid using look back windows and window functions like lead() and lag() if possible.

Here is an example calculating a 30-day rolling sum. Note the widened date range in the inner incremental clause and the second incremental clause applied to the outer select. This ensures we have the correct window for a given date.

with daily_rollup as (
select
date,
sum(number_of_events) as number_of_events,
sum(sum(number_of_events)) over (
order by date
rows between 30 preceding and current row
) as number_of_events_last_30_days
from {{ ref('daily_rollup') }}
where true
-- Our window function needs 30 records
{% if is_incremental() %}
and date >= (select max(date) - 31 from {{ this }})
{% endif %}
group by date
)

select * from daily_rollup
where true
-- Only insert the most recent dates,
-- they are the only records with full windows
{% if is_incremental() %}
and date >= (select max(date) from {{ this }})
{% endif %}

Common Use Cases

Verbs Append Only

Use when: Source data is append-only with a reliable created_at (or equivalent) timestamp. Most effective with Verbs.

Downside: Not a good fit for Snapshot or Rollup tables with discrete date spines (see the missing and duplicate data scenarios below). Inherits uniqueness from source data.

  • Missing Data: If your ETL runs on a daily schedule, your model is built partway through the day (some time after midnight). Configured as append-only, the daily run will only have the data present when your query was executed.
    Scenario:
    We set our is_incremental() clause to use >. The model is built at Jan 1 0600; it currently has all data up to the run time. On Jan 2 0600, we have an incremental run. Since max(date) = Jan 1, we only append data for Jan 2 0000 -- Jan 2 0600. We are missing the data for Jan 1 0601 -- Jan 1 2359.
  • Duplicate Data: To guard against missing windows of data, we might use >= instead of >. This creates duplicate data.
    Scenario:
    We set our is_incremental() clause to use >=. The model is built at Jan 1 0600; it currently has all data up to the run time. On Jan 2 0600, we have an incremental run. Since max(date) = Jan 1, we pull data for Jan 1 0000 -- Jan 1 2359 and Jan 2 0000 -- Jan 2 0600. We have now duplicated the data for Jan 1 0000 -- Jan 1 0600.

Critical Configs: Using materialized = 'incremental' and no other configurations.

-- append_only.sql

{{ config(materialized = 'incremental') }}

select
created_at,
payload
from {{ ref('verb') }}
where true
{% if is_incremental() %}
and created_at > (select max(created_at) from {{ this }})
{% endif %}

Verbs Append Overlapping Windows

Use when: Adding new rows every run and the table has a date or timestamp column. Great for events or daily tables. Runs fast for large tables. Most effective when the source is a Snapshot or Rollup table with a discrete date spine (not a continuous timestamp).

Downside: Does not enforce uniqueness.

Critical Configs:

  • incremental_strategy='delete+insert':
    First, create temporary table to stage data. Next, delete overlapping unique_key values in destination table. Last, insert staged data into destination table.
  • unique_key = 'date':
    The column used to identify what is being added to the destination table. This is a poorly named variable; the column’s values do not need to be unique. If the column used IS THE UNIQUE KEY, then the delete query must perform a full table scan of the destination table. This can be VERY COSTLY for large tables. Using date means we act against partitions and can be VERY FAST for large tables. It’s much easier to drop and add partitions.
  • >= in the incremental clause: This ensures that we do not have missing data. Because the table is built mid-morning, it does not have all data for the current day. At Jan 1 0900, we only have data for the hours between 0000-0900. Our table has a max(date) = Jan 1. We run the query on Jan 2 0900; using > means we only add data for Jan 2 0000-0900. By using >=, we add data for Jan 1 0000-Jan 2 0900. We are duplicating the work for the previous day, but we are not duplicating the data.
  • Optional: cluster_by: This is a configuration provided by (and unique to) the Snowflake database connector. By default, Snowflake partitions your table automatically. Sometimes, it does a poor job at defining the clustering key; this configuration forces Snowflake to cluster on the provided columns. Use with caution, as this can decrease performance for other access patterns.
-- append_with_overlapping.sql

{{
config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'date',
cluster_by = ['date']
)
}}

select
date_trunc('day', created_at) as date,
payload
from {{ ref('verb') }}
where true
-- Using >= ensures we get the same result on every run
-- Note that the column name is date in the destination table
{% if is_incremental() %}
and created_at >= (select max(date) from {{ this }})
{% endif %}

Stateful Nouns

Use When: An object has attributes that change over time; the table should show the most recent values for those attributes based on a reliable updated_at or modified_at timestamp. Uses a strictly enforced unique key. In its simplest form, this is a Type 1 SCD.

Downside: Very slow for large tables. Incoming data must have a unique key. Modified date must be reliable.

Critical Configs:

  • Optional: incremental_strategy = 'merge': Use a MERGE query to add new data to the destination table. If you configure a unique_key, then merge will be used by default. Merge provides “upsert” (update + insert) functionality in Snowflake; if the object_id does not exist, create a new row. If it does exist, update the row.
  • unique_key = 'object_id': The primary key for the table.
  • Incremental clause must refer to updated_at or an equivalent value.
-- stateful_noun.sql

{{
config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'object_id'
)
}}

select
object_id,
modified_at,
attribute
from {{ ref('noun') }}
where true
{% if is_incremental() %}
and modified_at >= (select max(modified_at) from {{ this }})
{% endif %}

Semi-Unique Verbs

Similar to Stateful Noun, but we only update a subset of the destination table

Use When: Verb where upstream uniqueness is not guaranteed.

Downside: Incoming subset must be de-duplicated. Resulting data set not guaranteed to be unique (see below).

Critical Configs:

  • Optional: incremental_strategy = 'merge': See above
  • unique_key = 'message_id': Used in the merge query
  • incremental_predicates = [ "DBT_INTERNAL_DEST.created_at >= current_date - 3"]: The subset of data that we MERGE into. Without this configuration, we compare the message_ids in the staged data against the entire destination table. This keeps the query performant, but also means that we can duplicate message_ids.
    - message_id=1 is received on Jan 1 and added to destination table.
    - message_id=1 is received again on Jan 2, the existing record in the destination table is updated.
    - message_id=1 is received again on Jan 5, the existing record is not included in our incremental_predicate subset and we create a new record in the destination table. We now have 2 records for our message; Jan 2 and Jan 5.
-- events_with_semi_unique_id.sql

{{
config(
materialized = 'incremental',
unique_key = 'message_id',
incremental_predicates = [ "DBT_INTERNAL_DEST.created_at >= current_date - 3"]
)
}}

select
message_id,
created_at,
payload
from {{ ref('verb') }}
where true
{% if is_incremental() %}
and created_at >= (select max(created_at) from {{ this }})
{% endif %}

-- Explicitly dedup message_id to avoid conflicts during MERGE
qualify row_number() over(
partition by message_id order by created_at asc
) = 1

First Value

Use When: For a given noun, we want to persist an attribute’s first value to prevent full table scans for downstream queries. Source data is a verb or snapshot with a date spine. Note: If the underlying data is a mutable noun, it is better to first create a snapshot table and build the first-value table downstream from it. This ensures the model is idempotent.

Downside: Narrow use case. Subset data must be unique. Source data with a continuous timestamp (created_at) or date spine preferred.

Critical Configs:

  • Everything from Stateful Noun (see above)
  • merge_update_columns = ['object_id']: If the record already exists in the destination table, only the configured columns are updated. All other columns keep the values set when the record was created.
-- first_value.sql

{{
config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'object_id',
merge_update_columns = ['object_id']
)
}}

select
object_id,
created_at,
attribute
from {{ ref('noun') }}
where true
{% if is_incremental() %}
and created_at >= (select max(created_at) from {{ this }})
{% endif %}

Incremental Rollups

Use When: Rollup tables with accumulating measures where source data is too big to output destination grain.

Downside: Narrow use case. Hard to debug and implement.

Critical Configs:

  • Everything from Stateful Noun (see above)
  • Conditional self-join for incremental mode
-- incremental_rollup.sql

{{
config(
materialized='incremental',
unique_key='date'
)
}}

with
verb as (select * from {{ ref('verb') }}),
new as (
select
date_trunc('day', created_at) as date,
count(distinct message_id) as event_count,
max(created_at) as last_created_at_time
from verb
where true
{% if is_incremental() %}
and created_at > (select max(last_created_at_time) from {{ this }})
{% endif %}
group by date
)

-- If we are in incremental mode, join into existing table to increment totals
{% if is_incremental() %}
, reaggregate as (
select
new.date,
sum(coalesce(existing.event_count, 0) + new.event_count) as event_count,
max(new.last_created_at_time) as last_created_at_time
from new
left join {{ this }} as existing
on new.date = existing.date
group by new.date
)

select * from reaggregate
{% else %}
-- Just use the results from new in full refresh mode
select * from new
{% endif %}

Additional Links

dbt Documentation

dbt Incremental Models In-Depth

Bruno Souza de Lima series on Medium
