dbt Incremental: Implementing & Testing — P2

Alice Bui
Joon Solutions Global
5 min read · Sep 13, 2023

In the first part dbt Incremental: Choosing the Right Strategy — P1, I covered the topic of choosing the appropriate incremental strategy, which significantly impacts the cost and time of each table.

In this blog, I’ll discuss how I implement and test incremental models.

2. Implementing

I will take the following model as an example.

DAG of fct__jira__issue

2.1. Config Incremental Strategy

The _fivetran_synced timestamp was skewed because, at that time, I had just switched from Stitch to Fivetran to ingest Jira data. So, in this case, I chose the merge strategy with clustering.

-- cluster_by affects how the data is stored and organized on disk, which limits scans
{{
    config(
        materialized = 'incremental',
        unique_key = 'issue_id',
        cluster_by = 'project_key',
        incremental_strategy = 'merge'
    )
}}
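To make the merge strategy more concrete, here is a simplified sketch of the kind of MERGE statement that runs against the target table on an incremental run. It is not the exact SQL that dbt generates. The project and dataset names, the `fct__jira__issue__new_rows` table, and the `issue_summary` column list are assumptions for illustration only.

```sql
-- Simplified sketch only; dbt builds the real statement for you.
-- `my_project.my_dataset` and `fct__jira__issue__new_rows` are hypothetical names.
merge into `my_project.my_dataset.fct__jira__issue` as dest
using (
    -- Rows selected by the model on this incremental run
    select issue_id, project_key, issue_summary, _fivetran_synced
    from `my_project.my_dataset.fct__jira__issue__new_rows`
) as src
on dest.issue_id = src.issue_id  -- the unique_key from the config block

-- An existing issue was synced again: overwrite its columns
when matched then update set
    project_key = src.project_key,
    issue_summary = src.issue_summary,
    _fivetran_synced = src._fivetran_synced

-- A new issue: insert it
when not matched then insert (issue_id, project_key, issue_summary, _fivetran_synced)
values (src.issue_id, src.project_key, src.issue_summary, src._fivetran_synced)
```

Because the match is on issue_id, the target table has to be scanned to find matching rows, which is where clustering helps.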

But if the insert_overwrite strategy were an option, the config block could look like this:

-- Define the partitions to replace: the last 2 days,
-- in case some records have not arrived yet.
{% set partitions_to_replace = [
    'current_date',
    'date_sub(current_date, interval 1 day)'
] %}

-- Here, we define the incremental model; the data will be
-- partitioned by date, and I am also clustering by project_key
-- to improve performance. I am choosing the insert_overwrite
-- strategy explicitly.
-- Note: data_type 'date' assumes _fivetran_synced is stored as a DATE here.
{{
    config(
        materialized = 'incremental',
        partition_by = { 'field': '_fivetran_synced', 'data_type': 'date' },
        cluster_by = 'project_key',
        incremental_strategy = 'insert_overwrite',
        partitions = partitions_to_replace
    )
}}

2.2. Choose Timestamp Column and Create Time Range Filter

  • It is recommended to use the ingestion tool's synced timestamp (e.g., _fivetran_synced) for all incremental models.
  • Find the last modified timestamp using the GREATEST() function: the final fact model is built from several staging models. If we simply take the timestamp from one staging model, updates to fields that come from the other staging models might be missed. Therefore, we must find the most recent timestamp for each row. In this case, I used GREATEST() to pick the latest _fivetran_synced timestamp.
       greatest(
           issue._fivetran_synced,
           coalesce(parent_issue._fivetran_synced, issue._fivetran_synced),
           coalesce(project._fivetran_synced, issue._fivetran_synced)
       ) as _fivetran_synced

Read more about GREATEST() function HERE.

  • Set up a time-range filter to select the rows that need to be updated:
select
    *
from final

{% if is_incremental() %}
-- This filter will only be applied on an incremental run
where
    _fivetran_synced >= (
        select date_sub(max(_fivetran_synced), interval 3 day) from {{ this }}
    )
{% endif %}
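The COALESCE() calls in the GREATEST() expression above matter because, in BigQuery, GREATEST() returns NULL as soon as any argument is NULL, which happens whenever a left join finds no match. The standalone query below is a small illustration with made-up timestamps; the column names are placeholders, not fields from the real model.

```sql
-- Standalone BigQuery query; the timestamps are made-up sample values.
with sample as (
    select
        timestamp '2023-09-10 08:00:00+00' as issue_synced,
        cast(null as timestamp) as parent_issue_synced,  -- the issue has no parent
        timestamp '2023-09-12 09:30:00+00' as project_synced
)

select
    -- Returns NULL, because one of the arguments is NULL
    greatest(issue_synced, parent_issue_synced, project_synced) as without_coalesce,
    -- Returns 2023-09-12 09:30:00 UTC, because the NULL falls back to issue_synced
    greatest(
        issue_synced,
        coalesce(parent_issue_synced, issue_synced),
        coalesce(project_synced, issue_synced)
    ) as with_coalesce
from sample
```

Without the fallback, a row with a NULL timestamp would never satisfy the incremental filter and would silently stop being updated.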

2.3. (Optional) Add build_source Column

The build_source field later lets us check whether the incremental models have been configured properly. This step is not compulsory.

select
    *,
    {% if is_incremental() %}
    'incremental' as build_source
    {% else %}
    'full' as build_source
    {% endif %}
from final
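One way to use build_source afterwards is a quick check query, for example saved as a dbt analysis. This is only a sketch: the file name is hypothetical, and it assumes the model is referenced as fct__jira__issue.

```sql
-- analyses/check_build_source.sql (hypothetical file name)
-- Counts rows per build_source and shows the synced-timestamp range of each group.
select
    build_source,
    count(*) as row_count,
    min(_fivetran_synced) as earliest_synced,
    max(_fivetran_synced) as latest_synced
from {{ ref('fct__jira__issue') }}
group by build_source
order by build_source
```

Right after a full refresh followed by a single incremental run, the 'incremental' rows should all fall inside the lookback window. If much older rows are also tagged 'incremental', the time-range filter is probably not being applied as intended.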

2.4. Putting It All Together!

-- cluster_by affects how the data is stored and organized on disk, which limits scans
{{
    config(
        materialized = 'incremental',
        unique_key = 'issue_id',
        cluster_by = 'project_key',
        incremental_strategy = 'merge'
    )
}}

with

...

transform as (

    select
        ...
        greatest(
            issue._fivetran_synced,
            coalesce(parent_issue._fivetran_synced, issue._fivetran_synced),
            coalesce(project._fivetran_synced, issue._fivetran_synced)
        ) as _fivetran_synced

    from issue
    left join issue as parent_issue on issue.parent_id = parent_issue.issue_id
    left join project on issue.project_id = project.project_id
    left join project_mapping
        on project.project_key = project_mapping.project_key
        and issue.issue_summary like concat('%', project_mapping.issue_summary, '%')
    left join project_mapping as project_mapping_v2
        -- so as not to miss projects that are not in ('PP', 'POD', 'THP') but have null categories
        on project.project_key = project_mapping_v2.project_key_clean
    -- one-to-one
    left join custom_field_pivot on issue.issue_id = custom_field_pivot.issue_id

),

final as (

...

)

select
    *,
    {% if is_incremental() %}
    'incremental' as build_source
    {% else %}
    'full' as build_source
    {% endif %}
from final

{% if is_incremental() %}
-- This filter will only be applied on an incremental run
where
    _fivetran_synced >= (
        select date_sub(max(_fivetran_synced), interval 3 day) from {{ this }}
    )
{% endif %}
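The 3-day lookback is hard-coded in the filter above. If the window needs to change between runs, one option is to read it from a project variable with a default. The sketch below is a minimal stand-alone model, not the author's: `incremental_lookback_days` and `stg__jira__issue` are hypothetical names, and it uses timestamp_sub on the assumption that _fivetran_synced is a TIMESTAMP column.

```sql
-- Minimal sketch; `incremental_lookback_days` is a hypothetical project variable
-- and `stg__jira__issue` a hypothetical upstream model.
{{
    config(
        materialized = 'incremental',
        unique_key = 'issue_id',
        incremental_strategy = 'merge'
    )
}}

select *
from {{ ref('stg__jira__issue') }}

{% if is_incremental() %}
where
    _fivetran_synced >= (
        -- var() falls back to 3 days when the variable is not set
        select timestamp_sub(
            max(_fivetran_synced),
            interval {{ var('incremental_lookback_days', 3) }} day
        )
        from {{ this }}
    )
{% endif %}
```

The window could then be widened for a single run by passing `--vars '{incremental_lookback_days: 7}'` to dbt run, without editing the model.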

3. Testing

To debug faster, it is recommended to audit tables from upstream to downstream models (base -> stg -> int -> fct).

To check whether there is a discrepancy between the full-refresh model and incremental model, we should take the following steps:

3.1. Create Full-refresh and Incremental Model

Since the incremental model only processes recent data, in J4J we only compare data from the last 3 days. Therefore, we need 2 types of model:

(1) a completely full-refresh model

(2) a model that is built with a full refresh for data before N-3 (N = current date) and incrementally for the last 3 days, and is stored in a custom schema, e.g. dbt_na_incremental_test_mart

To create the latter type:

  • Create a custom date-range filter macro:
macros/filter_by_day_range.sql

{% macro filter_by_day_range(
    ts_column='_fivetran_synced',
    reference_ts='current_timestamp',
    days_to_keep=var("filter_base_models_days")
) %}
---- Only apply the filter in local dev, not on CI & CD runs
{%- if target.name == 'dev' -%}
---- Uncomment the next line to disable the record filter
--1=1

---- Incremental test only: use the next line to full refresh records up to N-3 (N = current date)
date_add(cast({{ reference_ts }} as timestamp), interval - cast({{ days_to_keep }} as int) day) > cast({{ ts_column }} as timestamp)

---- Uncomment the next line to limit processed records to the period from N-3 until now
--date_add(cast({{ reference_ts }} as timestamp), interval - cast({{ days_to_keep }} as int) day) < cast({{ ts_column }} as timestamp)
{% else %}
true
{%- endif -%}
{% endmacro %}
  • Add the custom filter to the base models, e.g. the base__jira__issue model
models/staging/jira/base/base__jira__issue.sql

with source as (

    select * from {{ source('jira', 'issue') }}
    where {{ filter_by_day_range() }}

),

...
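For reference, this is roughly what the filtered base model compiles to on the dev target once the macro is expanded, with the macro's SQL comments left out. It assumes filter_base_models_days is set to 3 and that the source resolves to a table named `my_project`.`jira`.`issue`; both are assumptions for illustration, not values taken from the project. The closing select is added only so the snippet runs on its own.

```sql
with source as (

    select * from `my_project`.`jira`.`issue`
    where
        -- keeps only records synced more than 3 days before the current timestamp
        date_add(cast(current_timestamp as timestamp), interval - cast(3 as int) day)
            > cast(_fivetran_synced as timestamp)

)

select * from source
```

On any other target, the macro renders `true`, so the where clause filters nothing out.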

3.2. Create Audit Tests

  • Use the audit_helper package to create tests that compare the full-refresh model with the incremental model
  • Start with the base model, then continue with the downstream models
dbt_audit/assert__compare__relations__base__jira__issue.sql

{% set old_etl_relation=adapter.get_relation(
    database="joon-bi-data-mart-dev",
    schema="dbt_na_mart",
    identifier="base__jira__issue"
) -%}
{% set dbt_relation=adapter.get_relation(
    database="joon-bi-data-mart-dev",
    schema="dbt_na_incremental_test_mart",
    identifier="base__jira__issue"
) -%}

{{ audit_helper.compare_relations(
    a_relation=old_etl_relation,
    b_relation=dbt_relation,
    exclude_columns=["_fivetran_synced", "build_source"],
    primary_key="issue_id"
) }}
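By default, compare_relations returns a summary, so the query above always produces rows. If the comparison should fail a dbt test only when the two relations differ, one option is to ask for row-level output instead. The sketch below assumes that the installed audit_helper version supports the summarize argument of compare_relations and that it returns only the unmatched rows when set to false; the file path and the relation variable names are hypothetical.

```sql
-- tests/assert__incremental_matches_full_refresh__base__jira__issue.sql (hypothetical path)
{% set full_refresh_relation = adapter.get_relation(
    database="joon-bi-data-mart-dev",
    schema="dbt_na_mart",
    identifier="base__jira__issue"
) -%}
{% set incremental_relation = adapter.get_relation(
    database="joon-bi-data-mart-dev",
    schema="dbt_na_incremental_test_mart",
    identifier="base__jira__issue"
) -%}

-- Assumption: with summarize=false, only rows that are missing from or different in
-- one of the two relations are returned, so the test passes when nothing is returned.
{{ audit_helper.compare_relations(
    a_relation=full_refresh_relation,
    b_relation=incremental_relation,
    exclude_columns=["_fivetran_synced", "build_source"],
    primary_key="issue_id",
    summarize=false
) }}
```

A dbt singular test fails when its query returns at least one row, so any mismatch between the two builds would surface as a failing test.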

Reference

Audit_helper in dbt: Bringing data auditing to a higher level | dbt Developer Blog

Understanding dbt Incremental Strategies part 1/2

Understanding dbt Incremental Strategies part 2/2

Two completely different types of dbt incremental models in Bigquery

Alice Bui
Joon Solutions Global

Analytics Engineer @ Joon Solutions | GDE, dbt, Looker, Airflow Certified