Graceful interruption of dbt with Dagster on Kubernetes

Brian Pohl
11 min read · Dec 1, 2022


I love working for startups because I have the freedom to build from scratch using the latest technology. At Plunk, we’re doing just that — leveraging emerging tech to build the first real-time home analytics platform. In our data pipeline, we’ve got dbt for data transformation, Snowflake for our data lakehouse, and Dagster for job orchestration. But using new tech is a double-edged sword. When you discover an issue, you just might be the first one to have to fix it.

As the Principal Data Engineer, I’m the lucky one who plays with all the shiny new data toys. Recently, I set up Dagster and got it to run all our previously disjointed ETL steps. For a day it was glorious. But the next day I got an error:

Table 'SALES' already exists, but current role has no privileges on it.

Excuse me? I spent half a week doodling on a whiteboard to design and implement a modular RBAC hierarchy, and our team has long since confirmed that it works smoothly. What have I done to topple my house of cards?

A few quick laps through Snowflake’s query history revealed the culprit. Big surprise: it was me. I had been testing a new Dagster job, and in that process I started it, then canceled it. If you’ve landed on this page and read this far, you probably know why that caused me problems. Dagster terminated the job, preventing dbt from executing any further commands. But it did not cancel my currently running Snowflake query, and that ultimately caused my permissions error. To prevent this, I needed a way to ensure that next time a Dagster job is terminated, my Snowflake queries also get canceled.

If you already understand all the jargon I just flung at you, feel free to scroll down to the epic conclusion of this story. But if you don’t — or if you enjoy the slow, dry build of data engineering suspense — then let me regale you with a tale of configuration frustration.

And by the way, if you use BigQuery, Databricks, or Redshift, this still applies to you! The dbt features I’ll discuss below (grant post-hooks) are possible (and equally breakable) in those platforms as well.

The Setup: dbt and Snowflake Security Madness

Modular Role-Based Access Control (RBAC) with Functional and Access Roles

In Snowflake, the role that owns a table is the only role that can drop the table. The most common way dbt refreshes our tables is by dropping and recreating them, so it is essential that our automated ETL process (Dagster) can drop all our tables.

Sometimes, we humans also create and/or refresh tables with dbt. Other times, you may have multiple automated services refreshing tables. In a simple Snowflake role setup, you might end up with this scenario:

[Image: the simplest RBAC model, where the role that created the table owns the table, can lead you to trouble]
If your table ownership looks like this, you’re gonna have a bad time.

If you take no further steps, the role that created the table owns it. This quickly becomes problematic, though. What happens when the daily ETL job, run by Dagster, tries to refresh Table B? It fails.

[Image: this basic RBAC model does not allow for other roles to refresh a given table]
See? I tried to warn ya.

To solve this, we create a role hierarchy, where access roles (ARs) control the ability to perform an action on an object, like creating a table in a certain schema, and users are granted functional roles (FRs), which inherit these access roles.

What? Here, this should clear it up:

[Image: a hierarchy of functional roles and access roles allows for shared ownership of a table]
Ok yeah, that’s pretty straightforward actually.

This small hierarchy of functional roles and access roles allows for both Dagster and Maggie to own, and therefore drop/recreate, a table. Now all your users, automated and human, can refresh your tables:

[Image: when tables are owned by a common access role, multiple functional roles can effectively own a table]
And they lived happily ever after.
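For the curious, the wiring behind that diagram boils down to a few role grants. Here’s a minimal sketch, written as a Python list of SQL statements to match the rest of this post’s code. FR_MAGGIE is a hypothetical name for Maggie’s functional role, and a real RBAC script needs plenty more (schema grants, warehouse grants, and so on):

rbac_grants = [
    # The shared access role that will own the production tables.
    'create role if not exists AR_PROD_TABLE',
    # Each functional role inherits the access role, so anyone holding
    # an FR can act as the table owner.
    'grant role AR_PROD_TABLE to role FR_DAGSTER',
    'grant role AR_PROD_TABLE to role FR_MAGGIE',  # hypothetical human FR
]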

Managing post-refresh grants with dbt

The good folks down at dbt Labs fortunately support this exact scenario. You can use a post-hook to run a grant command after your dbt model is refreshed. This means that, as soon as your table is refreshed, ownership is immediately transferred from the refreshing functional role (e.g. FR_DAGSTER) to the shared access role (AR_PROD_TABLE).

These happen as two separate SQL commands. First, the one to create/replace your table, and second, the grant. That will be important later!
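To make that concrete, here’s roughly what the session looks like for a hypothetical sales model, again sketched as a Python list of SQL statements (illustrative only; the SQL dbt actually generates is more involved):

refresh_session = [
    # 1. The refresh: a potentially long-running query. The new table is
    #    owned by whatever role dbt connected as (e.g. FR_DAGSTER).
    'create or replace table PROD.ANALYTICS.SALES as (select ...)',
    # 2. The grant: hands ownership to the shared access role. This step
    #    only happens if dbt is still alive when step 1 finishes.
    'grant ownership on table PROD.ANALYTICS.SALES to role AR_PROD_TABLE copy current grants',
]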

Shortcut solution: pre-refresh role swap

Hold up, before you keep reading, I might be able to save you some time! Below I’ll explain the intricacies of Dagster and Kubernetes, why Dagster isn’t canceling your database queries, and how to force it to cancel them.

But if you’re specifically worried about this permissions issue, there is an easier solution. Instead of running a grant after the table is created, use a pre-hook to run a USE ROLE command to assume the AR before replacing the table. That way, your table is owned by the AR as soon as the refresh completes, and no further command is needed. If Dagster shuts down dbt, no problem: the table will continue to refresh in your database after the Dagster job is terminated, and when it’s done, it will be owned by the correct role. Pretty clever, right?
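Sketched the same way as before (same hypothetical names), the ordering simply flips:

refresh_session = [
    # Pre-hook: assume the shared access role before the refresh...
    'use role AR_PROD_TABLE',
    # ...so the table is owned by AR_PROD_TABLE the instant it exists.
    # No follow-up grant is needed, so a terminated dbt process cannot
    # leave ownership stranded on FR_DAGSTER.
    'create or replace table PROD.ANALYTICS.SALES as (select ...)',
]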

I still implemented (and recommend) Dagster-dbt interruption for other reasons, such as preventing table deadlocks. But if you came here specifically for the permissions issue, you can walk away right now and have your problem solved!

The Problem: Dagster terminates Kubernetes pods with SIGTERM

Dagster and Kubernetes pods

Our team has configured Dagster to run on Kubernetes, hosted in AWS EKS. When Dagster executes our jobs, there is a hierarchy of Kubernetes pods that come into play:

  • The agent pod is always running, waiting for your command to execute a job. When you begin a job, you create:
  • A pod for this specific run of your Dagster job, named something like dagster-run-abc123. This pod maintains the logs and results of each op and tracks the progress of the job run. On that note, you will also see:
  • A pod for each op in your job, named something like dagster-step-xyz987. These pods execute the code specified in your ops.

When you terminate a job in Dagster, the dagster-run pod is responsible for first shutting down the currently running dagster-step pods, then sending the results back to Dagster’s backend, and finally shutting itself down.

Termination Signals 2: Judgement Day

When a dagster-step pod is shut down, it receives a SIGTERM, a particular flavor of POSIX termination signal. This is a very common termination signal. It’s not as aggressive and final as SIGKILL, which is what you might send to an unresponsive program. SIGTERM is more like a firm yet polite ask.

dbt responds to SIGTERM by — you guessed it — terminating everything it’s doing. Remember, though, that dbt itself does not run your SQL queries. It issues SQL commands to your database and then waits for a response. So when you terminate dbt, you are not also terminating the SQL command.

Why does it respond (or not respond) to SIGTERM this way? Among many reasons, dbt was designed as a command-line tool, and the typical way that people terminate command-line programs is by pressing Ctrl+C. When you press Ctrl+C in your terminal, you are issuing a SIGINT, not a SIGTERM. When you press Ctrl+C/issue SIGINT to dbt, dbt handles your termination by running one final action before stopping: it tells your database to cancel the queries it was running. Have a look at these timelines to see the different behaviors for each signal:

[Image: timeline showing that dbt does not terminate a running database SQL query when it gets SIGTERM]
When dbt receives SIGTERM, it shuts down without stopping the SQL command.

[Image: timeline showing that dbt terminates a running database SQL query when it gets SIGINT]
When dbt receives SIGINT, it first cancels the SQL command before shutting down.
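You can feel this difference with a few lines of Python. Below is a minimal sketch of the handler pattern (my illustration, not dbt’s actual source code): cleanup is registered for SIGINT only, while SIGTERM is left at its default, process-killing behavior.

import os
import signal
import sys
import time

def cancel_queries_then_exit(signum, frame):
    # Stand-in for dbt telling the database to cancel its running queries.
    print('SIGINT received: canceling database queries, then exiting')
    sys.exit(1)

# Register cleanup for SIGINT (what Ctrl+C sends) only. SIGTERM keeps its
# default disposition, which kills the process with no cleanup at all,
# leaving any in-flight query to run to completion on its own.
signal.signal(signal.SIGINT, cancel_queries_then_exit)

print(f'waiting on a pretend long-running query (pid {os.getpid()})')
time.sleep(300)  # try `kill -INT <pid>` vs. `kill -TERM <pid>` against this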

Access [not] Granted

This brings us back to the original problem. When you terminate a Dagster job, it sends SIGTERM to the Kubernetes pod that is running dbt. Because the pod receives SIGTERM and not SIGINT, it does not cancel the running Snowflake query. You’ve got a 99.99% chance that, at any given moment, the running Snowflake query is the table refresh, not the grant command. That means your refresh is left to complete, but the pod with dbt is not around when it’s time to issue the grant command.

This is how I got the error: the SALES table was owned by FR_DAGSTER, because nobody ever transferred the ownership over to AR_PROD_TABLE. The error I got was from a different process (our old, soon-to-be-deprecated ETL job), which was not using FR_DAGSTER. It therefore was unable to refresh the SALES table.

The Solution: preStop hooks for every Kubernetes pod that executes a dbt op

Kubernetes has just the feature for this! Container hooks can run immediately after a pod is started up (postStart) and immediately before a pod is shut down (preStop). Kubernetes provides thorough documentation on how these hooks interact with different termination signals, but the point we need to know is that a SIGTERM will trigger the preStop container hook before the pod is shut down.

“Great, how do I do that?”

Sending SIGINT to dbt

When the preStop hook runs, it is a separate process from dbt, so you’ll need a way to find dbt before you can terminate it. I wrote up these quick bash commands to do exactly that:

# Find the pid of the running `dbt run` process...
export DBTPID=`ps -ef | grep -v 'grep' | grep 'dbt' | grep 'run' | awk '{print $2}'`
# ...and send it a SIGINT, just as if someone had pressed Ctrl+C.
kill -SIGINT $DBTPID

ps -ef lists the running processes, the grep commands narrow down what I want, and awk returns just the process id, or pid. Then it gets passed to a kill command. I packaged these up into a single script, which I called interrupt-dbt.sh. Easy peasy, right?

A tip for anyone else tinkering in this area: the logs for the container hooks do not appear in the dagster-step pod, dagster-run pod, or the Dagit UI. Nope, you have to run kubectl get events | grep FailedPreStopHook to see the logs of a failed run. It was using this command that I discovered my first roadblock: the Docker image used in the dagster-step pods actually did not come with ps installed! That was easy enough to fix, though. You can add this line to your Dockerfile to install it:

RUN apt-get update && apt-get install -y procps

Next step: get my Kubernetes pod to execute my script.

Configuring a Kubernetes container hook in Dagster

Shoutout to the Dagster team, who gave us a path to pass Kubernetes configurations into the pods for both our ops and our jobs! Using the tags argument, you can specify virtually any Kubernetes configuration option you need.

I know what you’re thinking, and yes, this is the massive rabbit hole in which you can lose yourself seeking the ultimate, optimal Kubernetes configuration.

We need the preStop hook to operate on our dagster-step pods, which execute the ops. Therefore, this Kubernetes configuration is passed into the @op decorator. For example:

@op(
    required_resource_keys={'dbt'},
    tags={
        'dagster-k8s/config': {
            'container_config': {
                'lifecycle': {
                    'preStop': {
                        'exec': {
                            'command': ["/bin/bash", "-c", "bash interrupt-dbt.sh"]
                        },
                    },
                },
            },
        },
    }
)
def my_cool_op(context):
    context.resources.dbt.run(models=['my_cool_dbt_model'])

I could have stopped here, as this solution will do the trick. Try it yourself! You can start a job, watch the queries start in Snowflake, terminate the job, and watch the queries get canceled. Like magic.

But what kind of data engineer would I be if I was satisfied here? I need this logic all over my Dagster repo, anywhere I run dbt.

Sneaking the container hooks into all your Dagster ops

A quick splash of Python witchcraft does the trick. You can build a function that returns a default op config, and then pass that function into @op decorators. Here is my Dagster repo directory:

dagster_repo
├── utils
│   ├── __init__.py
│   └── job_op_config.py
├── my_cool_job
│   ├── __init__.py
│   └── my_cool_job.py
└── repository.py

The file job_op_config.py is where I wrote this function. At first, I had something like this as my function:

def get_op_config():
    return {
        'required_resource_keys': {'dbt'},
        'tags': {
            'dagster-k8s/config': {
                'container_config': {
                    'lifecycle': {
                        'preStop': {
                            'exec': {
                                'command': ["/bin/bash", "-c", "bash interrupt-dbt.sh"]
                            },
                        },
                    },
                },
            },
        },
    }

In your op decorator, you can then use the double-splat operator (**) to unpack the dictionary into keyword arguments:

from utils.job_op_config import get_op_config

@op(**get_op_config())
def my_cool_op(context):
    context.resources.dbt.run(models=['my_cool_dbt_model'])

Neat! But this setup is limiting if you need to customize your op config further. I played with a few options: initializing the dictionary returned by get_op_config and then modifying it, adding every possible argument to get_op_config as a parameter, and so on. In the end, I landed here:

def merge_dict(dict1, dict2):
    '''Merges dict2 into dict1 in place, recursing into nested sub-dictionaries.'''
    for key in dict2.keys():
        if key in dict1.keys() and isinstance(dict1[key], dict) and isinstance(dict2[key], dict):
            # Both values are dictionaries: recurse so nested keys merge too.
            merge_dict(dict1[key], dict2[key])
        elif key in dict1.keys() and isinstance(dict1[key], set) and isinstance(dict2[key], set):
            # Both values are sets: combine them.
            dict1[key].update(dict2[key])
        else:
            # Otherwise the passed-in value wins.
            dict1[key] = dict2[key]


def get_op_config(**kwargs) -> dict:
    base_op_config = {
        'required_resource_keys': {'dbt'},
        'tags': {
            'dagster-k8s/config': {
                'container_config': {
                    'lifecycle': {
                        'preStop': {
                            'exec': {
                                'command': ["/bin/bash", "-c", "bash interrupt-dbt.sh"]
                            },
                        },
                    },
                },
            },
        },
    }

    # Append any other items that were specified. Using merge_dict() allows
    # for nested dictionaries to be merged.
    merge_dict(base_op_config, kwargs)

    return base_op_config

merge_dict merges whatever you pass in with base_op_config. If keys don’t overlap, they are added, regardless of their depth in sub-dictionaries. If two keys do overlap, the value you passed in replaces the default. Sets also get merged together.

For example, suppose I wanted to add other resources in. I made dbt a required resource by default, but I have other ops that issue queries directly to Snowflake. I can add Snowflake to the list of required resources like this:

@op(**get_op_config(required_resource_keys={'snowflake'}))
def my_cool_op(context):
    context.resources.dbt.run(models=['my_cool_dbt_model'])
    context.resources.snowflake.execute_query('SELECT null as null_col')

Behind the scenes, the set {'snowflake'} got merged with the set {'dbt'} to make {'snowflake','dbt'}. Similarly, passing this dictionary into your op:

@op(**get_op_config(tags={'foo': 'bar'}))
def my_cool_op(context):
    context.resources.dbt.run(models=['my_cool_dbt_model'])

will give you these tags:

'tags': {
    'dagster-k8s/config': {
        'container_config': {
            'lifecycle': {
                'preStop': {
                    'exec': {
                        'command': ["/bin/bash", "-c", "bash interrupt-dbt.sh"]
                    },
                },
            },
        },
    },
    'foo': 'bar',
}

The Result: graceful shutdowns and — bonus! — easy op config

Since implementing this, we have not had another mysterious permissions issue in Snowflake. The team is also very pleased with the super slick get_op_config function. I built a similar function for job config, as my goal as a data engineer is to make our data team’s jobs as easy as possible.

I hope you learned something from this post! I know I learned a ton about termination signals and Kubernetes. I also spent many hours banging my head against the differences in kill between bash/zsh/sh and the numerous ways to interpolate strings in bash, so I hope I’ve saved you from going through the same trouble. If you found this useful, found a mistake, or found a way to build upon this, I’d love to hear about it!


Brian Pohl

Seattle-area data geek, with a heavy background in data modeling and SQL. Obsessed with automation and efficiency, to what some call a mildly concerning degree.