Designing repeatable DAGs in Airflow (Part 1)

tim.ellissmith
Appsbroker CTS Google Cloud Tech Blog
6 min read · Jul 6, 2022

This is the first of two blog posts where I talk about the process of designing repeatable DAGs in Airflow. This blog will cover the following:

  • Using Python dataclasses for configuration
  • Designing the DAG structure for repeatability
  • Allowing for flexibility in DAG design

The next blog will cover:

  • Configuring DBT to run from Airflow
  • Creating repeatable classes for file ingestion
  • Configuring Cloud Functions to trigger DAGs dynamically based on names

Using Python Dataclasses for configuration

Note: This blog was inspired by a talk at the Airflow Summit by Madison Swain-Bowden.

Why use Dataclasses?

Python dataclasses are an easy way to create classes that exist mainly to hold data, along with any methods that manipulate that data. They automatically generate methods such as __init__, reducing the amount of boilerplate code.

By setting frozen=True on the dataclass we can also ensure that it is immutable.
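As a minimal sketch of what immutability means here (DagConfig is a hypothetical stand-in, not a class from this post), assigning to a field of a frozen dataclass raises FrozenInstanceError instead of silently changing the configuration:

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class DagConfig:  # hypothetical stand-in, not one of the classes in this post
    dag_id: str
    retries: int = 1


config = DagConfig(dag_id="sales")

try:
    config.retries = 5  # assignment on a frozen dataclass is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False

print(config.retries, mutated)
```

Note that frozen=True only blocks re-assigning fields; a mutable value stored in a field (such as a list) can still be modified in place.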

Personally, I find referencing variables in a dataclass much easier than referencing a variable in a dict (e.g. one loaded from a JSON or YAML file). Compare the following two variable definitions:

# Declare the variable from a dict loaded from a json file
dag_name = configuration["dag_name"]
# Declare the variable from a dataclass
dag_name = configuration.dag_name

In the former case, apart from being harder to type, Python has no knowledge of the contents of the key. It therefore can’t infer the type, and can’t even infer that the key exists before run time (as the file is loaded in from an external source). A dataclass, however, lets you explicitly declare the type of each field, which, combined with the right plug-in in your IDE, will flag type errors as you write.

With a dataclass, both the existence of the variable and its type are known before run time, so errors show up in your IDE before the DAG ever runs, greatly reducing debugging time. Auto-suggestion by the IDE also works really well in this situation.
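To illustrate the difference, here is a small sketch under stated assumptions: DagConfig and the misspelled key are invented for the example. A typo in a dict key only surfaces when that key is read, whereas building the dataclass from the same data fails immediately at construction:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DagConfig:  # hypothetical stand-in for a DAG configuration class
    dag_id: str
    owner: str


raw = {"dag_id": "sales", "ownr": "Tim"}  # note the misspelled "owner" key

# With a dict, the typo is only discovered when the key is actually read.
try:
    owner = raw["owner"]
    dict_error = None
except KeyError as exc:
    dict_error = type(exc).__name__

# With a dataclass, construction itself rejects the unexpected field name.
try:
    DagConfig(**raw)
    dataclass_error = None
except TypeError as exc:
    dataclass_error = type(exc).__name__

print(dict_error, dataclass_error)
```

Keep in mind that dataclasses do not enforce the declared types at run time; the type feedback comes from static tooling such as your IDE or a type checker.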

Designing dataclasses around DAGs

Let’s take a look at a simple DAG dataclass file:

"""Define the dataclasses to form the framework for creating dags."""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Union
from airflow.utils import dates
@dataclass(frozen=True)
class airflow_dags():
    """This dataclass contains all generic airflow variables."""
    description: str
    dag_id: str
    owner: str
    project: str
    doc_md: str
    bigquery_location: str = "US"
    schedule: Union[str, None] = None
    gcp_connection: str = "google_cloud_default"
    max_active_runs: int = 1
    retries: int = 1
    provide_context: bool = True
    tags: list = field(default_factory=list)
    catchup: bool = False
    email: list = field(default_factory=list)
    email_on_failure: bool = False
    # days_ago() returns a datetime, so annotate the field with datetime
    start_date: datetime = dates.days_ago(1)

Here I have defined the variables that I want to reference in my Airflow DAGs. At this stage these variables are pretty generic, and I will create more DAG-specific ones for each DAG type (DAGs that are similar in design) that I want to include.

As you can see, each variable is clearly defined with its type. It’s also easier to document your variables in your docstring if needed.

Another plus is that we can set sane defaults for all DAGs at this point, e.g. max_active_runs or provide_context.
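The effect of those defaults can be sketched with a trimmed-down, hypothetical stand-in for the base class (BaseDagConfig and the DAG ids below are assumptions for illustration only). Each instance picks up the defaults unless it overrides them, and default_factory gives every instance its own list:

```python
from dataclasses import dataclass, field, replace
from typing import List


@dataclass(frozen=True)
class BaseDagConfig:  # hypothetical, trimmed-down stand-in for airflow_dags
    dag_id: str
    max_active_runs: int = 1
    catchup: bool = False
    tags: List[str] = field(default_factory=list)


# Only the required field is given, so every default applies.
sales = BaseDagConfig(dag_id="sales")

# Defaults are overridden only where a DAG really needs something different.
backfill = BaseDagConfig(dag_id="sales_backfill", catchup=True, max_active_runs=3)

# replace() builds a modified copy, which also works for frozen dataclasses.
nightly = replace(sales, dag_id="sales_nightly")

print(sales, backfill, nightly, sep="\n")
```

Using field(default_factory=list) rather than a shared list literal means that the tags of one DAG can never leak into another.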

From here we can define more DAG-specific classes which inherit from the airflow_dags class.

@dataclass(frozen=True)
class StorageToBigQuery(airflow_dags):  # 1
    """
    Dataclass for importing storage data to bigquery.
    This should have all the parameters needed for an import of storage data to bigquery.
    """
    source_bucket: str = ""  # 2
    success_bucket: str = ""
    dataset: str = ""
    table: str = ""
    bucket_prefix: str = ""
    ingestion_class: str = ""
    destination_project_dataset_table: str = field(init=False)
    create_disposition: str = "CREATE_IF_NEEDED"
    write_disposition: str = "WRITE_TRUNCATE"
    def __post_init__(self):  # 3
        """Run post init tasks."""
        env = self.project.split("-")[1]
        # The dataclass is frozen, so plain assignment (self.x = ...) would raise
        # FrozenInstanceError; object.__setattr__ sets the computed values instead.
        object.__setattr__(
            self,
            "destination_project_dataset_table",
            f"{self.project}.{self.dataset}.{self.table}",
        )
        object.__setattr__(self, "source_bucket", self.project + f"{env}_input")
        object.__setattr__(self, "success_bucket", self.project + f"{env}_archive")

Note the following:

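  1. The class inherits from the base class defined above, and therefore includes all of its variables.
  2. All variables are given default values even if they shouldn’t have one. This is unfortunately a limitation of dataclass inheritance: the base class already defines fields with defaults, and a field without a default cannot follow a field that has one. There are ways around it (e.g. raising an exception in __post_init__ if the values are empty), but they won’t give you the dynamic feedback in your IDE. On Python 3.10 and later, @dataclass(kw_only=True) is another option, since keyword-only fields are exempt from this ordering rule.
  3. The __post_init__ method runs at the end of the generated __init__, i.e. as soon as the class is instantiated. This allows you to do clever things like define composite variables. On a frozen dataclass these values have to be set with object.__setattr__, because ordinary attribute assignment raises FrozenInstanceError.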
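The workaround mentioned in point 2 can be sketched as follows. This is a minimal illustration, assuming hypothetical BaseConfig and TableConfig classes and a made-up project name; it is not the code used in the real project. The placeholder defaults are rejected in __post_init__, and the composite value is set with object.__setattr__ because the class is frozen:

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class BaseConfig:  # hypothetical stand-in for the base class
    project: str
    retries: int = 1


@dataclass(frozen=True)
class TableConfig(BaseConfig):  # hypothetical child class
    dataset: str = ""  # placeholder default, forced by the field ordering rule
    table: str = ""
    full_table_id: str = field(init=False)

    def __post_init__(self):
        # Reject placeholder defaults so a missing value fails at construction.
        for name in ("dataset", "table"):
            if not getattr(self, name):
                raise ValueError(f"{name} must be set")
        # Frozen dataclasses block normal assignment, so set the value explicitly.
        object.__setattr__(
            self, "full_table_id", f"{self.project}.{self.dataset}.{self.table}"
        )


ok = TableConfig(project="acme-dev-data", dataset="sales", table="orders")

try:
    TableConfig(project="acme-dev-data", dataset="sales")  # table is missing
    error = None
except ValueError as exc:
    error = str(exc)

print(ok.full_table_id, error)
```

The error is raised when the DAG file is parsed rather than when a task runs, which is later than an IDE warning but still earlier than a failed DAG run.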

At this stage the dataclasses are defined, but we still need a way to instantiate them.

We can instantiate the classes as part of a list (we will see why later) as follows:

"""Configurations for dags that run DBT.
This file is used for the configurations that are fed to the dag file to create the dags.
"""
from typing import List
from .dag_dataclasses import DBTModels
def create_dag_definitions(project: str) -> List[DBTModels]:
    """
    Create the DAG definitions.
    Args:
        project: the name of the GCP project
    Returns: List[DBTModels]
    """
    return [
        DBTModels(
            description="Run the sales data ingestion process",
            dag_id="sales",
            owner="Tim",
            project=project,
            post_updates_sql_file="sql/sales_updates.sql",
            model="sales",
            schedule="@daily",
            doc_md="""
            # Load the sales ingestion process into BigQuery
            This function will load the sales data into bigquery by:
            * Calling DBT to insert data into bigquery
            * Running Update statements from within airflow
            """
        )
    ]

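Here we have a list with only one element, but it can now be easily expanded to include similar DAGs. DBTModels is another DAG-specific dataclass (its definition is not shown here) with fields such as model and post_updates_sql_file; the remaining arguments map to the base class definition above.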

We now need to see how we are going to use the above list to define the DAGs themselves.

Scaling out multiple DAGs from dataclasses

Now that we have our configuration it’s time to look at the DAG design itself:

"""Create the sales dags."""
...
def create_dag(dag_dataclass: DBTModels) -> DAG:  # 3
    """
    Create a dag based on the configuration provided.
    Args:
        dag_dataclass: the dataclass with the configuration details
    Returns: DAG
    """
    args = {
        "owner": dag_dataclass.owner,
        "start_date": dag_dataclass.start_date,
        "provide_context": dag_dataclass.provide_context,
        "retries": dag_dataclass.retries
    }
    with DAG(
        dag_id=dag_dataclass.dag_id,
        description=dag_dataclass.description,
        schedule_interval=dag_dataclass.schedule,
        default_args=args,
        catchup=dag_dataclass.catchup,
        tags=dag_dataclass.tags,
        max_active_runs=dag_dataclass.max_active_runs,
    ) as dag:
        ...
    return dag  # 4
dags = create_dag_definitions(project=project)  # 1
for dag in dags:  # 2
    globals()[dag.dag_id] = create_dag(dag)

Okay, so here we:

  1. Create our DAG definitions by calling the function that instantiates all the DAG dataclasses. The project parameter is fed in through the CI/CD process, which we don’t cover here.
  2. Iterate through the list of definitions.
  3. Call the create_dag function for each one, passing the instantiated dataclass as a parameter.
  4. Return the DAG, which is assigned to a module-level name via globals() so that Airflow can discover it.

We can define our DAG structure (the tasks and their dependencies) inside the with DAG(...) block of the create_dag function.
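The registration loop can be tried out without an Airflow installation. The sketch below is an illustration only: DagConfig, FakeDag and register_dags are hypothetical names, and FakeDag merely stands in for airflow.DAG so that the example runs anywhere. It shows how each configuration entry ends up bound to a module-level name, which is where Airflow looks for DAG objects when it parses a file:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class DagConfig:  # hypothetical stand-in for the configuration dataclass
    dag_id: str
    schedule: str


@dataclass
class FakeDag:  # hypothetical stand-in for airflow.DAG, so the sketch runs anywhere
    dag_id: str
    schedule: str


def create_dag(config: DagConfig) -> FakeDag:
    """Build one DAG-like object from one configuration entry."""
    return FakeDag(dag_id=config.dag_id, schedule=config.schedule)


def register_dags(configs: List[DagConfig], namespace: Dict[str, object]) -> None:
    """Bind each DAG to a name in the given namespace, keyed by its dag_id."""
    for config in configs:
        namespace[config.dag_id] = create_dag(config)


configs = [DagConfig("sales", "@daily"), DagConfig("returns", "@weekly")]
namespace = globals()  # the module namespace, as in the DAG file above
register_dags(configs, namespace)

print(sorted(name for name, value in namespace.items() if isinstance(value, FakeDag)))
```

Because the names come from dag_id, two entries with the same dag_id would silently overwrite each other, so it is worth keeping the ids unique.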

Introducing flexibility to DAGs

This works nicely if all DAGs are exactly the same, but what happens if Joe from Finance wants to add one step to one of the DAGs while all the others remain the same? Do we need a new file with new definitions?

Fortunately not. We can use some simple if statements, e.g.:

if dag_dataclass.post_updates_sql_file:
    post_updates = BigQueryExecuteQueryOperator(
        task_id="post_updates_sql",
        sql=dag_dataclass.post_updates_sql_file,
        use_legacy_sql=False,
        location=dag_dataclass.bigquery_location
    )
    post_updates.set_upstream(a_downstream_task)

Here we check whether post_updates_sql_file is defined. If it is, we create the task and set it downstream of the a_downstream_task task.

The field itself is added with a single line in the dataclass definition:

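@dataclass(frozen=True)
class StorageToBigQuery(airflow_dags):
    """
    Dataclass for importing storage data to bigquery.
    This should have all the parameters needed for an import of storage data to bigquery.
    """
    # The type annotation is required: without it this would be a plain class
    # attribute rather than a dataclass field, and could not be set per DAG.
    post_updates_sql_file: str = ""
    ...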
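One detail worth checking when adding such an optional field is the type annotation, because dataclasses only treat annotated attributes as fields. The sketch below uses two hypothetical classes and invented step names (run_dbt, post_updates_sql) to show the difference, and how an empty default switches the optional step off:

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class WithoutAnnotation:  # hypothetical example class
    dag_id: str
    post_updates_sql_file = ""  # plain class attribute, not a dataclass field


@dataclass(frozen=True)
class WithAnnotation:  # hypothetical example class
    dag_id: str
    post_updates_sql_file: str = ""  # annotated, so it becomes a field


without_names = [f.name for f in fields(WithoutAnnotation)]
with_names = [f.name for f in fields(WithAnnotation)]

# The unannotated attribute is not an __init__ parameter, so this is rejected.
try:
    WithoutAnnotation(dag_id="sales", post_updates_sql_file="sql/sales_updates.sql")
    accepted = True
except TypeError:
    accepted = False

# With the annotation, the value can be set per DAG and drives the optional step.
config = WithAnnotation(dag_id="sales", post_updates_sql_file="sql/sales_updates.sql")
steps = ["run_dbt"] + (["post_updates_sql"] if config.post_updates_sql_file else [])

plain = WithAnnotation(dag_id="returns")
plain_steps = ["run_dbt"] + (["post_updates_sql"] if plain.post_updates_sql_file else [])

print(without_names, with_names, accepted, steps, plain_steps)
```

An empty string is falsy in Python, which is why the default of "" is enough to skip the extra task for every DAG that does not set the field.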

Optional fields like this allow us (within reason) to re-use one DAG file for DAGs that differ only in small ways.

Conclusion

In this blog we saw how to use dataclasses to define and create DAGs. We use a list of instantiated dataclasses to create multiple DAGs from a single DAG definition file, while still allowing individual DAGs to be customised.

In the next blog, we will look at how we use custom operators and dbt in DAGs, with the aim of creating re-usable code and keeping the DAG definitions limited to configuration code.

