Building dag-workflows: a Python Workflow Execution Tool

Alec Clowes
Roivant Technology
Aug 23, 2021 · 7 min read

At Roivant, we use technology to ingest and analyze large datasets to support our mission of bringing innovative therapies to patients.

In this post, we’ll walk through the decision-making process that led to building our own workflow orchestration tool. We’ll talk about our needs and goals, the current product landscape, and the Python package we decided to build and open source.

We hope you’ll enjoy the discussion and find something useful in both our approach and the tool itself.

Someone needs to tell these computers what to work on. Photo by Manuel Geissinger from Pexels.

Our needs

We started our journey by looking at our past experiences and reading up on new projects. Since the mid-2010s, tools like Apache Airflow and Spark have completely changed data processing, enabling teams to operate at a new scale using open-source software. While these tools were a huge improvement, teams now want workflow tools that are self-service, freeing up engineers for more valuable work.

An article from Google engineer Adler Santos on Datasets for Google Cloud is a great example of one approach we considered: use Cloud Composer to abstract the administration of Airflow and use templating to provide guardrails in the configuration of directed acyclic graphs (DAGs). The deep analysis of features by Ian McGraw in Picking a Kubernetes Executor is a good template for reviewing requirements and making a decision based on how well they are met.

We compiled our desired features for data processing:

  • Python compatible: We use Python because data science libraries like Pandas and web frameworks like Django are easy to use and well documented.
  • Asynchronous tasks: A tool for executing longer-running code, which is to say almost anything that cannot be completed during an API response.
  • Triggered on-demand or on a schedule: Tasks should be triggered with API calls, by external events, or on a cron-like schedule, and we need a way to check on execution status.
  • Support for inter-task dependencies: The ability to run tasks in parallel in a workflow makes the overall workflow faster and localizes failures.
  • Low overhead: Tools that are easy to set up and learn, keep developers happy, and reduce the support burden.
  • Kubernetes native: Because we deploy our applications on Kubernetes, it is convenient to have a tool that is designed for container-based orchestration.
  • Decentralized: Workflow definitions and orchestration code live alongside the API services, data models, and scientific computing code they support, making dependency management, data-model sharing, and change management easier.
  • Easy to test: Writing tests for common use cases should be easy, such as running SQL against a database.

Solutions considered

We reviewed existing tools looking for something that would meet our needs. Here is a summary of our research:

  • Cron is the old standby, but it does not parallelize your workflow or run across multiple servers. It’s nicely agnostic on testing, monitoring, and pretty much everything else.
  • Celery is an established Python message-based task execution framework. It has grown to support scheduled tasks and workflows, but its queue-based core makes looking up task status and building workflows unnecessarily difficult.
  • Airflow is the market leader, having popularized the concept of DAGs in the Python community. It is a heavyweight, centralized solution, however; most developers don’t want to run it locally. Nor is it Kubernetes-native: Cloud Composer abstracts the administration of Airflow but still uses a legacy Celery worker to launch Kubernetes Pods. And the flexibility afforded by the Python DAG definition makes enforcing common patterns difficult.
  • Kubernetes Job/CronJob is easy to define and a good starting point but doesn’t handle workflows. It is agnostic to what gets run, so testing is not addressed.
  • Dagster addresses many of the same testing and orchestration issues, but it mixes the orchestration metadata into the functional code, which means there is more to learn.
  • dbt is focused on advanced SQL templating, and for orchestration it recommends its proprietary cloud offering or Airflow. It’s a lot to learn for a partial solution.
  • Meltano glues together Singer and dbt. Again, it uses Airflow for orchestration.
  • Argo Workflows implements workflows within Kubernetes, which is very promising. However, it lacks data-centric testing support and a simple local runner for development.
  • Serverless approaches like AWS Lambda + Glue are popular but hard to simulate locally. They also split the codebase awkwardly along the needs of each component.

Our decision

While there were many options available, none of them seemed quite right for us. Most tools were either too complicated or lacked clean Kubernetes integration. Airflow, for instance, has both shortcomings. Dagster has native Kubernetes support but a steep learning curve. It’s also opinionated about passing data and defining workflows in code, which is in conflict with our desired simplicity.

At this point, we decided to build our own lightweight wrapper for running workflows. Our vision was a tool that runs locally during development and deploys easily onto Kubernetes, with data-centric features for testing and validation. We determined there would be three main components to design: the workflow definition, the task execution, and the testing support.

We’ll introduce each of these elements in the next section in a short tutorial on using the tool we named “workflows”.

A brief introduction

The command-line tool and Python module are both named workflows, but the package is installed as dag-workflows:

pip install dag-workflows

Defining DAGs and Tasks

There are two predominant patterns for defining tasks and grouping them into a DAG. Tools like Airflow, Celery, and Dagster define the DAG in Python code, while tools like Kubernetes and dbt use YAML. We like YAML because it is more readable and helps enforce a single way of doing things, making the configuration options clearer and easier to manage across teams. The proliferation of tools like Gusty that turn YAML into Airflow DAGs suggests many teams see a similar advantage. Most peculiar is the way Google’s Public Datasets Pipelines uses Jinja to generate the Python code from YAML.

We follow the pattern of grouping individual tasks into a DAG by representing each task as a file in a folder that represents the DAG. Like Gusty and other tools, we put the YAML configuration in a comment at the top of each file. The individual task files can be .sql, .py, or .yaml files. A SQL task looks like this:

/*
description: Make a table
type: sql
inputs: []
*/
CREATE TABLE IF NOT EXISTS example AS
SELECT 'test' AS name, 1 AS value;

And a Python task should define a run function, like this:

"""
description: Copy a file
type: python
inputs:
- other_task.sql
"""
import shutil

def run(context):
    shutil.copy('file_A', 'file_B')

You’ll notice that the YAML header has a field called inputs; this is where you list the tasks that are predecessors of this one and must run first.
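To make the wiring concrete, here is a sketch of a task that waits on two predecessors. The file names are illustrative stand-ins for whatever task files actually exist in the DAG folder:

"""
description: Summarize upstream results
type: python
inputs:
- make_table.sql
- copy_file.py
"""
def run(context):
    # Both inputs have finished by the time this task runs, so their
    # outputs are safe to consume here.
    print('upstream tasks complete')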

Running the DAG

After writing your tasks, the next step is to run them. We designed workflows to support multiple execution models, two of which handle scheduling and parallelization:

  • The Kubernetes scheduler runs each task as a Kubernetes Pod. It triggers DAGs on a cron-like schedule and runs tasks as their dependencies are satisfied. It uses a PostgreSQL database to keep track of DAG and task runs.
  • The worker scheduler uses the same scheduling logic but runs tasks as a subprocess. Multiple workers can be run to parallelize execution.
  • For local development, tasks are run locally through the workflows run command-line interface. The schedulers use the same interface when running a single task.
  • Third-party executors such as good ol’ cron and Airflow can use the CLI runner as well. In Python, they can introspect the DAG object and call the execute method on tasks directly, as sketched after the diagram below.
Diagram of the scheduler and runner components of workflows.
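For example, a Python-based executor could drive a DAG along these lines. This is a rough sketch: config.DAGS and the execute method come from examples elsewhere in this post, while the tasks and name attributes are assumptions about the object model:

from example_project import config

dag = config.DAGS["trader_bot"]

# Assumed iteration API: visit tasks in dependency order and run each
# one via its execute method.
for task in dag.tasks:
    print(f"running {task.name}")
    task.execute()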

To run the local executor, use the command line. The first argument is a configuration file which, at minimum, tells workflows what folder to look in for DAGs:

workflows dags.config run dag_name
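For reference, the configuration file appears to be a Python module: the test example later in this post imports it and reads a DAGS mapping from it. A minimal sketch, assuming the package exposes a loader for a folder of task files (workflows.load_dags is a hypothetical name, not a confirmed API):

# dags/config.py, referenced on the command line as dags.config
import workflows

# Hypothetical loader: build DAG objects from the folder of task
# files described above.
DAGS = workflows.load_dags(folder="dags/")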

To run the worker or Kubernetes schedulers, you need to provide a cron-like schedule for each DAG in a YAML file, along with executor-specific configuration like this:

description: Example DAG
schedule: '5 */4 * * *' # every four hours at five after
worker_config:
  kube_image: "my_image:latest"

The scheduler requires access to a PostgreSQL database and is run from the command line, with the scheduler type specified as the last argument:

export DATABASE_URL=postgres://localhost/workflows
workflows dags.config schedule kubernetes

Writing Tests

An important requirement for us was easy testing of tasks. To support testing, we built a pytest fixture that supports running a task or DAG, and handles test database setup and teardown in the special case of SQL tasks. Our fixture utilizes pytest-django to create the database, and while you can choose to use Django with workflows, it is not required.

This example test covers a SQL task. It gets the task, sets up the input tables with test data, and executes the task. It asserts that the output matches the expected values:

from example_project import config

def test_calculate_trades(workflows):
    task = config.DAGS["trader_bot"].get_task("calculate_trades.sql")
    initial_data = {
        "stock_prices": [
            {"date": "2020-01-01", "close": 34},
            {"date": "2020-01-02", "close": 43},
        ],
        "limits": [{"max_bid": 100}],
    }
    workflows.setup_task_inputs(task, initial_data)
    results = workflows.execute_task(task)
    assert len(results["new_trades"]) == 2
    assert results["new_trades"][0]["limit_price"] == 134

Learn More

Thanks for taking the time to read about workflows! Here’s some suggested reading that might be of interest.
