Slim CI with dbt Core and Snowpark

Thierno Diallo
Mar 27, 2024


I have recently enhanced my project with a valuable addition: a Slim CI pipeline. This pipeline automates the testing and validation of every Pull Request (PR) before it is merged into production.

While implementing a CI pipeline might seem straightforward, building a Slim CI with dbt Core requires extra effort compared to dbt Cloud, which has built-in Slim CI functionality.

Slim CI

Before talking about Slim CI, what is CI?

CI stands for Continuous Integration. It’s a software development practice where developers frequently merge their code changes into a central repository. This is often paired with Continuous Delivery (CD) to automate the deployment process (CI/CD).

On the other hand, Slim CI is a dbt-specific feature that optimizes the CI process for data models: it only runs, tests, and builds the models affected by recent changes, making CI faster and more cost-effective. While Slim CI is readily available in dbt Cloud, dbt Core users need to implement it themselves.

The core principles remain the same:

  1. Tracking dbt execution artifacts (manifest.json)
  2. Identifying modified nodes between two dbt executions
  3. Building only modified nodes and their downstream dependencies

This ensures only impacted parts of the project are rebuilt on each pull request, allowing efficient testing without breaking existing functionality.

Slim CI with dbt Core

As described above, the Slim CI process can be divided into three steps.

Step 1: Tracking dbt execution artifacts (manifest.json)

This is the most challenging part with dbt Core: this step involves capturing and saving the manifest.json file.

The manifest, or more precisely manifest.json, is a dbt artifact generated at each compilation or execution of your project. By default, the file is saved inside dbt’s /target directory and is not versioned.

This single file (manifest.json) contains a full representation of your dbt project’s resources (models, tests, macros, etc.), including all node configurations and resource properties. It is used to populate the docs site and to perform state comparison.

While manifest.json has various uses, its significance in Slim CI lies in enabling state comparison, which we’ll cover in a later step. To leverage this effectively, I needed to capture and access manifest.json. Since my technical stack consists of dbt Core with Snowflake, a natural solution is to use Snowflake internal stages.

Manage artifacts with Snowpark

Snowpark is Snowflake’s library for querying and processing data in Snowflake using common development languages: Java, Python, and Scala. For my project, I’m using the Snowpark Python API.
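
As a quick illustration of the Snowpark Python API, here is a minimal sketch of how a session can be created. The connection_utils.get_session() helper used in the script below is not shown in this article; I assume it wraps something along these lines, with placeholder connection parameters:

# connection_utils.py (sketch) - get_session() presumably wraps a standard
# Snowpark Session builder; every value below is a placeholder.
from snowflake.snowpark import Session

def get_session() -> Session:
    # In practice, read these from environment variables or a secrets manager.
    connection_parameters = {
        "account": "<account_identifier>",
        "user": "<user>",
        "password": "<password>",
        "role": "<role>",
        "warehouse": "<warehouse>",
        "database": "<database>",
        "schema": "<schema>",
    }
    return Session.builder.configs(connection_parameters).create()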

Project structure

my_project
|___ snowflake/
|___|___ src/
|___|___|___ manage_dbt_artifacts.py
|___ transformation/ (dbt project)
|___|___ target/
|___|___|___ manifest.json

To manage manifest.json within my Snowflake environment, I’ve implemented a Python script, manage_dbt_artifacts.py, that uses the Snowpark API. This script performs the following tasks:

  • Connects to Snowflake: Establishes a secure connection to the appropriate Snowflake environment.
  • Manages Internal Stage: Creates and interacts with an internal Snowflake stage named "dbt_artifacts" for storing the manifest.json file.
  • Handles File Transfer: Accepts command-line arguments specifying the operation type (load or download).
  • Load: Retrieves the manifest.json file from the designated location (transformation/target/) and uploads it to the internal stage.
  • Download: Retrieves the manifest.json file from the internal stage and saves it to the specified location (transformation/).

Check the code below for more details:

# manage_dbt_artifacts.py

import sys
import os
import snowflake.snowpark as snowpark
from helpers import connection_utils
import logging


def main():
    # Set up logging configuration
    logging.basicConfig(level=logging.INFO)

    # Check command-line arguments
    if len(sys.argv) != 2:
        logging.info("Usage: manage_dbt_artifacts.py <load|download>")
        sys.exit(1)

    # Establish Snowflake session
    session = connection_utils.get_session()
    action = sys.argv[1]
    artifacts_stage = "@tech.dbt_artifacts"

    # Determine the path of the manifest.json generated by dbt
    script_dir = os.path.dirname(os.path.abspath(__file__))
    dbt_project_dir = os.path.normpath(os.path.join(script_dir, '../../../transformation'))
    file_path = os.path.join(dbt_project_dir, 'target/manifest.json')

    # Perform the requested action
    if action == 'load':
        load_artifacts(session, artifacts_stage, file_path)
    elif action == 'download':
        unload_artifacts(session, artifacts_stage, dbt_project_dir)
    else:
        logging.error("Usage: manage_dbt_artifacts.py <load|download>")
        raise ValueError("Incorrect value for action")

    session.close()

    return 'SUCCESS'


def load_artifacts(session: snowpark.Session, stage: str, file_path: str) -> None:
    try:
        # Create the internal stage if it does not exist yet
        _ = session.sql(f"create stage if not exists {stage[1:]} file_format = (type='JSON')").collect()
        logging.info(f"{stage} created successfully")

        # Upload manifest.json to the stage
        _ = session.file.put(file_path, stage, auto_compress=False, overwrite=True)
        logging.info(f"{file_path} loaded successfully into {stage}")

    except snowpark.exceptions.SnowparkSQLException as err:
        logging.error(f"Unexpected {err=}, {type(err)=}")
        raise

    else:
        # List the stage contents to confirm the upload
        res = session.sql(f"ls {stage}").collect()
        logging.info(res)


def unload_artifacts(session: snowpark.Session, stage: str, target_directory: str) -> None:
    try:
        # Download the manifest from the stage into the target directory
        _ = session.file.get(stage, target_directory)
        logging.info(f"Files unloaded successfully into {target_directory}")

    except Exception as err:
        logging.error(f"Unexpected {err=}, {type(err)=}")
        raise


if __name__ == "__main__":
    main()
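
The script is invoked with python manage_dbt_artifacts.py load to push the freshly generated manifest.json to the internal stage, and with python manage_dbt_artifacts.py download to retrieve it; both calls appear in the CI workflow described below.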

Step 2: Identifying modified nodes between two dbt executions

The second step involves determining which parts of the dbt project have been modified since the last execution. This is achieved by leveraging dbt’s built-in state comparison functionality.

dbt captures a snapshot of your project’s state at each execution in the manifest.json file. By using the state selection method, we can compare the current state with a previous one and identify the specific nodes (models, tests, etc.) that have been modified.

dbt build --select state:modified+ --state=.

The above command looks for nodes that were modified between the manifest.json in the current folder (--state=.) and the one generated in the /target directory during this run. The graph operator (+) also selects downstream nodes, so that changing a single model, for instance, rebuilds that model plus every model and test that depends on it.
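
To build some intuition for what this comparison does, here is a simplified, purely illustrative sketch that diffs the file checksums recorded in two manifests (the file name compare_manifests.py is hypothetical). dbt’s real state:modified selector also accounts for changed configs, macros, and other properties, so in practice you let dbt do the comparison:

# compare_manifests.py (illustration only) - a naive version of state
# comparison: flag nodes that are new or whose file checksum changed.
import json

def modified_nodes(previous_manifest: str, current_manifest: str) -> list:
    with open(previous_manifest) as f:
        previous = json.load(f)["nodes"]
    with open(current_manifest) as f:
        current = json.load(f)["nodes"]

    changed = []
    for node_id, node in current.items():
        old = previous.get(node_id)
        # New nodes and nodes with a different checksum count as modified.
        if old is None or old.get("checksum") != node.get("checksum"):
            changed.append(node_id)
    return changed

if __name__ == "__main__":
    # Example: compare the manifest downloaded from the stage with the one
    # freshly compiled into target/.
    print(modified_nodes("manifest.json", "target/manifest.json"))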

Step 3: Building only modified nodes and their downstream dependencies

Now that we can load and unload the dbt manifest.json (Step 1) and identify modified nodes between two manifest.json files (Step 2), the final step is to efficiently build only the recent changes.

Here’s the challenge: building a node in dbt requires its upstream dependencies to be built first. This is where we leverage a dbt feature called “Defer”.

Defer is a powerful feature that makes it possible to run a subset of models or tests in a sandbox environment without having to first build their upstream parents. Defer requires that a manifest from a previous dbt invocation be passed to the --state flag or env var.

By combining --defer with state comparison (--state), we can leverage existing upstream nodes from a separate environment (like QA or production) to build only the recently modified nodes and their downstream dependencies within the CI pipeline.

dbt build --select state:modified+ --defer --favor-state --state=.

The --favor-state flag tells dbt to resolve references to unselected parent nodes from the environment captured in the --state manifest, so the modified nodes can be built in a separate environment (e.g., _PR_XX_) even when their parents are not present there.

Create a dedicated testing environment

To truly mimic dbt Cloud’s Slim CI behavior, we need to build the modified nodes and their dependencies in a separate environment within our Snowflake database. This ensures isolation and prevents conflicts with existing objects (cf. Achieve Conflicts-Free PRs using dbt Core Custom Schemas).

One approach involves updating dbt’s custom schema generation macro generate_schema_name_for_env to use an environment variable (DBT_CUSTOM_SCHEMA) that gets populated during every CI pipeline execution. The variable might hold a value like PR_[PR_NUMBER] to create a dedicated schema for each PR.

-- generate_schema_name_for_env

{% macro generate_schema_name_for_env(custom_schema_name, node) -%}

{%- set default_schema = target.schema -%}
{%- set env_custom_schema_name = env_var("DBT_CUSTOM_SCHEMA", "") | trim -%}

{%- if env_custom_schema_name != "" -%}
{% if custom_schema_name is none -%}
_{{ env_custom_schema_name }}_{{ default_schema }}
{% else %}
_{{ env_custom_schema_name }}_{{ custom_schema_name | trim }}
{% endif %}

{%- elif custom_schema_name is none -%}
{{ default_schema }}

{%- else -%}
{{ custom_schema_name | trim }}
{%- endif -%}

{%- endmacro %}
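
For example, if DBT_CUSTOM_SCHEMA is set to PR_42 (a hypothetical PR number), a model configured with the custom schema staging is built into _PR_42_staging, a model without a custom schema into _PR_42_ followed by the target schema, and when the variable is unset the macro falls back to the usual schema names.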

Implementing the Slim CI workflow

The final part consists of integrating the previously described actions into a CI tool (GitHub Actions, Jenkins, GitLab CI/CD, etc.). This CI pipeline orchestrates the entire Slim CI process.

Let’s assume you use an indirect promotion model with a dedicated QA branch before merging to production. Here’s a possible execution plan:


Prerequisites:

The workflow assumes an initial manifest.json file representing the current state of your QA environment is already present in your Snowflake internal stage. One way to bootstrap it is to run dbt against QA once and upload the resulting manifest with the load action of the script above.

Job 1: Testing on PR Merge to QA Branch

  • Download QA dbt artifacts from @internal_stage to /transformation:
python manage_dbt_artifacts.py download
  • Identify modified nodes and build them, along with their downstream dependencies, in a separate PR_[PR_NUMBER] environment (DBT_CUSTOM_SCHEMA set to PR_[PR_NUMBER], as described above):
dbt build --select state:modified+ --defer --favor-state --state=. --target=$DBT_TARGET

→ Merge if Slim CI (Job 1) is successful

Job 2: Updating QA Artifacts on QA Branch Merge

  • Download QA dbt artifacts:
python manage_dbt_artifacts.py download
  • Recompile and execute only modified nodes and their dependencies within the QA environment:
dbt run --select state:modified+ --defer --favor-state --state=. --target=$DBT_TARGET
  • Upload the newly generated manifest.json back to the QA internal stage:
python manage_dbt_artifacts.py load

Additional Considerations:

  • This workflow omits the PR schema cleanup step, which can be implemented using stored procedures or dbt macros; a rough Snowpark-based sketch is shown below.
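
As a starting point, here is a rough Snowpark sketch of such a cleanup. It reuses the same connection helper as the artifacts script; the ANALYTICS database name and the cleanup_pr_schemas.py file name are assumptions, and the _PR_<number>_ prefix follows the schema macro above:

# cleanup_pr_schemas.py (sketch) - drop every schema created for a given PR.
# The ANALYTICS database name is a placeholder; adapt it to your project.
import sys
from helpers import connection_utils

def cleanup(pr_number: str) -> None:
    session = connection_utils.get_session()
    prefix = f"_PR_{pr_number}_".upper()

    # List all schemas in the database and drop the ones generated for this PR.
    for row in session.sql("show schemas in database ANALYTICS").collect():
        name = row["name"]
        if name.upper().startswith(prefix):
            session.sql(f'drop schema if exists ANALYTICS."{name}"').collect()

    session.close()

if __name__ == "__main__":
    cleanup(sys.argv[1])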

Conclusion

Implementing a Slim CI workflow offers several advantages (cf. dbt docs):

  • Provide increased confidence and assurances that project changes will work as expected in production.
  • Reduce the time it takes to push code changes to production, through build and test automation, leading to better business outcomes.
  • Allow organizations to make code changes in a standardized and governed way that ensures code quality without sacrificing speed.
