
Airflow (MWAA) — Automating ETL for a Data Warehouse

Amit Singh Rathore
Published in Nerd For Tech
7 min read · Jan 25, 2021


Airflow is an open-source workflow management platform. We define workflows as DAGs, i.e. "configuration as code" written in Python.

Apache Airflow at a high level has the following components talking to each other.

(Component diagram from the AWS blog)

To run a basic Airflow instance we need an EC2/ECS cluster, an RDS instance, workers, schedulers, webservers, and EFS to make DAGs available to all worker containers. The diagram below shows one such deployment.

As seen above, there are many moving components, and we have to manage all of them ourselves. This becomes painful as setups and workflows grow more complex. To ease the pain, AWS launched a managed service, Amazon Managed Workflows for Apache Airflow (MWAA). It is a fully managed orchestration service for data pipelines that simplifies running open-source Apache Airflow on AWS. The image below describes what we get with MWAA. Everything (workers, scheduler, queueing, metadata DB, webserver) lives in an AWS-managed VPC and is operated entirely by AWS.

Notice in the diagram above that the MWAA service creates ENIs in your account (one per subnet/security-group pair). This is similar to Lambda networking (Lambda in a VPC with Hyperplane ENIs). In your AWS account you will also need an S3 bucket to store DAGs and supplementary Airflow artifacts (plugins, requirements).

Before we create an MWAA environment and run anything, let's get a refresher on the core Airflow concepts:

Webserver

Provides the control interface for users and maintainers. The Airflow UI is a Flask + Gunicorn setup. It lists DAGs, their run history and schedules, and offers pause/trigger options. It is the central place from which we manage Airflow pipelines, and it also serves the APIs. You can apply a custom theme by setting APP_THEME = "[theme-name].css" (the last line in webserver_config.py) to one of the themes shipped with Flask-AppBuilder.
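For illustration, the tail of webserver_config.py might look like the sketch below (the theme name is only an example; any Flask-AppBuilder theme works):

# webserver_config.py (last line): pick a Flask-AppBuilder CSS theme for the Airflow UI
APP_THEME = "simplex.css"  # example theme; swap in any theme shipped with Flask-AppBuilder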

Metadata Database

Airflow uses a database supported by the SQLAlchemy library, e.g. PostgreSQL or MySQL. This DB powers the UI and acts as the backend for the workers and scheduler. The metadata database stores configuration such as Variables and Connections, user information, roles, and policies. It also stores DAG-related metadata such as schedule intervals, tasks, and statistics from various runs.
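As a quick illustration, the Variables and Connections we read in DAG code come from this metadata database (the key and connection id below are hypothetical; the import paths assume Airflow 2.x):

from airflow.models import Variable
from airflow.hooks.base import BaseHook  # airflow.hooks.base_hook in Airflow 1.10.x

# Both lookups hit the metadata database.
env_name = Variable.get("environment", default_var="dev")  # hypothetical Variable key
conn = BaseHook.get_connection("my_postgres")              # hypothetical Connection id
print(env_name, conn.host, conn.port)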

Scheduler

A daemon built using the python-daemon library. It schedules and delegates tasks to worker nodes via the executor, and also takes care of housekeeping such as concurrency checks, dependency checks, callbacks, and retries. The three main components of the scheduler are:

  • SchedulerJob
  • DagFileProcessor
  • Executor

Worker

These are the workhorses of Airflow. They are the actual nodes where tasks are executed.

Executor

Executors are the "workstations" for tasks. The executor acts as a middleman that handles resource allocation and distributes tasks for completion. Airflow offers several executor options, and the executor runs inside the scheduler:

  • Sequential Executor
  • Debug Executor
  • Local Executor (single-node architecture)
  • Dask Executor
  • Celery Executor
  • Kubernetes Executor
  • Scaling out with Mesos (community contributed)

Note: The executor maintains its state in memory, which may cause some inconsistency.

MWAA uses the Celery Executor.

Order of Operation

  • The scheduler starts DAG runs based on triggers (scheduled or external).
  • The scheduler loads the steps/tasks within the DAG and resolves their dependencies.
  • The scheduler places runnable tasks in the queue.
  • Workers pick up those tasks from the queue and run them.
  • Once a worker finishes a task, it updates the task's status.
  • The overall DAG run status is decided based on the statuses of its tasks.

DAG

A Directed Acyclic Graph (DAG) is a graph object that represents a workflow in Airflow. It is a collection of tasks organized to show each task's relationships and dependencies. DAGs carry the context of task execution. In MWAA we keep DAGs in S3. For a new DAG file, it takes about a minute for Amazon MWAA to start using it; updates to an existing DAG file are picked up in about 10 seconds.

Operators

These are the building blocks of a DAG. They define the actual work the DAG will perform. An operator determines the nature of a task and is represented by a Python class that acts as a template for that type of task. Operators should be idempotent.

  • BashOperator — executes a bash statement
  • PythonOperator — runs a Python function
  • SparkSubmitOperator — performs a spark-submit

There are three categories of operators: Action, Transfer & Sensor.

Task — Instance of an operator or Sensor.
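A short sketch of an operator becoming a task, assuming it sits inside a with DAG(...) block (the function and task_id are made up for illustration; the import path is the Airflow 2.x one):

from airflow.operators.python import PythonOperator  # airflow.operators.python_operator in 1.10.x

def greet(name, **context):
    print(f"Hello, {name}!")

say_hello = PythonOperator(     # a task is an instance of an operator
    task_id="say_hello",
    python_callable=greet,
    op_kwargs={"name": "Airflow"},
)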

Plugins

Plugins are an easy way to write, share, and activate custom runtime behavior.

plugins/
|-- __init__.py
|-- airflow_plugin.py
|-- hooks/
|   |-- __init__.py
|   |-- airflow_hook.py
|-- operators/
|   |-- __init__.py
|   |-- airflow_operator.py
|-- sensors/
|   |-- __init__.py
|   |-- airflow_sensor.py

Note: In MWAA we do not get direct access to the runtime where workers process tasks. We can use requirements.txt to install Python modules that are publicly available. For modules that are not available there, or for custom modules, we can zip the packages locally, push the archive to S3, and use it as and when required.
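For reference, a minimal airflow_plugin.py following the layout above might look like this (class and module names are hypothetical, written in the Airflow 1.10.x plugin-registration style that MWAA launched with; in Airflow 2 custom modules can simply be imported directly):

# airflow_plugin.py: a minimal sketch
from airflow.plugins_manager import AirflowPlugin

# hypothetical custom classes shipped alongside this file in plugins.zip
from hooks.airflow_hook import MyCustomHook
from operators.airflow_operator import MyCustomOperator

class MyAirflowPlugin(AirflowPlugin):
    name = "my_airflow_plugin"
    hooks = [MyCustomHook]
    operators = [MyCustomOperator]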

Hooks

Hooks allow your DAG to connect to its environment; they are the interface for interacting with external systems. For example, we can create an S3 connection and use the S3 hook to get the connection info and do our task. There are various hooks (HTTP, Hive, Slack, MySQL), and the community keeps adding more.
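A hedged sketch of the S3 hook in a task callable (the connection id and bucket are hypothetical; the import path assumes the Airflow 2.x Amazon provider, while 1.10.x uses airflow.hooks.S3_hook):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def list_reports(**context):
    # connection details are looked up from the metadata database by connection id
    s3 = S3Hook(aws_conn_id="aws_default")
    keys = s3.list_keys(bucket_name="my-data-bucket", prefix="reports/")  # hypothetical bucket
    print(keys)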

Sensors

Sensors can be described as special operators that monitor (poll) a long-running task, a file, a database row, an S3 key, another DAG/task, etc.
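A sketch of an S3 key sensor, assuming the Airflow 2.x Amazon provider import path and a hypothetical bucket/key (mode='reschedule' is explained further down):

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id="wait_for_input_file",
    bucket_key="s3://my-data-bucket/input/{{ ds }}/data.csv",  # hypothetical key, templated with the run date
    poke_interval=300,       # poll every 5 minutes
    timeout=6 * 60 * 60,     # give up after 6 hours
    mode="reschedule",       # release the worker slot between polls
)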

XComs

XComs (cross-communication) are designed for communication between tasks. We use xcom_push and xcom_pull to push and retrieve small values.
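A minimal sketch of the push/pull pattern using the task instance (ti) from the context (task ids and keys are hypothetical; both callables are meant to be wrapped in PythonOperators within the same DAG):

def produce(**context):
    # xcom_push stores a small value in the metadata database, keyed by task and run
    context["ti"].xcom_push(key="row_count", value=42)

def consume(**context):
    row_count = context["ti"].xcom_pull(task_ids="produce_task", key="row_count")
    print(f"upstream produced {row_count} rows")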

Tasks transition from one state to another as a DAG runs. First, the Airflow scheduler determines whether it is time for a task to run and whether all of its dependencies are complete. At that point the task enters the scheduled state. When the task is assigned to an executor, it enters the queued state. When the executor finally picks up the task and a worker starts performing its work, the task enters the running state.

The below diagram explains the transition.

In the above diagram, the reschedule state is the one generally not known to many people. It applies to sensor tasks with mode='reschedule': if the sensor's criteria are not met, the sensor releases the worker to other tasks and is rescheduled for a later time. This is very useful when a sensor may wait for a long time.

Note: Newer versions of Airflow add two more states: deferred and shutdown.

Writing a DAG:

from airflow import DAG

with DAG(
    dag_id='DAG_NAME',
    default_args=default_args,
    max_active_runs=3,
    schedule_interval='@daily'
) as dag:
    ...

The above code snippet initializes a DAG with a name (dag_id). It makes the default_args dictionary available to every task; the key-value pairs can be overridden by individual tasks. schedule_interval tells Airflow when to run this DAG, and max_active_runs limits how many runs of the DAG can be active concurrently.

Let us expand default_args.

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 31),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': notification_helper,  # a user-defined callback function
    'tags': ['platform'],
}
  • owner: the owner of the task, use the Unix username
  • start_date: The execution_date for the first DAG run
  • retries: The number of retries that can be performed
  • retry_delay: The delay time between retries.
  • on_failure_callback: The function to be called when a task instance fails.

Adding tasks & dependencies:

from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator in 1.10.x

t1 = BashOperator(
    task_id='print_date',
    bash_command='date'
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3
)

t1 >> t2

Let us see a DAG in action. We will leverage MWAA here. To use it, we perform the following three steps:

  • Create an MWAA environment
  • Upload DAGs to S3
  • Run your DAG in Airflow

Toggle the DAG ON, and the workflow will trigger.

Click on an individual task and see its progress and details.
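For the upload step, pushing a DAG file into the environment's dags/ prefix is enough; a minimal sketch with boto3 (bucket and file names are hypothetical):

import boto3

s3 = boto3.client("s3")
# MWAA watches the dags/ prefix of the S3 bucket configured for the environment
s3.upload_file("my_etl_dag.py", "my-mwaa-bucket", "dags/my_etl_dag.py")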

Advantages

Programmatic pipeline definition — all DAGs are Python-based.

Dynamic through Jinja templating — Jinja templating makes pipelines dynamic and reusable.

Community-contributed solutions — there are plenty of community-provided solutions.

Extensible through Plugins — We can upload zipped files with all custom modules

Smart scheduling — It is way more than cron. It is “data-aware”.

Dependency Management, Graceful failures handling and Retries

Monitoring & Audit Capability Out-of-the-box

Catchup & Backfill

Catchup — Airflow will run all scheduled DAG run intervals for which it has no status recorded, starting from the start_date specified in default_args.

Backfilling — We can run the DAG for a specified historical period.
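A hedged sketch of both knobs (the dag_id and dates are illustrative; the backfill CLI syntax is the Airflow 2.x one):

from datetime import datetime
from airflow import DAG

with DAG(
    dag_id="catchup_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=True,   # run every missed daily interval since start_date
) as dag:
    ...

# Backfill a specific historical window from the CLI:
#   airflow dags backfill -s 2021-01-01 -e 2021-01-31 catchup_example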


Airflow alternatives

  • Luigi
  • Kedro
  • Cadence / Temporal
  • Conductor (Netflix)
  • Dagster / Prefect
  • NiFi
  • Kestra
  • AWS Step Functions

Happy Learning!!
