Slim CI with dbt Core and Snowpark
I have recently enhanced my project with a valuable addition: a Slim CI pipeline. This job automates testing and validation of all Pull Requests (PRs) before merging them into production.
While implementing CI might seem straightforward, a Slim CI with dbt Core requires extra effort compared to dbt Cloud, which has built-in Slim CI functionality.
Slim CI
Before talking about Slim CI, what is CI?
CI stands for Continuous Integration. It’s a software development practice where developers frequently merge their code changes into a central repository. This is often paired with Continuous Delivery (CD) to automate the deployment process (CI/CD).
On the other hand, Slim CI is a feature specific to dbt that optimizes the CI process for data models. It only runs, tests, and builds the models affected by recent changes, making CI more efficient and cost-effective. While Slim CI is readily available in dbt Cloud, dbt Core users need to implement it themselves.
The core principles remain the same:
- Tracking dbt execution artifacts (manifest.json)
- Identifying modified nodes between two dbt executions
- Building only modified nodes and their downstream dependencies
This ensures only impacted parts of the project are rebuilt on each pull request, allowing efficient testing without breaking existing functionality.
Slim CI with dbt Core
As described above, the Slim CI process can be divided into three steps.
Step 1: Tracking dbt execution artifacts (manifest.json)
This is the most challenging part with dbt Core. In fact, this step involves capturing and saving the manifest.json file.
The manifest, or more precisely manifest.json, is a dbt artifact generated at each compilation or execution of your project. By default, the file is saved inside the dbt /target directory and is not versioned.
This single file (manifest.json) contains a full representation of your dbt project’s resources (models, tests, macros, etc), including all node configurations and resource properties. It is used to populate the docs site, and to perform state comparison.
While manifest.json has various uses, its significance for Slim CI lies in enabling state comparison, which we’ll cover in a later step. To leverage this effectively, I needed to capture and access manifest.json. Since my technical stack consists of dbt Core with Snowflake, a natural solution is to use Snowflake internal stages.
Manage artifacts with Snowpark
Snowpark is Snowflake’s library for querying and processing data in Snowflake using common development languages: Java, Python, and Scala. For my project, I’m using the Snowpark Python API.
Project structure
my_project
|___ snowflake/
|___|___ src/
|___|___|___ manage_dbt_artifacts.py
|___ transformation/ (dbt project)
|___|___ target/
|___|___|___ manifest.json
To manage manifest.json within my Snowflake environment, I’ve implemented a Python script, manage_dbt_artifacts.py, that uses the Snowpark API. This script performs the following tasks:
- Connects to Snowflake: Establishes a secure connection to the appropriate Snowflake environment.
- Manages Internal Stage: Creates and interacts with an internal Snowflake stage named "dbt_artifacts" for storing the manifest.json file.
- Handles File Transfer: Accepts a command-line argument specifying the operation type (load or download).
- Load: Retrieves the manifest.json file from the designated location (transformation/target/) and uploads it to the internal stage.
- Download: Retrieves the manifest.json file from the internal stage and saves it to the specified location (transformation/).
Check the code below for more details:
# manage_dbt_artifacts.py
import logging
import os
import sys

import snowflake.snowpark as snowpark
from helpers import connection_utils


def main():
    # Set up logging configuration
    logging.basicConfig(level=logging.INFO)

    # Check command-line arguments
    if len(sys.argv) != 2:
        logging.info("Usage: manage_dbt_artifacts.py [load|download]")
        sys.exit(1)

    # Establish Snowflake session
    session = connection_utils.get_session()

    operation = sys.argv[1]
    artifacts_stage = "@tech.dbt_artifacts"

    # Determine the path of the dbt manifest relative to this script
    script_dir = os.path.dirname(os.path.abspath(__file__))
    dbt_project_dir = os.path.normpath(os.path.join(script_dir, '../../../transformation'))
    file_path = os.path.join(dbt_project_dir, 'target/manifest.json')

    # Perform action based on the command-line argument
    if operation == 'load':
        load_artifacts(session, artifacts_stage, file_path)
    elif operation == 'download':
        unload_artifacts(session, artifacts_stage, dbt_project_dir)
    else:
        logging.error("Usage: manage_dbt_artifacts.py [load|download]")
        raise ValueError("Incorrect value for operation type")

    session.close()
    return 'SUCCESS'


def load_artifacts(session: snowpark.Session, stage: str, file_path: str) -> None:
    try:
        # Create the internal stage if it does not exist yet, then upload the manifest
        _ = session.sql(f"create stage if not exists {stage[1:]} file_format = (type='JSON')").collect()
        logging.info(f"{stage} created successfully")
        _ = session.file.put(file_path, stage, auto_compress=False, overwrite=True)
        logging.info(f"{file_path} loaded successfully into {stage}")
    except snowpark.exceptions.SnowparkSQLException as err:
        logging.error(f"Unexpected {err=}, {type(err)=}")
        raise
    else:
        # List the stage content to confirm the upload
        res = session.sql(f"ls {stage}").collect()
        logging.info(res)


def unload_artifacts(session: snowpark.Session, stage: str, target_directory: str) -> None:
    try:
        # Download the staged files into the local dbt project directory
        _ = session.file.get(stage, target_directory)
        logging.info(f"Files unloaded successfully into {target_directory}")
    except Exception as err:
        logging.error(f"Unexpected {err=}, {type(err)=}")
        raise


if __name__ == "__main__":
    main()
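The script relies on a small helpers.connection_utils module that is not shown above. Below is a minimal sketch of what get_session() could look like, assuming credentials are injected as environment variables by the CI runner (the variable names are illustrative, not part of the original project):
# helpers/connection_utils.py (hypothetical sketch)
import os
from snowflake.snowpark import Session

def get_session() -> Session:
    # Variable names are illustrative; adapt them to your own secrets management
    connection_parameters = {
        "account": os.environ["SNOWFLAKE_ACCOUNT"],
        "user": os.environ["SNOWFLAKE_USER"],
        "password": os.environ["SNOWFLAKE_PASSWORD"],
        "role": os.environ["SNOWFLAKE_ROLE"],
        "warehouse": os.environ["SNOWFLAKE_WAREHOUSE"],
        "database": os.environ["SNOWFLAKE_DATABASE"],
    }
    return Session.builder.configs(connection_parameters).create()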
Step 2: Identifying modified nodes between two dbt executions
The second step involves determining which parts of the dbt project have been modified since the last execution. This is achieved by leveraging dbt’s built-in state comparison functionality.
In fact, dbt maintains a snapshot of your project’s state at each execution, captured in the manifest.json file. By utilizing the state: selection method, we can compare this current state with a previous one and identify the specific nodes (models, tests, etc.) that have been modified.
dbt build --select state:modified+ --state=.
The above command looks for nodes that were modified between the manifest.json in the current folder (--state=.) and the one that will be computed into the /target directory. Including the graph operator (+) also selects the downstream nodes.
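Before building anything, it can be useful to preview which nodes the selector will pick up; dbt ls accepts the same state selection syntax:
dbt ls --select state:modified+ --state=.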
Step 3: Building only modified nodes and their downstream dependencies
Now that we can load and unload the dbt manifest.json (Step 1) and compare two manifest.json files to retrieve the modified nodes (Step 2), the final step involves efficiently building only the recent changes.
Here’s the challenge: building a node in dbt requires its upstream dependencies to be built first. This is where we leverage a dbt feature called “Defer”.
Defer is a powerful feature that makes it possible to run a subset of models or tests in a sandbox environment without having to first build their upstream parents. Defer requires that a manifest from a previous dbt invocation be passed via the --state flag or environment variable.
By combining --defer with state comparison (--state), we can leverage existing upstream nodes from a separate environment (like QA or production) to build only the recently modified nodes and their downstream dependencies within the CI pipeline.
dbt build --select state:modified+ --defer --favor-state --state=.
The --favor-state flag tells dbt to resolve references to unselected upstream nodes from the state (deferred) environment, so we can build all selected objects in a separate environment (e.g., _PR_XX_) even if their parent nodes are not present there.
Create a dedicated testing environment
To truly mimic dbt Cloud’s Slim CI behavior, we need to build the modified nodes and their dependencies in a separate environment within our Snowflake database. This ensures isolation and prevents conflicts with existing objects (cf. Achieve Conflicts-Free PRs using dbt Core Custom Schemas).
One approach involves updating dbt’s custom schema generation macro, generate_schema_name_for_env, to use an environment variable (DBT_CUSTOM_SCHEMA) that gets populated during every CI pipeline execution. The variable might hold a value like PR_[PR_NUMBER] to create a dedicated schema for each PR.
-- generate_schema_name_for_env
{% macro generate_schema_name_for_env(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- set env_custom_schema_name = env_var("DBT_CUSTOM_SCHEMA", "") | trim -%}
    {%- if env_custom_schema_name != "" -%}
        {% if custom_schema_name is none -%}
            _{{ env_custom_schema_name }}_{{ default_schema }}
        {% else %}
            _{{ env_custom_schema_name }}_{{ custom_schema_name | trim }}
        {% endif %}
    {%- elif custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}
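In the CI pipeline, the variable can then be derived from the pull request number before invoking dbt. A minimal sketch, assuming the CI tool exposes the PR number as PR_NUMBER and the dbt target as DBT_TARGET (both are assumptions, not part of the macro above):
# Hypothetical CI step: isolate the build in PR-specific schemas
export DBT_CUSTOM_SCHEMA="PR_${PR_NUMBER}"
dbt build --select state:modified+ --defer --favor-state --state=. --target=$DBT_TARGET
# With the macro above, a model using custom schema "marts" would build into _PR_123_marts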
Implementing the Slim CI workflow
The final part consists of integrating the previously described actions into a CI tool (such as GitHub Actions, Jenkins, GitLab CI/CD, etc.). This CI pipeline will orchestrate the entire Slim CI process.
Let’s assume you use an indirect promotion model with a dedicated QA branch before merging to production. Here’s a possible execution plan:
Prerequisites:
The workflow assumes an initial manifest.json file, representing the current state of your QA environment, is already present in your Snowflake internal stage.
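One way to seed this initial state, assuming a dbt target pointing at QA (named qa here purely for illustration), is to compile the project once against that environment and upload the resulting manifest with the Snowpark script:
# One-off bootstrap of the QA manifest
dbt compile --target qa
python manage_dbt_artifacts.py load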
Job 1: Testing on PR Merge to QA Branch
- Download QA dbt artifacts from @internal_stage to /transformation:
python manage_dbt_artifacts.py download
- Identify modified nodes and build them along with their dependencies within a separate PR_[PR_NUMBER] environment:
dbt build --select state:modified+ --defer --favor-state --state=. --target=$DBT_TARGET
→ Merge if Slim CI (Job 1) is successful
Job 2: Updating QA Artifacts on QA Branch Merge
- Download QA dbt artifacts
python manage_dbt_artifacts.py download
- Recompile and execute only modified nodes and their dependencies within the QA environment
dbt run --select state:modified+ --defer --favor-state --state=. --target=$DBT_TARGET
- Upload the newly generated manifest.json back to the QA internal stage:
python manage_dbt_artifacts.py load
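As an illustration, here is a hedged bash sketch of how these two jobs might look inside a CI runner; the paths and variables (PR_NUMBER, DBT_TARGET) are assumptions to adapt to your own setup:
#!/usr/bin/env bash
set -euo pipefail

# --- Job 1 (separate CI job): runs on every PR targeting the QA branch ---
python snowflake/src/manage_dbt_artifacts.py download   # fetch the QA manifest.json into transformation/
cd transformation
export DBT_CUSTOM_SCHEMA="PR_${PR_NUMBER}"               # PR-specific schemas via the macro above
dbt build --select state:modified+ --defer --favor-state --state=. --target=$DBT_TARGET
cd ..

# --- Job 2 (separate CI job): runs after the merge into the QA branch ---
python snowflake/src/manage_dbt_artifacts.py download
cd transformation
dbt run --select state:modified+ --defer --favor-state --state=. --target=$DBT_TARGET
cd ..
python snowflake/src/manage_dbt_artifacts.py load        # publish the refreshed manifest.json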
Additional Considerations:
- This workflow omits the PR schema cleanup step, which can be implemented using stored procedures or dbt macros (a possible sketch is shown below).
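For example, a cleanup macro invoked with dbt run-operation could drop all schemas created for a given PR. This is only a sketch; the macro name, the _PR_ prefix convention (derived from the schema macro above), and the invocation are assumptions, not part of the original project:
-- macros/drop_pr_schemas.sql (hypothetical)
-- Usage: dbt run-operation drop_pr_schemas --args '{pr_prefix: _PR_123_}'
{% macro drop_pr_schemas(pr_prefix) %}
    {% if execute %}
        {% set list_schemas_query %}
            select schema_name
            from {{ target.database }}.information_schema.schemata
            where schema_name ilike '{{ pr_prefix }}%'
        {% endset %}
        {% set results = run_query(list_schemas_query) %}
        {% for schema_name in results.columns[0].values() %}
            {% do run_query('drop schema if exists ' ~ target.database ~ '.' ~ schema_name) %}
            {{ log('Dropped schema ' ~ schema_name, info=True) }}
        {% endfor %}
    {% endif %}
{% endmacro %}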
Conclusion
Implementing a Slim CI workflow offers several advantages (cf. dbt docs):
- Provide increased confidence and assurances that project changes will work as expected in production.
- Reduce the time it takes to push code changes to production, through build and test automation, leading to better business outcomes.
- Allow organizations to make code changes in a standardized and governed way that ensure code quality without sacrificing speed.