Designing Repeatable DAGs in Airflow (Part 2)

tim.ellissmith
Qodea Google Cloud Tech Blog
6 min read · Jun 27, 2022
Credit: Vecteezy.com

In the last blog we looked at using Dataclasses to configure DAGs.

In this blog, we’re going to take a deeper dive into designing the code for custom operators.

We’re going to look at two different ingestion types:

  • Ingesting from a file
  • Running DBT models

While there are potentially an unlimited number of possible DAG designs, the purpose of this blog is rather to highlight the principles of keeping your DAG file free of logic (I hate the PythonOperator, particularly when the method it calls is defined in the main DAG file) and ensuring repeatability.

Ingesting data from files

I’ve created a generic ingestor as follows:

# Imports assumed for Airflow 2.x with the Google provider installed.
import abc
import io
import logging
from datetime import datetime

import pandas as pd
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from google.cloud import bigquery
from google.cloud.bigquery import TimePartitioning


class Ingestor(BaseOperator, metaclass=abc.ABCMeta):  # 1
    """ABC for ingesting files.

    The _process_data method needs to be defined by an inheriting class.
    """

    ui_color = "#A1BBFF"
    template_fields = ["filename"]  # 2

    def __init__(
        self,
        destination_project_dataset_table: str,
        project_id: str,
        bucket: str,
        filename: str,
        bucket_prefix: str,
        gcp_conn_id: str = "google_cloud_default",
        partition_column: str = "",
        **kwargs
    ):
        """Init function."""
        super().__init__(**kwargs)
        self.destination_project_dataset_table = destination_project_dataset_table
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
        self.gcs_prefix = bucket_prefix
        self.bucket = bucket
        self.partition_column = partition_column
        self.conn = BigQueryHook(gcp_conn_id=self.gcp_conn_id)
        self.filename = filename

    @classmethod  # 3
    @abc.abstractmethod  # 4
    def _process_data(cls, data: bytes, publisher_time: datetime, file_name: str) -> pd.DataFrame:
        """Transform the data for BigQuery."""
        raise NotImplementedError

    def write_result(self, dataframe: pd.DataFrame):  # 5
        """Write the result into BigQuery.

        If the table doesn't exist, create it; otherwise append to the existing data.

        Args:
            dataframe (pd.DataFrame): The data to be loaded
        """
        logging.info(f"Writing result to {self.destination_project_dataset_table}")
        time_partitioning = TimePartitioning(field=self.partition_column) if self.partition_column else None
        job_config = bigquery.LoadJobConfig(
            write_disposition="WRITE_APPEND",  # 6
            create_disposition="CREATE_IF_NEEDED",
            time_partitioning=time_partitioning,
        )
        bq_client = bigquery.Client(
            project=self.conn._get_field("project"),
            credentials=self.conn._get_credentials(),
        )
        job = bq_client.load_table_from_dataframe(  # 7
            dataframe,
            self.destination_project_dataset_table,
            job_config=job_config,
        )
        job.result()
        logging.info(f"Wrote result to {self.destination_project_dataset_table}")

    def execute(self, context):  # 8
        """Entrypoint to execute the operator."""
        contents = self.gcs_hook.download(self.filename, self.bucket)
        if isinstance(contents, bytes):
            file_contents: bytes = contents
        else:
            file_contents = contents.encode()
        date_time = datetime.now()
        just_file = self.filename.split("/")[-1]
        df = self._process_data(file_contents, date_time, file_name=just_file)
        self.write_result(df)

Note the following:

  1. I’ve created a generic class Ingestor which inherits from airflow.models.BaseOperator and uses abc.ABCMeta as its metaclass. BaseOperator ensures that this is an Airflow operator, while abc.ABCMeta ensures that it is an abstract base class.
  2. By specifying filename as a template field, I allow the filename to be rendered as a Jinja template.
  3. By making _process_data a class method, I make it easier to unit test, as I don’t need to instantiate the class (which would bring in a whole lot of dependencies). There’s a test sketch after the example subclass below.
  4. The _process_data method is an abstract method which will be defined in sub-classes.
  5. I define all the other common methods here. Note also the use of pandas DataFrames to pass data between methods; I find this the easiest way to pass data around.
  6. At the moment these parameters are hard-coded, but it would be easy to move them back into the dataclass for increased configurability.
  7. I opt to load from a DataFrame as this allows me to call the BigQuery batch API rather than the streaming insert API. This has a number of advantages, including cost and the way BigQuery writes to the table.
  8. The execute method is the entrypoint Airflow calls when the task runs.

Having this in place allows us to create more specific implementations according to the transformations that need to be done. Take a look at this implementation of the class:

class IngestSalesFiles(Ingestor):
    """Ingestor for the Sales Files.

    These are files loaded from Excel format.
    """

    @classmethod
    def _process_data(cls, data: bytes, publisher_time: datetime, file_name: str) -> pd.DataFrame:
        """Transform the Excel file into a form loadable into BigQuery.

        Args:
            data (bytes): The contents of the file
            publisher_time (datetime): The time the file was ingested
            file_name (str): The name of the file

        Returns:
            pd.DataFrame: The pandas dataframe to be loaded
        """
        excel_file = io.BytesIO(data)
        df = pd.read_excel(excel_file, header=1)
        # Transforms can go here:
        ...
        return df

So now we have a simple implementation of the transforms we need to do in pandas.
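
A nice side-effect of _process_data being a class method (point 3 above) is that the transform is trivial to unit test. Here is a minimal pytest-style sketch; the import path, sample rows and expected columns are hypothetical, but the key point is that no operator is instantiated and no GCS or BigQuery calls are made:

# Hypothetical test; the import path and sample data are illustrative only.
import io
from datetime import datetime

import pandas as pd

from my_project.ingestors import IngestSalesFiles  # hypothetical module path


def test_process_data_returns_expected_columns():
    # Build a tiny in-memory Excel file standing in for the downloaded blob.
    # Row 0 is a dummy title row so that header=1 in _process_data lines up.
    buffer = io.BytesIO()
    raw = pd.DataFrame([["report title", None], ["product", "units"], ["widget", 3]])
    raw.to_excel(buffer, index=False, header=False)

    df = IngestSalesFiles._process_data(
        buffer.getvalue(),
        publisher_time=datetime(2022, 6, 27),
        file_name="sales.xlsx",
    )

    # The transform runs without instantiating the operator or touching GCP.
    assert list(df.columns) == ["product", "units"]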

The next question is: how do we call this function? We shouldn’t need a separate DAG file with different operator names for each ingestion type, and if ... else ... statements are cumbersome.

Here functional programming comes to the rescue. Python’s functools library provides partial, which allows you to do the following:

from functools import partial

...

ingest_files = partial(
    dag_dataclass.ingestion_class,
    task_id="ingest_files",  # task IDs can't contain spaces
    filename="{{ dag_run.conf['name'] }}",
    bucket=dag_dataclass.source_bucket,
    destination_project_dataset_table=dag_dataclass.destination_project_dataset_table,
    project_id=dag_dataclass.project,
    bucket_prefix=dag_dataclass.bucket_prefix,
    gcp_conn_id=dag_dataclass.gcp_connection,
)()
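
If you haven’t come across partial before, here is a tiny standalone example of what it does: it returns a new callable with some of the arguments already filled in, which you can then call later (or immediately, as above):

from functools import partial


def add(x: int, y: int) -> int:
    return x + y


add_five = partial(add, 5)  # freeze the first argument
print(add_five(3))          # prints 8

# The same idea works with a class: partial(SomeOperator, task_id="t", ...)
# gives back a callable that instantiates SomeOperator when finally called.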

By using partial, we can define the class we are calling in our configuration file:

def create_dag_definitions(project: str) -> List[StorageToBigQuery]:
    """Return a list of configurations of DAGs to be created for file ingestion."""
    return [
        StorageToBigQuery(
            ...
            ingestion_class="IngestForecastFiles",
            ...

So now, to create a new ingestion method, you need to:

  1. Create the configuration in the config file
  2. Create a new ingestion class with the _process_data method defined

And it’s as simple as that. Your main DAG files remain repeatable and clean.
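
The snippets above don’t show how the ingestion_class string in the configuration gets turned back into an actual operator class, or how the DAG file loops over the configurations. Below is a minimal sketch of one way this could be wired up; the registry dict, module paths, default arguments and project id are my own assumptions for illustration, not the original implementation:

# Sketch only: module paths, the registry, default_args and the project id
# are assumptions made for illustration.
from datetime import datetime
from functools import partial

from airflow import DAG

from my_project.config import create_dag_definitions  # hypothetical module
from my_project.ingestors import IngestForecastFiles, IngestSalesFiles  # hypothetical module

# Map the string stored in the config onto the concrete operator class.
INGESTION_CLASSES = {
    "IngestSalesFiles": IngestSalesFiles,
    "IngestForecastFiles": IngestForecastFiles,
}

for dag_dataclass in create_dag_definitions(project="my-project"):
    with DAG(
        dag_id=dag_dataclass.dag_id,
        schedule_interval=dag_dataclass.schedule,
        default_args={"owner": "airflow", "start_date": datetime(2022, 1, 1)},
    ) as dag:
        ingest_files = partial(
            INGESTION_CLASSES[dag_dataclass.ingestion_class],
            task_id="ingest_files",
            filename="{{ dag_run.conf['name'] }}",
            bucket=dag_dataclass.source_bucket,
            destination_project_dataset_table=dag_dataclass.destination_project_dataset_table,
            project_id=dag_dataclass.project,
            bucket_prefix=dag_dataclass.bucket_prefix,
            gcp_conn_id=dag_dataclass.gcp_connection,
        )()

    # Expose each generated DAG at module level so the scheduler picks it up.
    globals()[dag_dataclass.dag_id] = dag

An alternative to the registry is to look the class up by name with getattr on the ingestors module, but an explicit dict keeps typos loud and obvious.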

In the next section we are going to look at calling DBT from within Airflow.

Calling DBT

We can call DBT with the following code:

command = f"dbt run --profiles-dir profiles --profile {dag_dataclass.profile} -m {dag_dataclass.model} -t composer --project-dir ./dbt".split() # 1    with DAG(
dag_id=dag_dataclass.dag_id,
description=dag_dataclass.description,
schedule_interval=dag_dataclass.schedule,
default_args=args,
catchup=dag_dataclass.catchup,
tags=dag_dataclass.tags,
max_active_runs=dag_dataclass.max_active_runs,
) as dag:
run_dbt = KubernetesPodOperator( # 2
task_id="load_sales",
name="load_sales",
namespace="default",
# The short sha will be substituted by Cloud Build
image=dag_dataclass.image + ":${SHORT_SHA}", # 3
cmds=command,
secrets=[secret_volume], # 4
is_delete_operator_pod=True # 5
)

Note the following:

  1. The command allows flexibility when choosing the profile and the model to be run.
  2. We call the KubernetesPodOperator for this (explained below).
  3. The CI/CD process will substitute in the SHA of the latest image to ensure we always run up-to-date code.
  4. For GCP, using the KubernetesPodOperator means we need to mount the service account file as a key (see the sketch after the secret declaration below).
  5. We clean up the pod once it has run, otherwise Kubernetes will be polluted with a lot of completed pods.

The declaration of the kubernetes secret is as follows:

from airflow.kubernetes.secret import Secret  # import path may differ across Airflow/provider versions

secret_volume = Secret(  # pragma: allowlist secret
    deploy_type="volume",
    deploy_target="/var/secrets/google",
    secret=dag_dataclass.kubernetes_service_account,  # pragma: allowlist secret
    key="service-account",
)
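
One thing the snippet doesn’t show is how the dbt container actually finds the mounted key. Assuming the image relies on Application Default Credentials (an assumption on my part, not something from the original setup), a common approach is to point GOOGLE_APPLICATION_CREDENTIALS at the file the secret volume mounts, which with the deploy_target and key above would land at /var/secrets/google/service-account:

# Sketch only: assumes the image uses Application Default Credentials and
# that the key file is mounted at <deploy_target>/<key>.
run_dbt = KubernetesPodOperator(
    task_id="load_sales",
    name="load_sales",
    namespace="default",
    image=dag_dataclass.image + ":${SHORT_SHA}",
    cmds=command,
    secrets=[secret_volume],
    env_vars={"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account"},
    is_delete_operator_pod=True,
)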

To KubernetesPodOperator or not to KubernetesPodOperator?

I have seen arguments that you should always use the KubernetesPodOperator; however, I disagree with this for the following reasons:

  • There are a number of really good off-the-shelf operators that already do exactly what you want to do (see the example at the end of this section)
  • It is more work to create and configure the Dockerfile
  • When running a managed instance like Cloud Composer, you can be assured that the Google operators will all work as expected

I’m most likely to choose the KubernetesPodOperator when there are custom Python library requirements: I have found myself in dependency hell between Airflow’s requirements and the requirements of the libraries I am installing. In general, therefore, whenever I’m doing something that isn’t pretty much off the shelf, I will use the KubernetesPodOperator.
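
As a concrete example of the first point, a straight file-to-table load with no custom transforms doesn’t need a custom image or a custom operator at all; the stock GCSToBigQueryOperator from the Google provider already does it (the values below are purely illustrative):

# Illustrative values only; the operator itself ships with the Google provider.
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_plain_csv = GCSToBigQueryOperator(
    task_id="load_plain_csv",
    bucket="my-landing-bucket",
    source_objects=["exports/sales_*.csv"],
    destination_project_dataset_table="my-project.my_dataset.sales",
    source_format="CSV",
    skip_leading_rows=1,
    write_disposition="WRITE_APPEND",
    create_disposition="CREATE_IF_NEEDED",
)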

Wrapping up

That’s about it. I hope you learned a lot from these blogs. In a future post I will look at CI/CD practices for Airflow.

About CTS

CTS is the largest dedicated Google Cloud practice in Europe and one of the world’s leading Google Cloud experts, winning 2020 Google Partner of the Year Awards for both Workspace and GCP.

We offer a unique full stack Google Cloud solution for businesses, encompassing cloud migration and infrastructure modernisation. Our data practice focuses on analysis and visualisation, providing industry-specific solutions for Retail, Financial Services, and Media and Entertainment.

We’re building talented teams ready to change the world using Google technologies. So if you’re passionate, curious and keen to get stuck in — take a look at our Careers Page and join us for the ride!
