Airflow (MWAA)— Automating ETL for a Data Warehouse
Airflow is an open-source workflow management platform. Workflows are defined as DAGs, a form of “configuration as code” written in Python.
Apache Airflow at a high level has the following components talking to each other.
To run a basic instance of Airflow ourselves, we need an EC2/ECS cluster, an RDS instance, workers, schedulers and webservers, EFS to make DAGs available to all worker containers, and so on. The diagram below shows one such deployment.
As seen above, there are many moving components, and we have to manage all of them ourselves. This becomes painful as setups and workflows grow more complex. To ease the pain, AWS launched a managed service: Amazon Managed Workflows for Apache Airflow (MWAA). It is a fully managed orchestration service for data pipelines that simplifies running open-source versions of Apache Airflow on AWS. The image below describes what we get with MWAA. Everything (workers, scheduler, queueing, database, webserver) lives in an AWS-managed VPC and is fully managed by AWS.
Notice in the above diagram that the MWAA service creates some ENIs in your account (for the subnet and security group pair). This is similar to Lambda networking (Lambda in a VPC with Hyperplane ENIs). In your AWS account, you will also need an S3 bucket to store DAGs and supplementary Airflow artifacts (plugins).
Before we create an MWAA environment and run anything, let's refresh some Airflow concepts:
Webserver
Provides the control interface for users and maintainers. The Airflow UI is a Flask + Gunicorn setup. It lists DAGs, their run history and schedule, and the options to pause or start them. It is the central place from which we manage Airflow pipelines, and it also serves the APIs. You can apply a custom theme by setting APP_THEME = "[theme-name].css" (the last line in webserver_config.py) to one of the supported themes.
Metadata Database
Airflow uses a database supported by the SQLAlchemy library, e.g. PostgreSQL or MySQL. This database powers the UI and acts as the backend for the workers and the scheduler. It stores configuration such as variables and connections, as well as user information, roles, and policies. It also stores DAG-related metadata such as schedule intervals, tasks, statistics from various runs, etc.
Scheduler
A daemon built using the python-daemon library. It schedules tasks and delegates them to worker nodes via the executor. It also takes care of housekeeping such as concurrency checks, dependency checks, callbacks, retries, etc. The three main components of the scheduler are:
- SchedulerJob
- DagFileProcessor
- Executor
Worker
Workers are the workhorses of Airflow. They are the actual nodes where tasks are executed.
Executor
Executors are the “workstation” for “tasks”. The executor acts as a middleman that handles resource allocation and distributes tasks for completion. The executor runs inside the scheduler, and Airflow offers many executor options:
Sequential Executor
Debug Executor
Local Executor (Single Node Arch)
Dask Executor
Celery Executor
Kubernetes Executor
Scaling Out with Mesos (community contributed)
Note: The executor keeps its state in memory, which may cause some inconsistency.
MWAA uses Celery Executor.
Order of Operation
- The scheduler starts DAG runs based on triggers (scheduled or external).
- The scheduler loads the steps/tasks within the DAG and resolves their dependencies.
- The scheduler places runnable tasks in the queue.
- Workers pick up those tasks from the queue and run them.
- Once a worker finishes a task, it updates the task's status.
- The overall DAG status is decided based on the tasks' statuses.
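The steps above can be sketched as a toy loop in plain Python. This is only an illustration under simplifying assumptions (a single worker, no retries, an in-memory queue); the task names, the `run_dag` helper, and the `run_task` callback are all hypothetical, and the real scheduler is far more involved.

```python
from collections import deque

# Hypothetical DAG: task -> set of upstream tasks it depends on.
DEPENDENCIES = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}


def run_dag(dependencies, run_task):
    """Toy scheduler/worker loop; returns (dag_state, task_states)."""
    state = {task: "none" for task in dependencies}
    queue = deque()

    while True:
        # Scheduler: queue tasks whose upstream tasks all succeeded.
        for task, upstream in dependencies.items():
            if state[task] == "none" and all(state[u] == "success" for u in upstream):
                state[task] = "queued"
                queue.append(task)
        if not queue:
            break
        # Worker: pick a task from the queue, run it, report its status.
        task = queue.popleft()
        state[task] = "running"
        state[task] = "success" if run_task(task) else "failed"

    # Tasks that never became runnable sit behind a failed upstream task.
    for task in state:
        if state[task] == "none":
            state[task] = "upstream_failed"

    # The overall DAG status follows from the task statuses.
    dag_state = "success" if all(s == "success" for s in state.values()) else "failed"
    return dag_state, state


ok_state, ok_tasks = run_dag(DEPENDENCIES, lambda task: True)
bad_state, bad_tasks = run_dag(DEPENDENCIES, lambda task: task != "transform")
print(ok_state, ok_tasks)
print(bad_state, bad_tasks)
```

In the second run the simulated failure of `transform` keeps `load` and `notify` from ever being queued, which is why the whole run is reported as failed.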
DAG
A Directed Acyclic Graph (DAG) is a graph object that represents a workflow in Airflow. It is a collection of tasks organized to show each task's relationships and dependencies. DAGs contain the context of task execution. In MWAA we keep DAGs in S3. Amazon MWAA takes about a minute to start using a new DAG file and about 10 seconds to recognize updates to an existing one.
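To make "directed" and "acyclic" concrete, here is a small sketch using Python's standard `graphlib` module (Python 3.9+), not Airflow itself. The task names are made up for illustration. A valid workflow has at least one execution order that respects every dependency; a cycle makes such an order impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical workflow: each key maps to the tasks it depends on.
workflow = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# A dependency-respecting order exists because the graph has no cycle.
order = list(TopologicalSorter(workflow).static_order())
print(order)

# Making "extract" depend on "load" closes a loop, so no valid order exists.
cyclic = {**workflow, "extract": {"load"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```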
Operators
These are the building blocks of a DAG. They define the actual work the DAG will perform and determine the nature of each task. An operator is represented by a Python class that acts as a template for a type of task. Operators should be idempotent.
BashOperator — execute a bash statement
PythonOperator — run a python function
SparkSubmitOperator — spark-submit
…
There are three categories of operators: Action, Transfer & Sensor.
Task — Instance of an operator or Sensor.
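The relationship between an operator (a template), a task (an instance of it), and idempotency can be sketched without Airflow. The class below is a hypothetical stand-in, not Airflow's operator base class, and the dictionary plays the role of a warehouse table partitioned by date.

```python
class LoadPartitionOperator:
    """Toy operator: a template for 'write rows into a dated partition'."""

    def __init__(self, task_id, rows):
        self.task_id = task_id
        self.rows = rows

    def execute(self, warehouse, run_date):
        # Overwrite the partition instead of appending, so a retry or a
        # re-run for the same date leaves the warehouse in the same state.
        warehouse[run_date] = list(self.rows)


# Two tasks instantiated from the same operator class.
load_orders = LoadPartitionOperator(task_id="load_orders", rows=[1, 2, 3])
load_refunds = LoadPartitionOperator(task_id="load_refunds", rows=[9])

warehouse = {}
load_orders.execute(warehouse, "2021-01-31")
first = dict(warehouse)
load_orders.execute(warehouse, "2021-01-31")  # simulated retry
print(warehouse == first)
```

Had `execute` appended rows instead of overwriting the partition, the simulated retry would have duplicated data, which is the kind of problem idempotent tasks avoid.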
Plugins
Plugins are an easy way to write, share and activate custom runtime behavior. A typical plugin layout looks like this:
__init__.py
|-- airflow_plugin.py
hooks/
|-- __init__.py
|-- airflow_hook.py
operators/
|-- __init__.py
|-- airflow_operator.py
sensors/
|-- __init__.py
|-- airflow_sensor.py
Note: In MWAA we do not get direct access to the runtime where workers process tasks. We can use requirements.txt to install publicly available Python modules. For modules that are not available that way, or for custom modules, we can zip the packages locally, push the archive to S3, and use it as and when required.
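The "zip the packages locally" step might look like the sketch below, which uses only the Python standard library. The `build_plugins_zip` helper and the file contents are hypothetical, the demo runs against a throwaway directory that mirrors part of the layout above, and the upload to S3 is deliberately left out.

```python
import tempfile
import zipfile
from pathlib import Path


def build_plugins_zip(plugins_dir, zip_path):
    """Zip every file under plugins_dir, with paths relative to it."""
    plugins_dir = Path(plugins_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(plugins_dir.rglob("*")):
            if path.is_file():
                archive.write(path, path.relative_to(plugins_dir).as_posix())
    return zip_path


# Demo on a temporary directory; real plugin code would live in these files.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "plugins"
    (src / "hooks").mkdir(parents=True)
    (src / "__init__.py").write_text("")
    (src / "hooks" / "__init__.py").write_text("")
    (src / "hooks" / "airflow_hook.py").write_text("# custom hook goes here\n")

    zip_file = build_plugins_zip(src, Path(tmp) / "plugins.zip")
    with zipfile.ZipFile(zip_file) as archive:
        names = sorted(archive.namelist())

print(names)
```

Storing paths relative to the plugins directory keeps the files at the root of the archive rather than nested under an extra top-level folder.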
Hooks
Hooks allow you to connect your DAG to your environment. These are meant as an interface to interact with external systems. We can create an S3 connection and use S3 Hooks to get the connection info and do our task. There are various hooks (HTTP, Hive, Slack, MySQL) and the community keeps on adding hooks.
Sensors
Sensors are special operators that monitor (poll) a long-running task, file, database row, S3 key, another DAG/task, etc.
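The polling idea behind a sensor can be sketched in a few lines of plain Python. The `wait_for` function is hypothetical; the names `poke`, `poke_interval` and `timeout` are borrowed from Airflow's sensor vocabulary, but this is a toy loop and not how Airflow implements sensors.

```python
import time


def wait_for(poke, poke_interval=0.01, timeout=1.0):
    """Toy sensor: call poke() until it returns True or the timeout passes."""
    deadline = time.monotonic() + timeout
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts
        # Give up if the next poke would land after the deadline.
        if time.monotonic() + poke_interval > deadline:
            raise TimeoutError(f"condition not met after {attempts} pokes")
        time.sleep(poke_interval)


# Hypothetical condition: the "file" shows up on the third check.
checks = iter([False, False, True])
attempts_needed = wait_for(lambda: next(checks))
print(attempts_needed)

# A condition that never becomes true ends in a timeout.
try:
    wait_for(lambda: False, poke_interval=0.01, timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True
print(timed_out)
```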
XComs
XComs (cross-communications) are designed for communication between tasks. We use xcom_push and xcom_pull to push and retrieve values.
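Conceptually, an XCom is a small value stored under a key that identifies who pushed it. The toy store below illustrates that idea only. In Airflow, xcom_push and xcom_pull are methods on the task instance with different signatures; the `XComStore` class and its arguments are hypothetical. The default key `return_value` matches the key Airflow uses for a task's return value.

```python
class XComStore:
    """Toy stand-in for the XCom table in the metadata database."""

    def __init__(self):
        self._rows = {}

    def xcom_push(self, dag_id, run_id, task_id, key, value):
        self._rows[(dag_id, run_id, task_id, key)] = value

    def xcom_pull(self, dag_id, run_id, task_id, key="return_value"):
        # Returns None when nothing was pushed under that key.
        return self._rows.get((dag_id, run_id, task_id, key))


store = XComStore()

# The "extract" task pushes a small value, here a row count.
store.xcom_push("etl", "run_1", "extract", "row_count", 42)

# The downstream "load" task pulls it from the same DAG run.
pulled = store.xcom_pull("etl", "run_1", "extract", key="row_count")
other_run = store.xcom_pull("etl", "run_2", "extract", key="row_count")
print(pulled, other_run)
```

Values are scoped to a DAG run in this sketch, so a different run does not see the row count.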
Tasks transition from one state to another when a DAG runs. First, the Airflow scheduler determines whether it is time for a task to run and whether its other dependencies are complete. At this point, the task enters the scheduled state. When the task is assigned to an executor, it enters the queued state. When the executor picks up the task and a worker starts performing its work, the task enters the running state.
The below diagram explains the transition.
In the above diagram, the reschedule state is not widely known. It applies to sensor tasks with mode='reschedule'. In this mode, if the sensor's criteria are not met, the sensor releases its worker slot to other tasks and is rescheduled for a later time. This is very useful when the sensor may have to wait for a long time.
Note: Newer versions of Airflow have two additional states: 1. Deferred 2. Shutdown.
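The task lifecycle described above can be read as a small state machine. The sketch below encodes a simplified, assumed subset of transitions. The state names follow Airflow's spelling (for example `up_for_reschedule` for the reschedule state), but the transition table itself is an illustration, not Airflow's complete set of states or rules.

```python
# Assumed, simplified subset of transitions; not Airflow's full state diagram.
TRANSITIONS = {
    "none": {"scheduled"},
    "scheduled": {"queued"},
    "queued": {"running"},
    "running": {"success", "failed", "up_for_retry", "up_for_reschedule"},
    "up_for_retry": {"scheduled"},
    "up_for_reschedule": {"scheduled"},
    "success": set(),
    "failed": set(),
}


def advance(current, target):
    """Move to target if the transition is allowed, else raise."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current} -> {target}")
    return target


def walk(path):
    """Apply a sequence of transitions starting from 'none'."""
    state = "none"
    for target in path:
        state = advance(state, target)
    return state


# A sensor in reschedule mode: first check fails, second one succeeds.
final = walk([
    "scheduled", "queued", "running", "up_for_reschedule",
    "scheduled", "queued", "running", "success",
])
print(final)

# Skipping the queue is rejected.
try:
    walk(["scheduled", "running"])
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```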
Writing DAG:
from airflow import DAG

with DAG(
    dag_id='DAG_NAME',
    default_args=default_args,
    max_active_runs=3,
    schedule_interval='@daily'
) as dag:
    ...  # tasks are defined inside this block
The above snippet initializes a DAG with a name (dag_id). It makes the dictionary default_args available to every task, and an individual task can override any of its key-value pairs. schedule_interval tells Airflow when to run the DAG, and max_active_runs sets how many runs of the DAG can be active concurrently.
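The override rule can be pictured as a dictionary merge in which task-level arguments win. This is a plain-Python illustration of the idea, not Airflow's internal mechanism; `resolve_task_args` is a hypothetical helper.

```python
from datetime import timedelta

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def resolve_task_args(default_args, **task_args):
    """Task-level arguments win over the shared defaults."""
    return {**default_args, **task_args}


# A task that sets nothing inherits every default.
print_date_args = resolve_task_args(default_args)

# A task that sets retries=3 overrides only that key.
sleep_args = resolve_task_args(default_args, retries=3)

print(print_date_args["retries"], sleep_args["retries"], sleep_args["owner"])
```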
Let us expand default_args.
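from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 31),  # Python 3 does not allow leading zeros (01)
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': notification_helper,  # a callable defined elsewhere
    # 'tags' is a DAG-level argument: pass tags=['platform'] to DAG(...) instead
}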
- owner: the owner of the task, use the Unix username
- start_date: The execution_date for the first DAG run
- retries: The number of retries that can be performed
- retry_delay: The delay time between retries.
- on_failure_callback: The function to be called when a task instance fails.
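How retries, retry_delay and on_failure_callback interact can be sketched with a small loop. `run_with_retries` is a hypothetical helper, the context dictionary passed to the callback is a simplified stand-in, and the sleep is injected as a no-op so the example runs instantly. Airflow handles all of this itself when a task fails.

```python
def run_with_retries(task, retries, on_failure_callback,
                     retry_delay_seconds=300, sleep=lambda seconds: None):
    """Run task(); retry up to `retries` times, then call the failure callback."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception as error:
            if attempts > retries:
                # Retries exhausted: notify, then let the failure propagate.
                on_failure_callback({"exception": error, "attempts": attempts})
                raise
            sleep(retry_delay_seconds)  # wait before the next attempt


calls = []


def flaky():
    """Fails on the first call, succeeds on the second."""
    calls.append(1)
    if len(calls) < 2:
        raise RuntimeError("transient error")
    return "done"


def always_fails():
    raise RuntimeError("permanent error")


failures = []
result, attempts = run_with_retries(flaky, retries=1, on_failure_callback=failures.append)
print(result, attempts, len(failures))

try:
    run_with_retries(always_fails, retries=1, on_failure_callback=failures.append)
except RuntimeError:
    pass
print(len(failures), failures[0]["attempts"])
```

With retries=1 a task gets two attempts in total, and the callback fires only after the last one fails.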
Adding tasks & dependencies:
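from airflow.operators.bash import BashOperator  # Airflow 2.x import path

# The tasks below belong inside the `with DAG(...) as dag:` block
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date'
    )
    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3  # overrides the value from default_args
    )
    t1 >> t2  # t2 runs after t1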
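The `>>` syntax is ordinary Python operator overloading. The toy class below shows the idea with `__rshift__`; it is a simplified sketch rather than Airflow's operator class, and the third task `done` is added only to show chaining.

```python
class Task:
    """Toy task that records its downstream tasks."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `a >> b` means "b runs after a".
        self.downstream.append(other)
        return other  # returning the right-hand task allows chaining


t1 = Task("print_date")
t2 = Task("sleep")
t3 = Task("done")

t1 >> t2 >> t3

edges = [(t.task_id, d.task_id) for t in (t1, t2, t3) for d in t.downstream]
print(edges)
```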
Let us see the DAG in action. We will use MWAA here, which takes the following three steps.
Create Environment
Upload DAGs to S3
Run your DAG in Airflow
Toggle the DAG ON, and the workflow will trigger.
Click on an individual task and see its progress and details.
Advantages
Programmatic pipeline definition: all DAGs are Python based.
Dynamic through Jinja templating: Jinja templating makes pipelines dynamic and reusable.
Community-contributed solutions: there are plenty of community-provided solutions.
Extensible through plugins: we can upload zipped files with all custom modules.
Smart scheduling: it is far more than cron. It is “data-aware”.
Dependency management, graceful failure handling, and retries
Monitoring and audit capability out of the box
Catchup: Airflow runs every scheduled DAG run for which it has no recorded status, starting from the start_date you specify in your “default_args”.
Backfilling: we can run the DAG for a specified historical period.
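To see what catchup means for a daily schedule, the sketch below lists the runs that are due but have no recorded status. It assumes the usual Airflow convention that a run for a given date is only triggered once that day's interval has ended; `missed_daily_runs` and its arguments are hypothetical and greatly simplified.

```python
from datetime import datetime, timedelta


def missed_daily_runs(start_date, now, already_run=()):
    """Logical dates of daily runs that are due but have no recorded status."""
    due = []
    logical_date = start_date
    # A daily run covers [logical_date, logical_date + 1 day) and only
    # becomes due once that interval has fully passed.
    while logical_date + timedelta(days=1) <= now:
        if logical_date not in already_run:
            due.append(logical_date)
        logical_date += timedelta(days=1)
    return due


runs = missed_daily_runs(
    start_date=datetime(2021, 1, 31),
    now=datetime(2021, 2, 3, 12, 0),
    already_run={datetime(2021, 2, 1)},
)
run_dates = [d.date().isoformat() for d in runs]
print(run_dates)
```

The run for 2021-02-03 is not listed because its interval has not finished yet at the assumed current time.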
— — — — — — — — — — — — — — — — — — — — — — — — —
Airflow alternatives
- Luigi
- Kedro
- Cadence / Temporal
- Conductor (Netflix)
- Dagster / Prefect
- NiFi
- Kestra
- AWS Step Functions
Happy Learning!!