Dagster: A complete replacement for dbt Cloud automations

Raul Salamanca
7 min readAug 21, 2024

--

A bit of history (it’s important for context regarding the decision, but you can skip this part):

Datalake: A constant challange and evolution

Some time ago, we faced the challenge of setting up a datalake. We started with the classic AWS data stack (Athena, Glue, and S3). We quickly realized that something wasn’t working well and it didn’t entirely suit our needs.

Not long after, we discovered dbt: pure glory. For a very small “team” maintaining the datalake — 2 people directly involved (myself, DevOps, and my colleague, Data Engineer) and 3 people indirectly involved but users nonetheless (data analysts and data scientist), dbt was a paradigm shift: how we do things and how we understand the data we handle. Applying a workflow similar to software development.

To start and not exhaust our capacity in understanding and operating with dbt core, we began with dbt Cloud. Our experience with dbt Cloud was very pleasant, nothing to complain about. It did what it had to do and showed us an enriched visualization that helped us understand how our datalake was structured.

Although the cost of dbt Cloud is not high, especially given the performance improvement of our work, we didn’t find the price that ideal. Sure, paying a fee for the IDE seems fine to us. But paying credits for each materialized model? It didn’t make much sense to us, especially since the computational work (in our case) is handled by BigQuery.

Did we make the right decision?

At this point, we began to question whether dbt Cloud was indeed our best decision. We like dbt and knew that there was an option to use dbt outside of dbt Cloud. Thus, I set out to evaluate which orchestrator would suit our use case and could meet most of the functionalities of dbt Cloud.

Most orchestrators tend to be general-purpose, which is fine as they aim to cover as many use cases as possible. Some are easier to use, like Prefect, while others are more complex to deploy and start operating, like Airflow. We concluded that any orchestrator could approximate the functionalities of dbt in one way or another. After all, with dbt core, it’s about running a “couple of commands” and having the appropriate credentials. But it wasn’t enough.

Dagster: A new hope

When I found Dagster, I introduced it to my colleague and could only describe it in one word: perfect. Obviously, I’m speaking exclusively about our use cases — we operate solely in BigQuery (and a few “legacy” things in AWS).

When I deployed Dagster and started testing, I could only confirm my initial opinion. But still, some things were missing, and most importantly: matching dbt Cloud.

How could we match such an important feature as slim CI?

If you’re not familiar with the term “slim CI,” it means, in short: the materialization of differences between an old state and a current one. For example, when you merge into master, instead of materializing all models, only those that have received some code changes are materialized.

We recognized that this is a pretty important feature. Because if we materialize everything with each merge, we are making abusive use of resources for tables that don’t need to be materialized at that time.

dbt Cloud uses the command dbt build --select state:modified+, which is not completely available in dbt core. This is because it depends on dbt’s metadata, both new and old. In strict terms, the metadata before the changes and the metadata after the changes. dbt Cloud stores this metadata and uses it as a reference for the command in question.

But to replicate the same with dbt core, one should use the command dbt build — select state:modified+ --defer --state {old state path}. It is the same command but manually defining where the old state to be compared is located. In other words, it is our responsibility to store the state. Makes sense.

Saving the state and later retrieving it is not complex. It is simply a JSON file that can be saved anywhere (like in S3 or GCS).

My initial approach was to do this with GitHub Actions, store and then retrieve the state there, and then execute the corresponding dbt commands. But I was uncomfortable not being able to manage this completely with Dagster. Also, not doing it in Dagster means that Dagster lacks relevant information about the materializations.

The definitive solution: Dagster jobs. I don’t want to take all the credit; I found this solution somewhere (I can’t remember where) and refactored it so that the responsibility of saving and retrieving the state is entirely on Dagster, following this flow (practically what dbt Cloud does):

The code adds logic such that if it fails to materialize successfully, the state is not updated.

import os
from pathlib import Path
from typing import Any, Mapping

from dagster import (
DagsterRunStatus,
Nothing,
OpExecutionContext,
Out,
job,
op,
run_status_sensor,
)

from dagster_dbt import (
DbtCliResource,
DagsterDbtTranslator,
DagsterDbtTranslatorSettings,
)

from dagster._utils import file_relative_path

import boto3
import botocore

@op(out=Out(Nothing))
def dbt_slim_ci(context: OpExecutionContext, dbt: DbtCliResource):
tmp_workdir = f"/tmp/dbt-{context.run.run_id}"

os.makedirs(tmp_workdir, exist_ok=True)

manifest = Path(f"{tmp_workdir}/manifest.json")
new_manifest = Path(file_relative_path(__file__, "../target/manifest.json"))

s3 = boto3.resource("s3")
bucket = s3.Bucket("hooli-dagster")

try:
bucket.download_file("dbt/manifest.json", str(manifest))
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "404":
return
else:
raise

