Who Let the DAGs out? Getting the DAG out of dbt (Chapter 2)

Tim Leonard
10 min read · Mar 7, 2023


This series covers the pairing of dbt’s powerful capabilities for building and compiling complex SQL-based workflows, with Flyte, which offers an equally rich set of features for workflow orchestration.

If you haven't already, please see Chapter 1: Motivation.

There are three chapters in this series so far.

Disclaimers

  • The thoughts and opinions in this article are my own and not those of my employer.
  • My employer is a major contributor to the Flyte project.
  • Portions of the article text have been written with the assistance of ChatGPT with the author acting as reviewer and editor.

Keywords

  • dbt
  • Flyte
  • SQL
  • data transformations
  • DAG
  • workflow orchestration
  • Data Processing
  • Task Scheduling
  • Task Execution

Chapter 2: Getting the Execution Plan/DAG out of dbt

In Chapter 1 we talked about what a DAG is, and why we'd want to extract it from dbt and use Flyte for workflow orchestration instead of dbt. Next we'll work through an example of extracting the DAG/execution plan from dbt.

A Note on Model Selection

Let's learn a bit more about how dbt makes its DAG, and how we can export it.

An obvious entry point for extracting the DAG would be the dbt list command, which displays the available dbt resources (models, tests, macros, and so on) in a project. When you run dbt list, it prints the names of the resources in your project, filtered by any --models/--select input, but it does not order the models the way they would run during execution. So dbt list does not output an execution plan, though it is useful for a summary of the resources in your project. For example, dbt list --select model_a+ looks like this:

dbt list output

The next alternative is to dig into the dbt core code, find where the execution plan is generated, and extract it for our needs.

In dbt, the run command is used to execute all the models in a project. However, it also supports powerful model selection syntax that allows you to run a subset of models based on a variety of criteria.

In our setup, our dbt project is so large that we use the selection syntax to run parts of the project at a time. Typically we divide a set of models up by tags, and run tag by tag, e.g. dbt run --select tag:consumption_metrics

The --select and --models flags are interchangeable (for now), though dbt has moved to --select as the primary syntax.

But we also use the full suite of selection syntax, so whatever we build must support dbt's selection syntax and produce a DAG containing only the models explicitly included in the run command.

Our dbt Project

For these examples, we assume that a working dbt project is in place. If you are just getting started with dbt, you can stand up and configure a demo project with dbt init. We also assume that profiles.yml, holding the database config for that project, is in your project folder (the currently recommended setup from dbt Labs).

In these examples we are using a demonstration project I've set up, with a slightly more elaborate DAG than jaffle_shop (a classic dbt demo project).

Here’s a view of that demo project in dbt docs (dbt docs serve)

dbt demo project as seen in the dbt docs interface

Exporting the DAG

In order to respect the full range of dbt selection syntax, we'll use dbt's own code to parse our run command, compile the project manifest (including all nodes) and graph, and finally apply the selection statement to filter the nodes and graph down to what was selected. dbt Core is written in Python, so we'll use Python to pull out these commands and run them, giving us access to the parts we need.

So as setup for the next part, we’ll want to:

  1. load the parser from dbt and run a command through it to create a dbt run task.
  2. load and compile the manifest: dbt will parse the project, create the manifest (task.manifest), and create a graph object for the models in the project (task.graph).
  3. do a bit of unpacking:
    - extract the selection spec object from the task
    - extract the selected nodes from the selection spec
    - extract the graph for the selected nodes

Here’s how that looks.

For this example we are using:

Python 3.9.6
dbt 1.3.2
NetworkX 2.8.8

Additionally, the dbt project is set up as described above, with profiles.yml in the project folder.

The following code assumes that you are running it in Python from the root of the dbt project; this can easily be changed by modifying the --project-dir and/or --profiles-dir settings passed to dbt via the args.

# Setup imports
import dbt.main
import dbt.tracking
from dbt.main import parse_args, run_from_args
from dbt import flags
from dbt.config.profile import read_user_config
from dbt.tracking import User

# Setup user (read by the fns below). Assign on the dbt.tracking module
# so dbt's own code sees the user we create.
dbt.tracking.active_user = User("/")
dbt.tracking.active_user.initialize()
dbt.tracking.active_user.disable_tracking()

# Setup command args
# this is the same as calling dbt with:
# dbt run --select model_a+ --project-dir . --profiles-dir .
args = [
    "run",
    "--select", "model_a+",
    "--project-dir", ".",
    "--profiles-dir", ".",
]
# Notably: for this project, we are selecting model_a, and
# all models downstream of model_a, with the `model_a+` syntax

# Parse command args
parsed = parse_args(args)
user_config = read_user_config(flags.PROFILES_DIR)  # This is read again later
flags.set_from_args(parsed, user_config)
dbt.tracking.initialize_from_flags()

# Generate the task
task = parsed.cls.from_args(args=parsed)

# Load and compile the manifest
task.load_manifest()
task.compile_manifest()
# we now have access to `task.graph` and `task.manifest`
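
Before moving on, here is a quick, illustrative sanity check on what we now hold (attribute access is per dbt 1.3 as used in this chapter; that dbt's Graph object wraps a NetworkX graph in its .graph attribute comes up again in the next section):

# Illustrative sanity check: peek at the compiled artifacts
print(type(task.manifest))       # dbt's Manifest, holding every parsed node
print(len(task.manifest.nodes))  # models, tests, etc. across the whole project
print(type(task.graph.graph))    # the underlying networkx DiGraph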

Selected Nodes/Graph

With our full manifest and full graph in place, we now need to move through dbt's datatypes to filter these down to only the nodes and graph that were selected in the run command. That is, we want to make sure model_a2 is excluded, just as we see in dbt docs with the --select model_a+ command.

Project DAG with --select model_a+
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType

# passing task.graph and task.manifest to build the selector
selector = ResourceTypeSelector(
    graph=task.graph,
    manifest=task.manifest,
    previous_state=task.previous_state,
    resource_types=[NodeType.Model],
)
# setup spec
spec = task.get_selection_spec()
# pass spec to the selector to refine the nodes
selected_nodes = selector.get_selected(spec)
selected_graph = selector.full_graph.get_subset_graph(selected_nodes)

Let's take a look at what we have so far. dbt uses the NetworkX Python package for its low-level graph objects. selected_graph.graph is a NetworkX DiGraph (networkx.classes.digraph.DiGraph), which means we can use NetworkX to interact with dbt graphs. First, here I'm using networkx.draw() to plot selected_graph.graph.
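
Here's a minimal sketch of how such a plot can be produced (the layout and styling are arbitrary choices of mine):

import matplotlib.pyplot as plt
import networkx as nx

# Draw the selected subgraph; spring_layout is an arbitrary, readable choice
pos = nx.spring_layout(selected_graph.graph, seed=42)
nx.draw(selected_graph.graph, pos, with_labels=True, node_size=1200, font_size=8)
plt.show()

# And confirm it really is a DAG
print(nx.is_directed_acyclic_graph(selected_graph.graph))  # True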

We indeed have a DAG: it contains directed edges and no cycles or loops.

If we were to run this command in dbt, this is the execution plan it would follow, iterating through each model from top to bottom.

But there's a catch. model_e is ephemeral, and doesn't contain any task to run. dbt still encodes it in its graph and task queue (more on that later).

Ephemeral models in dbt are a type of model that is not persisted in the database. They are created on the fly during the execution of a dbt project and exist only for the duration of the dbt run.

Ephemeral models are useful for situations where you want to join, aggregate, or transform data from multiple tables or views, but you don’t want to create a permanent table to store the results. Instead, you can define an ephemeral model that performs the necessary calculations and then use that model in downstream models or analyses.
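
As a quick sketch, here's one way to see which of our selected models are ephemeral, using the manifest we compiled earlier (is_ephemeral is the same attribute used in the removal code below):

# Sketch: list the ephemeral models among our selected nodes
for node_id in selected_nodes:
    node = task.manifest.nodes[node_id]
    if node.is_ephemeral:
        print(node_id)  # e.g. model.my_new_package.model_e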

Though dbt preserves the ephemeral models in its internal graph and queue structures, odds are we want to omit them when publishing our DAG to an external workflow orchestration system like Flyte. Pragmatically, the SQL from an ephemeral model is pushed into the later materialized ones (e.g. model_f references model_e, so model_e's SQL is inserted into model_f as a CTE) and doesn't constitute a step we'd want to replicate externally. We could push these tasks to Flyte and have nothing happen, but I think that overly complicates debugging long term. So let's take them out.

DAG for dbt run --select model_a+; ephemeral model (model_e) in red

Modify the DAG to Remove Ephemeral Nodes

The next step is to modify this graph to remove the ephemeral node (model_e; in red).

For that we can lean on the NetworkX package, since, hey, we already have a NetworkX graph (thanks, dbt!). We can use the contracted_edge function in NetworkX, which merges the two endpoints of an edge into a single node. We will iterate through each ephemeral node and 'squeeze' it out by contracting its incoming edge. This way we can be sure we are preserving the connections on each side of the node we are removing. I am not a graph specialist, and there is likely a much more concise way to do this!
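
To make contracted_edge concrete, here's a toy sketch on a three-node chain (the node names here are hypothetical):

import networkx as nx

# Contract the edge (a, e): 'e' is merged into 'a', and 'e's outgoing
# edge is inherited by 'a', so the chain a -> e -> f becomes a -> f
G = nx.DiGraph([("a", "e"), ("e", "f")])
H = nx.contracted_edge(G, ("a", "e"), self_loops=False)
print(list(H.edges()))  # [('a', 'f')]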

import networkx as nx

# Find the ephemeral nodes among the selected nodes
ephemeral_nodes = [x for x in selected_nodes if task.manifest.nodes[x].is_ephemeral]

selected_graph_mod = selected_graph.graph
# Remove ephemeral tasks by contracting their incoming edges
for eph in ephemeral_nodes:
    edges = selected_graph_mod.in_edges(eph)
    if len(edges):
        print(f'remove {edges}')
        selected_graph_mod = nx.contracted_edge(
            selected_graph_mod,
            tuple(edges)[0],
            self_loops=False)

After this, we have a DAG of tasks that looks like this:

DAG for dbt run --select model_a+ with ephemeral model (model_e) removed

model_e is removed, and the connections from model_a to model_f, model_g, and model_h are preserved. Awesome!

Please note, this graph is not perfect: there are connections from model_a to model_c2, and from model_c to model_c2, that are not totally obvious.
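
A quick sketch to make those connections explicit by listing the edges of the modified graph:

# Print the edges of the modified graph using the models' short names
for u, v in sorted(selected_graph_mod.edges()):
    print(task.manifest.nodes[u].name, '->', task.manifest.nodes[v].name)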

Export the Execution Plan from the Graph

dbt uses a method based on topological sort to decide the order and grouping in which jobs should be performed. It uses a modified version of topological sort to assign each model a score based on its level. These functions live in the GraphQueue class (dbt.graph.queue).
We can expose some of that by creating a GraphQueue object and taking a deeper look at the GraphQueue._get_scores() output:

from dbt.graph.queue import GraphQueue

GQ = GraphQueue(selected_graph.graph, task.manifest, selected_nodes)
scores = GQ._get_scores(selected_graph.graph)

And here are the scores we get out:

{'model.my_new_package.model_a': 0,
 'model.my_new_package.model_b': 1,
 'model.my_new_package.model_f': 1,
 'model.my_new_package.model_g': 1,
 'model.my_new_package.model_h': 1,
 'model.my_new_package.model_c': 2,
 'model.my_new_package.model_c2': 3}

As expected, we are seeing the DAG chopped up into layers, each corresponding to a distinct group of executions that dbt would perform when running the DAG.
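
To see those layers directly, we can group the models by score (a small sketch using the scores dict from above):

from collections import defaultdict

# Group models by score; each score is a layer dbt can run as a batch
layers = defaultdict(list)
for node_id, score in scores.items():
    layers[score].append(node_id)
for score in sorted(layers):
    print(score, sorted(layers[score]))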

dbt does a bit more when it comes time to execute: it sets up the scores, builds an ordered queue, and then starts performing the tasks. Unfortunately, dbt does not expose the logic for iterating through this queue, so I've reproduced a stripped-down version here, with a few additions. We are going to emulate a bare-bones version of dbt executing a run, but at each step only viewing the queue objects and pulling out some helpful information to populate our execution plan for output.
We'll have dbt set up the queue (along with scoring and sorting the tasks), then iterate through the queue, marking the tasks completed one by one (just like dbt does when running a task).

I’ve submitted a feature request to make this functionality native to dbt-core here and am planning to contribute in the coming months.

queue = selector.get_graph_queue(spec)
execution_plan = []
# Swap in the modified graph; hacky until the graph queue fns are better exposed
queue.graph = selected_graph_mod.copy()

while not queue.empty():
    node = queue.get()

    # Do a new lookup to get refs on the modified graph
    refs_mod_id = list(selected_graph_mod.predecessors(node.unique_id))
    refs_mod_name = [queue.manifest.nodes.get(x).name for x in refs_mod_id]

    d = {'id': node.unique_id,
         'score': queue._scores[node.unique_id],
         'name': node.name,
         'refs_mod': refs_mod_name,
         'is_ephemeral_model': node.is_ephemeral_model}
    queue.mark_done(node.unique_id)
    execution_plan.append(d)

The astute reader will note that we are providing somewhat redundant information. The score is an abstraction of the information contained in refs_mod (each model's references on the modified graph, which excludes ephemeral models). Spoiler alert: we can reproduce the DAG in Flyte with only the refs_mod information and won't need a score. But I've included it here because it gives some insight into how dbt organizes the runs of its DAGs internally, and the score may still be useful for other implementations beyond the one in the next section.

execution_plan
[{'id': 'model.my_new_package.model_a',
  'score': 0,
  'name': 'model_a',
  'refs_mod': [],
  'is_ephemeral_model': False},
 {'id': 'model.my_new_package.model_b',
  'score': 1,
  'name': 'model_b',
  'refs_mod': ['model_a'],
  'is_ephemeral_model': False},
 {'id': 'model.my_new_package.model_c',
  'score': 2,
  'name': 'model_c',
  'refs_mod': ['model_b'],
  'is_ephemeral_model': False},
 {'id': 'model.my_new_package.model_f',
  'score': 2,
  'name': 'model_f',
  'refs_mod': ['model_a'],
  'is_ephemeral_model': False},
 {'id': 'model.my_new_package.model_g',
  'score': 2,
  'name': 'model_g',
  'refs_mod': ['model_a'],
  'is_ephemeral_model': False},
 {'id': 'model.my_new_package.model_h',
  'score': 2,
  'name': 'model_h',
  'refs_mod': ['model_a'],
  'is_ephemeral_model': False},
 {'id': 'model.my_new_package.model_c2',
  'score': 3,
  'name': 'model_c2',
  'refs_mod': ['model_a', 'model_c'],
  'is_ephemeral_model': False}]

It doesn’t look like much, but this is a very powerful chunk of information indeed.

We now have a general way to convert a dbt command (e.g. run --select model_a+) into a deterministic execution plan, telling us in what order to run each model so that the parents of any given model run first.

This execution plan, paired with the contents of dbt's target/compiled folder, means that we now have the compiled SQL for each node and the order in which the nodes should run, all based on our initial run --select model_a+ command.
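
For example, one could attach the compiled SQL to each step of the plan, along these lines (a sketch; the target/compiled path layout, including the project name my_new_package, is an assumption based on a default dbt project):

from pathlib import Path

# Sketch: attach each model's compiled SQL to its execution plan entry.
# Assumes dbt has already compiled the project (e.g. via the steps above).
compiled_root = Path('target/compiled/my_new_package')
for step in execution_plan:
    sql_path = next(compiled_root.rglob(f"{step['name']}.sql"), None)
    if sql_path is not None:
        step['compiled_sql'] = sql_path.read_text()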

From here, one could implement this in many different workflow orchestration tools, and this first part could be an article in and of itself! But we aren't stopping here: next, we will take this execution plan and register it in Flyte.

What’s Next

(aka What Didn't We Cover, Even After 3 Chapters and 6,000 Words)

  • Additional setup guide for this demo: hosting a Postgres database locally via a Docker image and connecting to it from a flytectl cluster. Quite a powerful local testing paradigm.
  • Full demo: a simple script in your Flyte project that automatically converts a list of dbt run commands into distinct Flyte workflows. Thanks to imperative workflows, you can have full Flyte dbt DAGs without writing a single line of Flyte!
  • Proof of concept for inserting additional (non-dbt) Flyte tasks into the dbt DAG! This way you can run arbitrary code at any point in the dbt DAG.
  • Handling ContainerTasks that are shared across multiple workflows (we don't want to register the same task twice!).
  • Handling input arguments into the Flyte task and passing them through to dbt commands.
  • Handling/passing environment variables through to dbt commands.
  • And maybe more!

Thanks for reading! I hope you had fun; I know I did!


Tim Leonard

Classically trained Data Scientist living at the intersection of Data Science, Analytics Engineering and Data Engineering.