Airflow is the de facto ETL orchestration tool in most data engineers' toolboxes. It provides an intuitive web interface on top of a powerful backend for scheduling your ETL workflows and managing the dependencies between them.
In my day-to-day workflow, I use it to maintain and curate a data lake built on top of AWS S3. Nodes in my Airflow DAGs include multi-node EMR Apache Spark and Fargate clusters that aggregate, prune and produce para-data from the data lake.
Since these workflows are executed on distributed clusters (20+ nodes) and have heavy dependencies (the output of one ETL is fed in as input to the next), it made sense to orchestrate them using Airflow. However, it did not make sense to run a central Airflow deployment, since I would be the only one using it.
In this post I will go over how I packaged Airflow into a single Docker container instead, along with brief explanations of the design decisions along the way.
In Airflow, ETL workflows are defined as directed acyclic graphs (DAGs), where each node is a self-contained ETL and each downstream node depends on the successful completion of its upstream node.
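To make the dependency idea concrete, here is a small sketch that uses the coreutils tsort command (not Airflow) to order a hypothetical chain of ETL tasks. The task names are made up for illustration; each input line reads "upstream downstream", and tsort prints an order in which every task comes after the tasks it depends on. A cycle has no such order, which is why the graph must be acyclic.

```shell
#!/usr/bin/env bash
# Hypothetical ETL tasks; each line is "upstream downstream".
edges='extract_raw prune_partitions
prune_partitions aggregate
aggregate publish_paradata'

# tsort prints one task per line, upstream tasks before downstream ones.
order=$(printf '%s\n' "$edges" | tsort)
echo "$order"

# A cycle (a -> b -> a) has no valid order: GNU tsort reports the loop
# on stderr and exits with a non-zero status.
cycle_status=0
printf 'a b\nb a\n' | tsort >/dev/null 2>&1 || cycle_status=$?
echo "cycle exit status: $cycle_status"
```

Because the chain above is linear, there is exactly one valid order: extract_raw, prune_partitions, aggregate, publish_paradata.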
Airflow has three deployment components:
- Webserver (a Flask backend used to trigger and monitor DAGs)
- Scheduler (a daemon process that schedules DAG runs and hands their task instances to an executor)
- Database (a persistence layer for DAG metadata and the state of DAG and task instances)
It is quick and easy to get started with Airflow:
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
# (this post uses the Airflow 1.x CLI; in Airflow 2 this is "airflow db init")
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080
Upon running these commands, Airflow will create the $AIRFLOW_HOME folder and lay down an airflow.cfg file with defaults that get you going fast. You can inspect the file either in $AIRFLOW_HOME/airflow.cfg or through the UI in the Admin->Configuration menu. The PID file for the webserver will be stored in $AIRFLOW_HOME/airflow-webserver.pid, or in /run/airflow/webserver.pid if started by systemd.
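The PID file is handy for scripting. Here is a minimal sketch, assuming the default $AIRFLOW_HOME location described above, of checking whether the webserver is still alive. kill -0 sends no signal; it only fails if the process does not exist (or belongs to another user). The function name webserver_running is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeeds if the PID stored in the given file is a live
# process. Defaults to the webserver PID file under $AIRFLOW_HOME (~/airflow
# if AIRFLOW_HOME is unset).
webserver_running() {
  local pidfile=${1:-"${AIRFLOW_HOME:-$HOME/airflow}/airflow-webserver.pid"}
  [ -r "$pidfile" ] || return 1           # no PID file: treat as not running
  local pid
  pid=$(tr -d '[:space:]' < "$pidfile")   # strip the trailing newline
  [ -n "$pid" ] || return 1
  kill -0 "$pid" 2>/dev/null              # signal 0 only checks existence
}

if webserver_running; then
  echo "webserver is up"
else
  echo "webserver is not running"
fi
```

Pass the path explicitly (for example /run/airflow/webserver.pid) when the webserver was started by systemd. Note that a stale PID file can point at an unrelated process that reused the PID, so treat this as a quick check rather than a health probe.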
Out of the box, Airflow uses a SQLite database, which you should outgrow fairly quickly since no parallelization is possible with this database backend. It works in conjunction with the SequentialExecutor, which only runs task instances sequentially. While this is very limiting, it allows you to get up and running quickly and take a tour of the UI and the command line utilities.
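Airflow lets you override any airflow.cfg option with an environment variable named AIRFLOW__{SECTION}__{KEY}. As a sketch of moving past SQLite, assuming a Postgres database you have already created and the psycopg2 driver installed (the user, password, host and database name below are placeholders), you could switch to the LocalExecutor, which runs task instances in parallel on a single machine. These keys live in the [core] section in Airflow 1.x; newer releases moved the connection string to a [database] section.

```shell
#!/usr/bin/env bash
# Placeholder connection details; replace them with your own database.
PG_USER=airflow
PG_PASS=airflow
PG_HOST=localhost
PG_DB=airflow

# Run task instances in parallel on this machine instead of one at a time.
export AIRFLOW__CORE__EXECUTOR=LocalExecutor
# Point the metadata database at Postgres instead of the default SQLite file.
export AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://${PG_USER}:${PG_PASS}@${PG_HOST}:5432/${PG_DB}"

echo "executor: $AIRFLOW__CORE__EXECUTOR"
echo "metadata db: ${PG_HOST}:5432/${PG_DB}"
# Afterwards, re-run "airflow initdb" so the schema is created in Postgres.
```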
Here are a few commands that will trigger a few task instances. You should be able to see the status of the jobs change in the example_bash_operator DAG as you run the commands below.
# run your first task instance
airflow run example_bash_operator runme_0 2018-01-01
# run a backfill over 2 days
airflow backfill example_bash_operator -s 2018-01-01 -e 2018-01-02
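A backfill creates one DAG run per schedule interval between the start and end dates, both inclusive. Assuming a daily schedule and GNU date, this sketch lists the execution dates that a two-day backfill like the one above covers; the helper name daily_dates is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print every day from START to END inclusive (YYYY-MM-DD).
# Relies on GNU date's -d option; -u avoids daylight-saving surprises.
daily_dates() {
  local day=$1 end=$2
  # ISO dates compare correctly as plain strings.
  while [[ ! "$day" > "$end" ]]; do
    echo "$day"
    day=$(date -u -d "$day + 1 day" +%F)
  done
}

daily_dates 2018-01-01 2018-01-02
```

For -s 2018-01-01 -e 2018-01-02 this prints two dates, one per daily DAG run.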
A container's main running process is the CMD at the end of the Dockerfile. It is generally recommended that you separate areas of concern by using one service per container.
However, since we want both the Airflow webserver and the Airflow scheduler processes running, we will use supervisord as a process manager.
This is a moderately heavyweight approach that requires you to package supervisord and its configuration in the Docker image (or base your image on one that includes supervisord), along with the different applications it manages. Then you start supervisord, which manages your processes for you.
First we need to define a supervisord.conf that runs both processes:
[supervisord]
; stay in the foreground so the container's main process does not exit
nodaemon=true

[program:scheduler]
command=airflow scheduler
stdout_logfile=/var/log/supervisor/%(program_name)s.log
stderr_logfile=/var/log/supervisor/%(program_name)s.log
autorestart=true

[program:server]
command=airflow webserver -p 8080
stdout_logfile=/var/log/supervisor/%(program_name)s.log
stderr_logfile=/var/log/supervisor/%(program_name)s.log
autorestart=true
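The autorestart=true lines are what keep both daemons alive: whenever a program exits, supervisord starts it again. The toy loop below only illustrates that behaviour in plain bash. It is not how supervisord is implemented, it ignores real settings such as startsecs and startretries, it caps the number of restarts so the demo terminates, and the function name keep_alive is hypothetical.

```shell
#!/usr/bin/env bash
# Toy illustration of autorestart=true: rerun a command every time it exits.
# The restart cap exists only so this demo terminates.
keep_alive() {
  local max_restarts=$1
  shift
  local starts=0 status
  while :; do
    starts=$((starts + 1))
    status=0
    "$@" || status=$?                     # run the supervised command
    echo "start #$starts exited with status $status" >&2
    if [ "$starts" -gt "$max_restarts" ]; then
      break
    fi
  done
  echo "$starts"                          # total number of starts
}

keep_alive 2 sh -c 'exit 3'               # prints 3 (one start + two restarts)
```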
Then we make supervisord the container's main process by launching it from the CMD instruction of our Dockerfile:
FROM python:3.6.3

# supervisord setup
RUN apt-get update && apt-get install -y supervisor
COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf

# Airflow setup
ENV AIRFLOW_HOME=/app/airflow
RUN pip install apache-airflow
COPY /dags/response_rate_etl.py $AIRFLOW_HOME/dags/
RUN airflow initdb

EXPOSE 8080
CMD ["/usr/bin/supervisord"]
Build the Docker image:
docker build . -t airflow
Run the container:
docker run -d -p 8080:8080 --rm \
  --name airflow_container \
  airflow
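The webserver takes a little while to start inside the container. Here is a minimal sketch, assuming bash's built-in /dev/tcp redirection (enabled in most Linux builds of bash), of waiting until a port accepts connections before triggering anything; wait_for_port is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll HOST:PORT until it accepts a TCP connection.
# Returns 0 once connected, 1 after ATTEMPTS failed tries (default 30).
wait_for_port() {
  local host=$1 port=$2 attempts=${3:-30}
  local i
  for ((i = 1; i <= attempts; i++)); do
    # Opening /dev/tcp/HOST/PORT makes bash attempt a TCP connection;
    # the subshell closes it again immediately.
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    ((i < attempts)) && sleep 1
  done
  return 1
}
```

For example, running wait_for_port localhost 8080 60 right after docker run blocks for up to about a minute until the published webserver port is reachable.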
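Launch a DAG (new DAGs are paused by default in Airflow 1.x, so unpause it first or the tasks of the triggered run will not be scheduled):
docker exec airflow_container airflow unpause example_bash_operator
docker exec airflow_container airflow trigger_dag example_bash_operator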
Monitor the DAG run:
Open a browser and navigate to http://localhost:8080 to monitor the DAG instance run.
That's all, folks! Stay tuned for future posts where I will go into defining AWS EMR DAGs, defining custom Airflow operators, injecting AWS credentials and more!