dbt_command = [
"build",
"--select",
"state:modified+",
"--defer",
"--state",
f"{tmp_workdir}",
]

yield from dbt.cli(
args=dbt_command,
manifest=new_manifest,
dagster_dbt_translator=DagsterDbtTranslator(
DagsterDbtTranslatorSettings(enable_asset_checks=True)
),
).stream()


@job
def dbt_slim_ci_job():
dbt_slim_ci()


@run_status_sensor(job_selection=[dbt_slim_ci_job], run_status=DagsterRunStatus.SUCCESS)
def dbt_slim_ci_upload_manifest(_):
manifest = Path(file_relative_path(__file__, "../target/manifest.json"))

s3 = boto3.resource("s3")
bucket = s3.Bucket("hooli-dagster")
bucket.upload_file(str(manifest), "dbt/manifest.json")

In a few words and describing the code a bit:

dbt_slim_ci_job is responsible for verifying the existence of a state in the S3 bucket. If the object does not exist, it terminates the execution (with a success state).

dbt_slim_ci_upload_manifestis a sensor. It executes every time dbt_slim_ci_job finishes successfully. It is responsible for uploading the new state to S3.

In conclusion, with this setup, we ensure that the responsibility of materializing the models with differences lies entirely with Dagster. Therefore, we have only one task left: the GitHub trigger.

I will only use the GitHub Actions workflow step. The deployment of Dagster depends exclusively on each implementation; in our case, we did it with Kubernetes and ArgoCD. This step executes after Dagster has been deployed and all the pods are running, to ensure that Dagster already has the changes regarding the state and the job.

- name: Trigger dbt slim CI
env:
DAGSTER_HOST: dagster.sour.sh
DAGSTER_PORT: "443"
DAGSTER_USE_HTTPS: "true"
run: |
python .github/scripts/dbt_slim_ci.py

The code it points to, dbt_slim_ci.py, contains the following code:

import os
import time
from dagster import DagsterRunStatus
from dagster_graphql import DagsterGraphQLClient, DagsterGraphQLClientError

JOB_NAME = "dbt_slim_ci_job"
HOST = os.getenv("DAGSTER_HOST", "localhost")
PORT = int(os.getenv("DAGSTER_PORT", "3000"))
USE_HTTPS = os.getenv("DAGSTER_USE_HTTPS", "false").lower() == "true"

dagster_url = f"http{'s' if USE_HTTPS else ''}://{HOST}{f':{PORT}' if PORT != 80 and PORT != 443 else ''}"

client = DagsterGraphQLClient(HOST, port_number=PORT, use_https=USE_HTTPS)

try:
run_id: str = client.submit_job_execution(
JOB_NAME,
)
except DagsterGraphQLClientError as exc:
raise exc

print(f"Submitted run {run_id} for job {JOB_NAME}")

status = DagsterRunStatus.NOT_STARTED
while status not in [DagsterRunStatus.SUCCESS, DagsterRunStatus.FAILURE]:
try:
status: DagsterRunStatus = client.get_run_status(run_id)
print(f"Run {run_id} for job {JOB_NAME} is {status}")
time.sleep(1)
except DagsterGraphQLClientError as exc:
raise exc

if status == DagsterRunStatus.SUCCESS:
print(f"Run {run_id} for job {JOB_NAME} succeeded")
exit(0)
else:
print(
f"""Run {run_id} for job {JOB_NAME} failed.
You can view the logs at {dagster_url}/runs/{run_id}
"""
)
exit(1)

The previous code can be used as is. It is generic enough to not depend on any of our specific configurations.

It is quite simple, even though it may be cumbersome: it triggers Dagster to run the job we created earlier, dbt_slim_ci_job, and then waits to determine the execution status so that GitHub Actions can fail if the job does not execute correctly.

Final words

With this, we were able to completely match the automation functionalities of dbt Cloud, and even enhance them with the additional features that Dagster offers as an orchestrator (we hope to integrate other tools like Airbyte in the future).

What we do not question, and for which we will surely see alternatives at some point, is the stage of model development. The IDE of dbt Cloud is good and serves its purpose well. Additionally, being web-based, it is “portable,” and we can assume a cost that can remain constant.

And something quite important: an essential feature is the feature previews of dbt Cloud. It is quite advantageous to have access to datasets that have changes every time a Pull Request is created — not that alternatives do not exist, but it is convenient, it works, and it’s a cost that we can maintain constantly for a long time.

Therefore, our workflow looks like this:

It is a workflow where each part does its job well and is also quite cost-effective. With Dagster, we only have to pay for its computation, which can remain constant for a long time since we currently use it with dbt. And in truth, the actual computational work is handled by BigQuery.

Costs will no longer impact us as immediately based on the materializations we perform, nor will we have to make efforts to adhere to a tight budget due to this limitation.

--

--

Raul Salamanca

Like The Powerpuff Girls. A bit of everything but excess of anxiety.