Our Road to Abstracting Apache Airflow

Steve Whelan
JW Player Engineering
4 min readNov 14, 2023

I was working on a project a few months ago and I realized I needed to write a DAG for it. I thought to myself, hmm, this DAG is similar to this other DAG. So I copied that one, made a few edits and I was done. I thought, that’s great, work done in a matter of minutes. How efficient!

But later that day, a thought crept into my mind and lingered. I said to myself, “What did I actually just engineer? Had my day to day come down to an exercise in copying and pasting things?”

After a mild engineering existential crisis, I came across a great quote from Jeff Magnusson of Stitch Fix that really summed up the experience I just had.

“Engineers excel in a world of abstraction, generalization, and finding efficient solutions in places where they are needed.”

The JW Data team began our Airflow journey in 2018, full of ideas and enthusiasm. We built utility classes to store common configs for DAGs. We built generic operators that users could utilize with just a config file. But after a while we settled into that stable state where patches ruled the day.

There’s nothing more boring than doing the same thing over and over again. And that’s exactly what we had been doing with the DAGs we were writing. The job had gotten repetitive and our skills were being underutilized.

Engineers Shouldn’t Write ETL

In Jeff’s article, he makes another great point; “the job of a data engineer is not to write ETLs but to generalize and abstract the way those ETLs are made.”

This was really the original intent of Airflow at JW Player. We needed to get back to that goal. Instead of writing DAGs, we should be building frameworks for generalizing and abstracting the way DAGs are created.

We interviewed various engineering teams about their experience with our Airflow platform. They all stated they started a new DAG by first copying an existing DAG. It wasn’t a surprise, I was doing the same thing. As they described their interactions with Airflow, one common theme was coming through. Nothing was all that easy. It was time to change that.

DAG Templates

We analyzed all existing DAGs and some common workflows made up the majority. We decided to create a solution that would make creating those workflows significantly easier.

DAG templates are an alternative, simplified way for creating DAGs based on a common set of DAGs that we’ve created over the years. These templates were designed to be flexible while minimizing the boilerplate we’ve developed and duplicated throughout our DAGs.

dag_config:
dag_id: my_super_awesome_dag
default_args:
start_date: "2023-01-01"
execution_timeout: {hours: 24}
on_failure_callback: failed_callback
schedule: "@daily"
tags: ["my_dags"]
extra_args: [{terminate_cluster: True, cluster_name: my_emr_cluster, launch_flavor: biggest_emr_cluster}]
flavors: [crunch_the_numbers]
sensors: [upstream_dag]
type: emr-steps

There are 5 sections to the template:

  • dag_config — this section contains the configuration used when creating the DAG itself. The keys here correspond to the kwargs for the Airflow DAG object.
  • extra_args — this section contains a custom dictionary of inputs used in building your DAG.
  • flavorsFlavor is the internal term we use for the input configuration files to the generic operators we’ve created.
  • sensors — this is a list of DAGs/tasks that your DAG depends on.
  • type — this is the type of the template. For the initial launch, we focused on the 3 most common workflows:
    - Computing steps on AWS EMR clusters
    - Loading data into Postgres databases
    - Loading data into Snowflake

DAG Factory

Now that we have a DAG template, we need to turn it into an actual DAG within Airflow. We accomplished this by creating a factory for each `type`. I’ll focus on the SnowflakeLoader type as an example.

def generate_dag(template: Template) -> DAG:
with AirflowDAG(template=template, **template.dag_config) as dag:
sensors = TaskGroup(group_id="sensors", dag=dag, prefix_group_id=False)
generate_tasks(generate_external_sensor, template.sensors, task_group=sensors)
loaders = TaskGroup(group_id="loaders", dag=dag, prefix_group_id=False)
generate_tasks(generate_snowflake_loader, template.flavors, task_group=loaders)
sensors.set_downstream(loaders)
return dag

def generate_snowflake_loader(
flavor: str,
command: str,
dag: Optional[DAG] = None,
task_group: Optional[TaskGroup] = None,
) -> Operator:
return JWOperator(
dag=dag,
task_group=task_group,
task_id=sanitize_task_id(flavor, prefix="loader"),
description=f"[Snowflake Loader] Flavor: {flavor}",
image="my_snowflake_loader_image",
environment={ flavor=flavor},
command=command,
)

Utilizing the generate_dag function, Airflow can convert a DAG Template into an actual DAG. One downside to this was the Code section within the Airflow UI shows the DAG factory code to the user which isn’t very useful. So we added the DAG Template as a DAG note.

The result has been a huge reduction in lines of code for DAGs and an increase in the enthusiasm of our users to make use of the Airflow platform.

--

--