How to Build Your Own Apache Airflow DAG Factory

Ayyoub Maulana
Data Engineering Indonesia

Introduction

Even though there is already a DAG Factory library that simplifies building DAGs, maybe you want to try building your own: a simple DAG factory to use in a mini project, a portfolio, and so on. This DAG factory will use a YAML file to define a pipeline as configuration and hide the complex logic behind it. So, this is my boring little project, let’s get into it.

Apache Airflow is used by many data engineers to build pipelines, orchestrate them, and manage them at scale. Here I try to compose ideas from many sources into our own DAG factory so we can understand it better.

DAG Structure

An Apache Airflow DAG is built from these objects:

  • DAG
  • Operators
  • Task Dependencies

A DAG has some parameters that typically need to be defined, including dag_id, default_args, and schedule_interval. An operator always has task_id as a mandatory parameter, plus others that depend on which operator we use: for example, BashOperator needs bash_command and PythonOperator needs python_callable.

To define task dependencies we can use set_upstream to point a task at its upstream tasks; a task can have multiple upstream tasks, just one, or none.
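To make this concrete, here is a minimal sketch (a throwaway example, not part of the factory) that uses all three objects: a DAG, two BashOperator tasks, and one dependency defined with set_upstream.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="hello_dag",
    default_args={"owner": "airflow", "start_date": datetime(2023, 7, 1)},
    schedule_interval="@daily",
)

extract = BashOperator(task_id="extract", bash_command="echo extract", dag=dag)
load = BashOperator(task_id="load", bash_command="echo load", dag=dag)

# load will only run after extract finishes
load.set_upstream(extract)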

Building the Castle

OK, so we have the requirements, which makes it clear what needs to be built. First things first, we need a project directory setup that is easy to read and maintain. The project looks like this:

pipeline_project
|__ configs
| |__ etl.yaml
|__ dags
| |__ etl.py
|__ plugins
| |__ dag_factory.py
| |__ process_data.py
|__ data
| |__ jobs.csv
|__ output

We have pipeline_project as the root project directory. Under it, the configs and dags folders are where we will create two files: the config file named etl.yaml and the DAG file named etl.py.

To make importing easier we will create the dag_factory.py file inside the plugins folder; we will rely on Apache Airflow’s behavior of putting the plugins folder on the Python path, so every Python file created under it is easy to import.

Create DAG Factory

Next, we will build the DAG factory by creating a dag_factory.py file under the plugins folder. We will write the code below inside that file, but for the sake of explanation I will break it down function by function and then compose it back together.

# Import the libraries we need
from datetime import datetime
from importlib import import_module

import yaml
from airflow import DAG
from airflow.utils.module_loading import import_string


def read_config(config_filepath):
    # Parse the YAML config file and return it as a plain dictionary
    with open(config_filepath, "r", encoding="utf-8") as stream:
        return yaml.load(stream, Loader=yaml.FullLoader)

That function reads the config file pointed to by the config_filepath parameter and returns a dictionary, which we will use in the create_dag function later on.
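As a quick sanity check (assuming the etl.yaml file we will define later in this article already exists at that path), the returned dictionary can be inspected directly:

cfg = read_config("/opt/airflow/configs/etl.yaml")
print(cfg["airflow"]["dag_id"])  # etl_pipeline
print(len(cfg["tasks"]))         # 3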

def import_modules(module_path):
    # Split "package.module.ClassName" into a module path and an attribute name
    module_name, class_name = module_path.rsplit(".", 1)
    module = import_module(module_name)

    # Return the class (or function) defined in that module
    return getattr(module, class_name)

The next function we need to define is import_modules; it imports whatever module path we put in the config file and returns the class or function at the end of it. The import_modules function will be used in the create_task function that we will create later on.
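For example (a quick illustration, not part of the factory itself), resolving the dotted path of BashOperator returns the operator class:

operator_class = import_modules("airflow.operators.bash.BashOperator")
print(operator_class)  # <class 'airflow.operators.bash.BashOperator'>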

def create_task(task_config, task_mapper, dag):
    task_id = task_config['task_id']
    operator_path = task_config['operator']
    params = task_config['params']
    upstream_tasks = task_config.get('upstream', [])

    # Resolve the operator class from its dotted path
    operator_class = import_modules(operator_path)

    # python_callable is given as a dotted path too, so resolve it as well
    python_callable = params.get('python_callable')
    if python_callable:
        params['python_callable'] = import_string(python_callable)

    task = operator_class(
        task_id=task_id,
        dag=dag,
        **params
    )

    # Wire up dependencies to tasks that were already created
    for upstream_task_id in upstream_tasks:
        upstream_task = task_mapper.get(upstream_task_id)
        if upstream_task:
            task.set_upstream(upstream_task)

    return task

The create_task function returns a task instance and also wires up its task dependencies; the task is created from whatever operator class the config specifies. We will use this function in create_dag below.
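To make the inputs concrete, here is a sketch (with hypothetical values) of the kind of task_config dictionary create_task expects, mirroring one entry of the YAML file we will write later:

task_config = {
    "task_id": "say_hello",
    "operator": "airflow.operators.bash.BashOperator",
    "params": {"bash_command": "echo hello"},
    "upstream": [],
}
# task = create_task(task_config, task_mapper={}, dag=dag)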

def create_dag(config_filepath, globals) -> DAG:
    cfg = read_config(config_filepath)

    dag_id = cfg['airflow']['dag_id']
    schedule_interval = cfg['airflow']['schedule_interval']

    # Parse the 'YYYY-MM-DD' start_date string from the config
    year, month, day = map(int, cfg['airflow']['start_date'].split('-'))

    default_args = {
        "owner": cfg['airflow']['dag_id'],
        "start_date": datetime(year, month, day)
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=schedule_interval,
    )

    # Keep track of created tasks so later tasks can reference them as upstream
    task_mapper = {}

    for task_config in cfg['tasks']:
        task = create_task(task_config, task_mapper, dag)
        task_mapper[task.task_id] = task

    # Register the DAG in the caller's global namespace so Airflow discovers it
    globals[dag.dag_id] = dag

    return dag

The main function to run is create_dag; it builds the DAG object from the values in the config and creates the tasks together with their dependencies. Now we can compose all of those functions together into a single dag_factory.py file.

You can modify it however you like: encapsulate it in a class, improve some lines, and so on; see the sketch below for one idea.
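For example, one possible (purely illustrative) refactor is a thin class wrapper; the DagFactory name and its build method here are just an idea, not part of the code above:

from airflow import DAG

from dag_factory import create_dag


class DagFactory:
    """Hypothetical class wrapper around the create_dag function."""

    def __init__(self, config_filepath: str):
        self.config_filepath = config_filepath

    def build(self, globals_dict: dict) -> DAG:
        # Delegate to the function-based factory from dag_factory.py
        return create_dag(self.config_filepath, globals_dict)

A DAG file could then call something like DagFactory('/opt/airflow/configs/etl.yaml').build(globals()) instead of the plain function.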

Create ETL Config File

The next step, now that we have a DAG factory library, is to create the etl.yaml config file that describes the pipeline we want to build. The goal is to extract CSV data, apply a lightweight transformation by filtering on the employment_type column, and then load the result by saving it to JSON.

airflow:
  dag_id: etl_pipeline
  start_date: '2023-07-01'
  schedule_interval: "@weekly"

tasks:
  - task_id: extract
    operator: airflow.operators.python.PythonOperator
    params:
      python_callable: process_data.extract_data
      op_kwargs:
        file: '/opt/airflow/data/jobs.csv'

  - task_id: transform
    operator: airflow.operators.python.PythonOperator
    params:
      python_callable: process_data.transform_data
      op_kwargs:
        filter: employment_type
        value: 'FULL_TIME'
    upstream: [extract]

  - task_id: load
    operator: airflow.operators.python.PythonOperator
    params:
      python_callable: process_data.load_data
      op_kwargs:
        output: '/opt/airflow/output/transformed.json'
    upstream: [transform]

You can test it on your own, build a more complex configuration, and adjust it based on your understanding of this article. You may need to adjust some parts to cover more complex configs.

Create DAG File

So, we are almost at the final step: we will create a DAG file that runs the ETL pipeline. We will use a jobs dataset in this process; just download it and save it as jobs.csv in the data folder.

from dag_factory import create_dag
import os

cfg_dir = '/opt/airflow/configs'

# Create one DAG per config file found in the configs directory
for file in os.listdir(cfg_dir):
    create_dag(os.path.join(cfg_dir, file), globals())

Don’t forget the module that will actually process the data; we create it under the plugins folder as well. Name it process_data.py and fill it with the code below.

import json

import pandas as pd


def extract_data(file):
    # Read the CSV and return it as a list of row dictionaries
    return pd.read_csv(file).to_dict('records')


def transform_data(**kwargs):
    # Pull the extracted rows from XCom and keep only the matching rows
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='extract')

    return [value for value in data if value[kwargs['filter']] == kwargs['value']]


def load_data(output, **kwargs):
    # Pull the transformed rows from XCom and write them to a JSON file
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='transform')

    with open(output, 'w') as f:
        f.write(json.dumps(data))

How to Run It

  1. Create the Airflow docker-compose.yml (for example, the official one from the Airflow docs) and make sure the configs, data, and output folders are mounted into the containers
  2. Run it with the docker-compose up -d command
  3. Wait until everything is ready
  4. Open localhost:8080 in your favorite browser
  5. Run the pipeline
  6. To view the overall project, visit my Github

Conclusion

Maybe this is a boring project, but I hope it helps us understand how to build an Airflow DAG from a configuration file. Sometimes we face a lot of tasks that need to be managed, and a configuration file may come to the rescue. Happy pipelining!
