Dagster + Airbyte + dbt: How Software-Defined Assets Change the Way We Orchestrate
A few weeks ago, I heard an interview with Dagster founder Nick Schrock on the Data Engineering Podcast. Although I tried Dagster way back in its early days, listening to Nick's approach to Software-Defined Assets (SDAs) in the data orchestration field intrigued me enough to give the new 1.0 release a try. Why should we care, how does it compare to Airflow, and why will I be using only Dagster for my Airbyte and dbt jobs? Read on to find out.
Assets
Before we discuss what exactly these Software-Defined Assets are in the ETL scheduling/orchestration space, let's see how things were done in the past. Your code was stored in different locations (ingestion tools, dbt models, Python notebooks, etc.) and your orchestration tool of choice executed or invoked these objects in "tasks." These tasks were organized into graphs: dependencies and relationships that determine how they should be executed relative to each other.
In your orchestration tool, you had full visibility over task executions, but on the other hand, you had no or little overview of the created/refreshed objects made by your tasks. However, in the data world, this is what really matters. When did you refresh a table or a table partition, what other tables should be refreshed if an upstream table changes, how many rows are loaded: just a few examples of questions you want to answer, regardless of what underlying tool or technology loads that asset.
This is where Dagster differentiates itself. Instead of talking about Tasks and Graphs, you have Assets and Materializations. According to Dagster’s documentation:
- An asset is an object in persistent storage, such as a table, file or persisted machine learning model.
- A software-defined asset is a Dagster object that couples an asset to the function and upstream assets that are used to produce its contents. In practice, you define what you materialize and how.
As in other tools (such as Airflow), the "what" and the "how" are defined in Python code. Therefore, you can generate assets with reusable code: libraries that sync metadata from other tools to build up dependency graphs, and functions to materialize them. Think of a single function that produces Software-Defined Assets for all of your dbt models, defining the underlying tables, their metadata, the SQL code behind each model and the dbt command to invoke for the model run.
Software-Defined Assets enable a declarative approach to data management, in which code is the source of truth on what data assets should exist and how those assets are computed.
Let’s see this in practice.
Installing Dagster Locally
Running Dagster and its graphical UI (Dagit) is pretty easy. First, you have to install the core modules and the desired plugins with
pip install dagster dagit dagster-dbt dagster-airbyte dagster-snowflake dagster-postgres
However, before you can see your assets, lineage or operations, you first need a repository with some stuff.
Adding an Airbyte Extract and Load (EL) Connection to Dagster
There are two ways to import our Airbyte connections. One is to use build_airbyte_assets, which connects to a remote Airbyte instance; the other is load_assets_from_airbyte_project, which relies on a local Octavia CLI export. For simplicity, I used the build_airbyte_assets version, which does not require any external dependency (other than installing the dagster-airbyte package). However, this version requires us to specify all tables in our connection:
What happens above is:
- We define a resource object (in this case an Airbyte resource). Resource objects contain metadata (connection information in most cases), used by repository discovery or materialization operations. Unfortunately, you still have to pass these configuration items when you trigger jobs manually on Dagit (this is something I don’t like).
- We generate our assets using the build_airbyte_assets function. In my scenario, the table names are hard-coded, but they could be stored in YAML files too.
- We define our repository. All I have to do is return my assets (and any other objects) from a function decorated with @repository.
Now we can finally start Dagit and see what we did:
dagit -f dagster_airbyte_example.py
I can try to materialize these tables by shift+clicking on “Materialize all.” We need shift+click because we have to add runtime configuration for the materialization.
A convenient “Scaffold missing config” button will show you the mandatory configuration items. If all looks good, you should be able to start Materializing your Airbyte connection:
It's all nice and dandy, except I always have to provide the resource configuration for manual executions. Even though I hard-coded the resource config in my airbyte.py file, I had to specify it again. This is not needed when executing from schedules, but it was annoying enough to mention here.
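For reference, the run configuration that Dagit scaffolds for an Airbyte resource looks roughly like this (the host and port are assumptions for a local instance):

```yaml
resources:
  airbyte:
    config:
      host: localhost
      port: "8000"
```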
In the end, I can see when my assets were last refreshed:
Looks good, so let’s set up dbt as well.
Importing a dbt Project to Dagster
This will be easy, too. Just like with Airbyte, we need an asset loader helper function: load_assets_from_dbt_project in our case.
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project
from dagster import with_resources

dbt_assets = with_resources(
By default, Dagster only invokes dbt run, which excludes seeds and snapshots. If you want to include those assets as well, pass the appropriate parameter to load_assets_from_dbt_project.
That's it. Let's put it all together with the Airbyte code (into the same Python file) to see our Airbyte source tables and dbt models together.
This is good, but not what we expected. It would be great to see our Airbyte tables and dbt tables connected as dependencies. To understand why, let’s look into the assets tab:
We can see two versions of the src_contacts table: one under / and the other under wrike. The issue here is that we need to define where these tables live and make their schemas part of their asset keys. In my case, all Airbyte tables should go under the stage/source_name schema, while all models should go under the practice schema. I can quickly change the code to specify these schemas as part of the asset keys:
dbt_assets = with_resources(
)

ab_assets = with_resources(
This time, if I check my lineage on Dagit, dependencies are looking more decent:
Trust me, this is awesome. Seeing your end-to-end data pipeline across multiple tools with such simplicity is game-changing.
This is just one differentiating feature of Dagster; you can still expect all the orchestration must-haves like job definitions (sets of assets and operations to execute), ops (traditional tasks), schedules, sensors and a lot more. Dagster also nicely supports Kubernetes deployments — its Helm package really just works out-of-the-box.
One thing I should highlight here is authentication and authorization. Just like Airbyte, Dagster does not support user authentication or authorization in its open-source version. This is not a huge issue for us: if you deploy it on EKS or GKE, it's easy to configure an authenticating frontend load balancer to gate access to the main Dagit web service. Dagit also supports read-only and read-write modes, so you can run two separate Dagit pods with two configurations and route authenticated user traffic according to each user's access level (RW or RO).
While Dagster is still in an early phase, I already enjoy working with it in production, both internally and in our clients' deployments. Maybe you should give it a try, too.