Starschema Blog

Dagster + Airbyte + dbt: How Software-Defined Assets Change the Way We Orchestrate

A few weeks ago, I heard an interview with Dagster founder Nick Schrock on the Data Engineering Podcast. Although I had tried Dagster back in its early days, Nick’s take on Software-Defined Assets (SDA) in data orchestration intrigued me enough to give the new 1.0 version a try. Why should we care, how does it compare to Airflow, and why will I be using only Dagster for my Airbyte and dbt jobs? Read on to find out.

Software-Defined What?

Assets. Before we discuss what exactly these Software-Defined Assets are in the ETL scheduling/orchestration space, let’s see how things were done in the past. Your code was stored in different locations (ingestion tools, dbt models, Python notebooks, etc.), and your orchestration tool of choice executed or invoked these objects as “tasks.” These tasks were organized into graphs: dependencies and relationships that say how they should be executed relative to each other.

In your orchestration tool, you had full visibility over task executions but little or no overview of the objects your tasks created or refreshed. However, in the data world, this is what really matters. When was a table or a table partition last refreshed? Which other tables should be refreshed if an upstream table changes? How many rows were loaded? These are just a few of the questions you want answered, regardless of which underlying tool or technology loads the asset.

Airflow DAG chaos. While you can see the tasks and dependencies, you probably have no clue about the status of the tables behind the tasks. (Credit: Apache Airflow)

This is where Dagster differentiates itself. Instead of talking about Tasks and Graphs, you have Assets and Materializations. According to Dagster’s documentation:

  • An asset is an object in persistent storage, such as a table, file or persisted machine learning model.
  • A software-defined asset is a Dagster object that couples an asset to the function and upstream assets that are used to produce its contents. In practice, you define what you materialize and how.
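As a mental model only, the following plain-Python toy (not Dagster’s API) couples each asset to the function that computes it and to its upstream assets. The hypothetical `asset` decorator reads the upstream asset names from the function’s parameter names, which is also how Dagster’s own `@asset` decorator infers dependencies by default, and the hypothetical `materialize_all` function computes every asset after its upstreams. It needs Python 3.9+ for `graphlib`.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List

# Hypothetical registry: asset name -> (compute function, upstream asset names)
REGISTRY: Dict[str, tuple] = {}


def asset(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Couple an asset (named after the function) to its compute function
    and to its upstream assets (named after the function's parameters)."""
    upstream = list(inspect.signature(fn).parameters)
    REGISTRY[fn.__name__] = (fn, upstream)
    return fn


def materialize_all() -> Dict[str, Any]:
    """Compute every registered asset after all of its upstream assets."""
    graph = {name: set(deps) for name, (_, deps) in REGISTRY.items()}
    values: Dict[str, Any] = {}
    for name in TopologicalSorter(graph).static_order():
        fn, deps = REGISTRY[name]
        values[name] = fn(*(values[d] for d in deps))
    return values


@asset
def raw_contacts() -> List[dict]:
    # Stand-in for an extracted table (e.g. one loaded by Airbyte)
    return [{"id": 1, "email": "a@example.com"}, {"id": 2, "email": None}]


@asset
def clean_contacts(raw_contacts: List[dict]) -> List[dict]:
    # Stand-in for a transformation (e.g. a dbt model)
    return [row for row in raw_contacts if row["email"]]


@asset
def contact_count(clean_contacts: List[dict]) -> int:
    return len(clean_contacts)


print(materialize_all())
```

Because the dependency graph, not a hand-written task list, decides the order, adding a downstream asset only requires declaring it with the right parameter names.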

As in other tools (such as Airflow), the “what” and the “how” are defined in Python code. Therefore, you can generate assets with reusable code: libraries that sync metadata from other tools to build up dependency graphs, and functions that materialize them. Think of a single function that produces Software-Defined Assets for all of your dbt models, defining the underlying tables, their metadata, the SQL code behind each model and the dbt command to invoke for the model run.
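The “single function for all dbt models” idea can also be sketched in plain Python. The `MODELS` dict below is a hypothetical, heavily simplified stand-in for the model metadata dbt writes to target/manifest.json, and `ModelAsset` and `build_model_assets` are hypothetical names; the real dagster-dbt loader reads far more metadata than this.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ModelAsset:
    key: List[str]         # schema + table, e.g. ["practice", "dim_contacts"]
    deps: List[List[str]]  # keys of upstream dbt models (sources are left out of this toy)
    sql: str               # SQL behind the model
    command: List[str]     # dbt command that refreshes the table


# Hypothetical, heavily simplified model metadata
MODELS: Dict[str, dict] = {
    "stg_contacts": {
        "schema": "practice",
        "depends_on": [],
        "sql": "select * from {{ source('wrike', 'src_contacts') }}",
    },
    "dim_contacts": {
        "schema": "practice",
        "depends_on": ["stg_contacts"],
        "sql": "select id, email from {{ ref('stg_contacts') }}",
    },
}


def build_model_assets(models: Dict[str, dict]) -> List[ModelAsset]:
    """One reusable function that turns every model into an asset definition."""
    return [
        ModelAsset(
            key=[meta["schema"], name],
            deps=[[models[dep]["schema"], dep] for dep in meta["depends_on"]],
            sql=meta["sql"],
            command=["dbt", "run", "--select", name],
        )
        for name, meta in models.items()
    ]


for spec in build_model_assets(MODELS):
    print(spec.key, spec.deps, " ".join(spec.command))
```

Adding a model to the project then adds an asset automatically, with no orchestration code to touch.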

An example set of Assets, viewed by the dependencies across different tools and applications
Example asset. Its definition and lineage information were added by the dagster-dbt library.

In short:

Software-Defined Assets enable a declarative approach to data management, in which code is the source of truth on what data assets should exist and how those assets are computed.

Let’s see this in practice.

Installing Dagster Locally

Running Dagster and its graphical UI (Dagit) is pretty easy. First, you have to install the core modules and the desired plugins with pip:

pip install dagster dagit dagster-dbt dagster-airbyte dagster-snowflake dagster-postgres

However, before you can see your assets, lineage or operations, you first need a repository with some stuff.

Adding an Airbyte Extract and Load (EL) Connection to Dagster

There are two ways to import Airbyte connections: build_airbyte_assets, which connects to a remote Airbyte instance, and load_assets_from_airbyte_project, which relies on a local Octavia CLI export. For simplicity, I used build_airbyte_assets, which does not require any external dependency (other than the dagster-airbyte package). However, this version requires us to specify all tables in our connection:

What happens above is:

  1. We define a resource object (in this case an Airbyte resource). Resource objects contain metadata (connection information in most cases), used by repository discovery or materialization operations. Unfortunately, you still have to pass these configuration items when you trigger jobs manually on Dagit (this is something I don’t like).
  2. We generate our assets using build_airbyte_assets function. In my scenario, the table names are hard-coded, but they can be stored in YAML files too (or I could use load_assets_from_airbyte_project).
  3. We define our repository. All I have to do is return my assets (or other definitions) from a function decorated with @repository.
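Step 2 mentions that the table names could live in YAML files instead of the code. A minimal sketch of that idea follows; it uses JSON instead of YAML only because `json` ships with Python, and the file contents (the placeholder connection ID and the `src_tasks` table) and the `airbyte_asset_kwargs` helper are hypothetical. Each resulting dict is shaped for the `connection_id` and `destination_tables` parameters of `build_airbyte_assets`, but the call itself is left out so the sketch runs without Dagster installed.

```python
import json
from typing import Dict, List

# Hypothetical config file contents (JSON instead of YAML, only because the
# json module is part of the standard library)
CONNECTIONS_JSON = """
{
  "wrike": {
    "connection_id": "00000000-0000-0000-0000-000000000000",
    "destination_tables": ["src_contacts", "src_tasks"]
  }
}
"""


def airbyte_asset_kwargs(raw: str) -> List[Dict[str, object]]:
    """Return one keyword-argument dict per configured Airbyte connection.

    Each dict is shaped for build_airbyte_assets(**kwargs); the call is
    omitted so that this sketch runs without Dagster installed.
    """
    connections = json.loads(raw)
    return [
        {
            "connection_id": conn["connection_id"],
            "destination_tables": list(conn["destination_tables"]),
        }
        for conn in connections.values()
    ]


for kwargs in airbyte_asset_kwargs(CONNECTIONS_JSON):
    print(kwargs)
```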

Now we can finally start Dagit and see what we did:

dagit -f dagster_airbyte_example.py
My first two tables! How cool is this?

I can try to materialize these tables by shift+clicking on “Materialize all.” We need shift+click because we have to add runtime configuration for the materialization.

Materialization configuration page. You can add per-execution configuration from Dagit.

A convenient “Scaffold missing config” button shows you the mandatory configuration items. If all looks good, you should be able to start materializing your Airbyte connection:

The end-to-end flow of executing a materialization.

It’s all fine and dandy, except that I always have to provide the resource configuration for manual executions. Even when I hard-coded the resource config in my airbyte.py file, I had to specify it again. This is not needed when executing from schedules, but it was annoying enough to mention here.
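One workaround for retyping the configuration is to keep the connection settings in one place and generate the run config from them. The sketch below assumes the resource is registered under the key `airbyte` and that its config needs `host` and `port`; the environment variable names and defaults are hypothetical. The output follows Dagster’s run config layout (`resources`, then the resource key, then `config`), and since JSON is, for practical purposes, valid YAML, it should paste straight into Dagit’s Launchpad.

```python
import json
import os
from typing import Dict

# Single source of truth for the Airbyte connection settings (hypothetical
# environment variable names; defaults assume a local Airbyte on port 8000)
AIRBYTE_SETTINGS: Dict[str, str] = {
    "host": os.environ.get("AIRBYTE_HOST", "localhost"),
    "port": os.environ.get("AIRBYTE_PORT", "8000"),
}


def launchpad_run_config(resource_key: str = "airbyte") -> dict:
    """Build a Dagster-style run config block for one resource."""
    return {"resources": {resource_key: {"config": dict(AIRBYTE_SETTINGS)}}}


# Print it so it can be pasted into Dagit's Launchpad
print(json.dumps(launchpad_run_config(), indent=2))
```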

In the end, I can see when my assets were last refreshed:

You can access Airbyte logs directly from Dagster.

Looks good, so let’s set up dbt as well.

Importing a dbt Project to Dagster

This will be easy, too. Just like with Airbyte, we need an asset loader helper function: load_assets_from_dbt_project in our case.

from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project
from dagster import with_resources

dbt_assets = with_resources(
    load_assets_from_dbt_project(
        project_dir="path/to/your/dbt/project"
    ),
    {"dbt": dbt_cli_resource},
)

By default, Dagster only invokes dbt run, which excludes seeds and snapshots. If you want to include those assets as well, add the following parameter to load_assets_from_dbt_project, which makes Dagster invoke dbt build instead (note that dbt build also runs your tests):

use_build_command=True,
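To make the difference concrete, here is a hypothetical helper that builds the two dbt CLI invocations the flag chooses between. It is not dagster-dbt’s code; it only shows that `dbt run` refreshes models, while `dbt build` also runs seeds, snapshots and tests.

```python
from typing import List, Sequence


def dbt_command(models: Sequence[str], use_build_command: bool = False) -> List[str]:
    """Hypothetical helper mirroring the choice the flag makes.

    dbt run   -> models only
    dbt build -> models, tests, seeds and snapshots
    """
    subcommand = "build" if use_build_command else "run"
    cmd = ["dbt", subcommand]
    if models:
        # Space-separated selectors are combined as a union by dbt
        cmd += ["--select", *models]
    return cmd


print(dbt_command(["stg_contacts", "dim_contacts"]))
print(dbt_command(["stg_contacts", "dim_contacts"], use_build_command=True))
```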

That’s it. Let’s put it all together with the Airbyte code (in the same Python file) to see our Airbyte source tables and dbt models side by side.

Finally, we can see our Airbyte and dbt models together. However, they are not connected.

This is good, but not what we expected: it would be great to see our Airbyte tables and dbt tables connected as dependencies. To understand why they are not, let’s look at the Assets tab:

We can see two versions of the src_contacts table: one under / (the root) and the other under wrike. The issue is that we need to define where these tables live and make their schema part of their asset keys. In my case, all Airbyte tables should go under the stage/source_name schema, while all models should go under the practice schema. I can quickly change the code to add these schemas to the asset keys:

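from dagster import with_resources
from dagster_airbyte import airbyte_resource, build_airbyte_assets
from dagster_dbt import dbt_cli_resource, load_assets_from_dbt_project

# dbt models go under the "practice" schema; dbt sources (the Airbyte tables)
# get a "stage" prefix in front of their source name
dbt_assets = with_resources(
    load_assets_from_dbt_project(
        project_dir="path/to/your/dbt/project",
        key_prefix=["practice"],
        source_key_prefix=["stage"],
    ),
    {"dbt": dbt_cli_resource},
)

# Airbyte tables land under stage/wrike, matching the dbt source keys
ab_assets = with_resources(
    build_airbyte_assets(
        connection_id="connection_id",
        destination_tables=["table1"],
        asset_key_prefix=["stage", "wrike"],
    ),
    {"airbyte": airbyte_resource},
)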
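Why does prefixing fix the lineage? Dependencies are matched on the full asset key. The plain-Python sketch below models that under stated assumptions: the dbt source is named `wrike` with a table `src_contacts`, a dbt source’s key is the source name plus the table name with `source_key_prefix` in front, and an Airbyte table’s key is the table name with `asset_key_prefix` in front. These assumptions mirror what the Assets tab showed above; the functions themselves are hypothetical.

```python
from typing import Sequence, Tuple

AssetKey = Tuple[str, ...]


def airbyte_key(table: str, asset_key_prefix: Sequence[str] = ()) -> AssetKey:
    # Airbyte table asset: optional prefix + table name
    return (*asset_key_prefix, table)


def dbt_source_key(source: str, table: str, source_key_prefix: Sequence[str] = ()) -> AssetKey:
    # dbt source (what a model reads from): optional prefix + source name + table name
    return (*source_key_prefix, source, table)


def connected(produced: AssetKey, consumed: AssetKey) -> bool:
    # An edge appears in the lineage graph only if the keys are identical
    return produced == consumed


# Before: ('src_contacts',) vs ('wrike', 'src_contacts'), two unrelated assets
before = connected(airbyte_key("src_contacts"), dbt_source_key("wrike", "src_contacts"))

# After: both sides resolve to ('stage', 'wrike', 'src_contacts')
after = connected(
    airbyte_key("src_contacts", asset_key_prefix=["stage", "wrike"]),
    dbt_source_key("wrike", "src_contacts", source_key_prefix=["stage"]),
)
print(before, after)  # prints: False True
```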

This time, if I check the lineage in Dagit, the dependencies look much better:

Dependencies between Airbyte assets and dbt assets are looking cool.

Trust me, this is awesome. Seeing your end-to-end data pipeline across multiple tools with such simplicity is game-changing.

Next Steps

This is just one differentiating feature of Dagster; you can still expect all the orchestration must-haves like jobs (sets of assets and operations to execute), ops (traditional tasks), schedules, sensors and a lot more. Dagster also supports Kubernetes deployments nicely: its Helm chart really just works out of the box.

User Security

One thing I should highlight here is authentication and authorization. Just like Airbyte, Dagster does not support user authentication and authorization in its open-source version. This is not a huge issue for us: if you deploy it in EKS or GKE, it’s easy to put an authenticating load balancer in front of the main Dagit web service to filter access. Dagit also supports read-only and read-write modes, so you can run two separate Dagit pods with the two configurations and route authenticated user traffic according to each user’s access level (RW or RO).
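The routing rule behind that setup can be sketched as a small function. Everything here is an assumption for illustration: the group names, the in-cluster service URLs, and the idea that the authenticating proxy passes the user’s groups along. Only the split itself (one read-write Dagit and one read-only Dagit) comes from the setup described above.

```python
from typing import Iterable, Optional

# Hypothetical in-cluster service URLs for the two Dagit deployments
DAGIT_READ_WRITE = "http://dagit-rw.dagster.svc:80"
DAGIT_READ_ONLY = "http://dagit-ro.dagster.svc:80"

# Hypothetical group names, as reported by the authenticating proxy
WRITER_GROUPS = {"data-engineers"}
READER_GROUPS = {"analysts", "data-engineers"}


def dagit_upstream(user_groups: Iterable[str]) -> Optional[str]:
    """Return the Dagit backend an authenticated user should be proxied to.

    Writers get the read-write instance, readers the read-only one, and
    everyone else is rejected (None).
    """
    groups = set(user_groups)
    if groups & WRITER_GROUPS:
        return DAGIT_READ_WRITE
    if groups & READER_GROUPS:
        return DAGIT_READ_ONLY
    return None
```

Checking writers first means a user in both groups always lands on the read-write instance.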

While Dagster is still in an early phase, I already enjoy working with it in production internally and at our clients’ deployments. Maybe you too should give it a try.

Tamas Foldi

Helping enterprises to become more data driven @ HCLTech, co-founder & former CEO @ Starschema