
How to Scale-out Apache Airflow 2.0 with Redis and Celery

Tejas Marvadi
Published in CodeX · Jan 19, 2021

Apache Airflow has become one of the most prevalent tools in the data engineering space. It is a platform that lets you programmatically author, schedule, and monitor workflows. Airflow can be configured to run with different executors, such as Sequential, Debug, Local, Dask, Celery, Kubernetes, and CeleryKubernetes. As a data engineer, you would generally configure Apache Airflow on your local machine with either the Sequential or Local executor for development work. However, that setup prevents you from testing your workflows in a distributed environment. In this article, I will walk you through a step-by-step process to put together a docker-compose setup that resembles a distributed environment on your local machine. We will configure Airflow with the Celery Executor, which distributes the load to worker nodes. The approach I discuss here can be extended to a distributed production deployment.

Prerequisites:

  • Linux (64-bit)
  • Docker
  • Docker-Compose

Step 1: Structure Setup

Let’s first clarify how we are going to set up a distributed Airflow using Docker. In this setup, we will use Postgres as the backend to store our metadata and Redis as the message broker. We could use RabbitMQ instead of Redis; however, Redis is easier to deploy and maintain than RabbitMQ, which requires extra effort to maintain and is harder to debug when it crashes.
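
Airflow reads its configuration from environment variables of the form AIRFLOW__SECTION__KEY, which is how the entrypoint script in Step 7 wires up the executor and the result backend. The Celery broker URL can be set the same way. Below is a minimal sketch using the service names from this article; the RabbitMQ line is only illustrative, for comparison, and is not part of this setup.

# Celery-related Airflow settings expressed as environment variables (sketch)
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres:5432/airflow
# If you opted for RabbitMQ instead, the broker URL would look roughly like:
# AIRFLOW__CELERY__BROKER_URL=amqp://user:password@rabbitmq:5672//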

As depicted below, we are going to spin up six different Docker containers using docker-compose. The Airflow webserver and scheduler will share the same container. We will use publicly available Docker images for Postgres and Redis. Two containers will serve as worker nodes, and the last container will be dedicated to monitoring the workers.

Airflow Components in Multiple Docker Containers

To get started, let’s first take a look at the folder structure for this project. The parent folder, airflow-docker, contains two folders, dags and config, and three files: docker-compose.yml, Dockerfile, and entrypoint.sh. The dags folder contains a sample DAG, sample-dag.py, which we will execute at the end to demonstrate our distributed environment. The config folder contains a file for environment variables and several files supporting the Airflow configuration setup.

airflow-docker
|__ dags
|   |__ sample-dag.py
|__ config
|   |__ common.env
|   |__ connections.yml
|   |__ requirements.txt
|   |__ setup_connections.py
|   |__ variables.json
|__ docker-compose.yml
|__ Dockerfile
|__ entrypoint.sh

Step 2: Define Environment Variables & Additional Python Packages

Let’s first define the environment variables that we will use within our docker-compose.yml file for Postgres, Redis, and Airflow. I chose a single environment file for simplicity; however, you can split them into separate files or define them directly within the docker-compose file, as shown in the sketch after the file below.

  • common.env
# Postgres DB 
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_DB=airflow
POSTGRES_PORT=5432
POSTGRES_HOST=postgres
# Redis
REDIS_HOST=redis
REDIS_PORT=6379
# Airflow User
AF_USER_NAME=airflow
AF_USER_PASSWORD=airflow
AF_USER_FIRST_NAME=Tejas
AF_USER_LAST_NAME=M
AF_USER_EMAIL=test@test.com
AF_USER_ROLE=Admin
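
For reference, here is what the alternative mentioned above looks like. This is only a sketch of the two docker-compose options; the service name is illustrative and not part of the final docker-compose.yml.

services:
  some-service:
    # Option 1: load everything from a file
    env_file:
      - config/common.env
    # Option 2: define variables inline
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow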

Use the requirements.txt file to install any additional Python packages that you may need.

  • requirements.txt
snowflake-connector-python==2.3.8

Step 3: Define Connections and Variables for Airflow

All the connections your Airflow workflows need can be defined in this YAML file.

  • connections.yml
SAMPLE_DB:
  conn_type: mysql
  host: 192.168.254.210
  login: myuser
  password: mypassword
  schema: sample_db
  port: 3306
SAMPLE_HTTP:
  conn_type: http
  login: myuser
  password: mypassword

Any required variables can be added in the variables.json file. A short example of reading one of these variables from a DAG follows the file below.

  • variables.json
{
  "NOTIFICATION_EMAILS": "test@test.com",
  "sample_api_secret": "abc123"
}
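
Once these values are imported (the entrypoint script in Step 7 runs airflow variables import), they can be read inside any DAG or task through Airflow's Variable model. A minimal sketch; the variable names match the file above:

from airflow.models import Variable

# Read the values imported from variables.json
notification_emails = Variable.get("NOTIFICATION_EMAILS")
api_secret = Variable.get("sample_api_secret")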

Step 4: Python Script to Create Connections in Airflow

We will use the following Python script to create all the connections defined in the previous step. A quick way to verify the result is shown after the script.

  • setup_connections.py
import yaml
from airflow import settings
from airflow.models import Connection

# Load the connection definitions from the YAML file in Step 3
with open("connections.yml", "r") as f:
    connection_dict = yaml.safe_load(f.read())

# Insert each connection into the Airflow metadata database
session = settings.Session()
for connection_name, connection in connection_dict.items():
    conn = Connection(
        conn_id=connection_name,
        conn_type=connection.get('conn_type'),
        host=connection.get('host'),
        login=connection.get('login'),
        password=connection.get('password'),
        port=connection.get('port'),
        schema=connection.get('schema'),
        extra=connection.get('extra')
    )
    session.add(conn)
session.commit()
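
If you want to confirm the connections landed in the metadata database once the containers are up (Step 9), you can list them from inside the webserver container. This is just a sanity check; the service name matches the docker-compose file defined in Step 8.

docker-compose exec airflow-webserver airflow connections list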

Step 5: Create a DAG File for Testing

This is a sample DAG file to demonstrate our working distributed Airflow environment. It uses two BashOperators, each assigned to a different queue so that each task runs on a separate worker node (see the note after the DAG).

  • sample-dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.0 import path

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 15),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dist_example',
    schedule_interval='0 0 * * *',
    catchup=False,
    default_args=default_args
)

# Each task echoes the hostname of the worker that runs it,
# which lets us verify that the tasks landed on different nodes.
create_command = 'echo $(hostname)'

t1 = BashOperator(
    task_id='task_for_q1',
    bash_command=create_command,
    queue='queue_1',
    dag=dag
)

t2 = BashOperator(
    task_id='task_for_q2',
    bash_command=create_command,
    queue='queue_2',
    dag=dag
)

t1 >> t2
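
A quick note on how the queue parameter ties everything together: a Celery worker only pulls tasks from the queues it subscribes to, which is what the -q flag in entrypoint.sh (Step 7) controls. Roughly, the two workers are started like this:

# Worker 1 (QUEUE_NAME=queue_1 in docker-compose.yml)
airflow celery worker -q queue_1

# Worker 2 (QUEUE_NAME=queue_2)
airflow celery worker -q queue_2

As a result, task_for_q1 can only run on the first worker and task_for_q2 only on the second, which is exactly what we will verify at the end.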

Step 6: Prepare Dockerfile to Create a Base Image

We are going to use the python:3.8-slim-buster image as the base for the Airflow webserver, scheduler, worker nodes, and worker monitor. We install netcat, which the entrypoint.sh script uses to check whether Postgres and Redis are reachable. We also install the vim editor in case you need to inspect any config files inside a container for debugging.

Finally, we use pip to install apache-airflow with the selected extras. You can also define any additional Python packages in requirements.txt from Step 2 (see the note after the Dockerfile).

  • Dockerfile
FROM python:3.8-slim-buster

RUN apt-get update && \
    apt-get install -y netcat && \
    apt-get install -y vim && \
    pip3 install apache-airflow[celery,redis,postgres,crypto]==2.0.0

WORKDIR /root/airflow

COPY config/variables.json variables.json
COPY config/connections.yml connections.yml
COPY config/setup_connections.py setup_connections.py
COPY config/requirements.txt requirements.txt
COPY entrypoint.sh entrypoint.sh
COPY dags/ dags

ENTRYPOINT ["./entrypoint.sh"]
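
One thing worth mentioning: installing Airflow with a bare version pin can occasionally break when transitive dependencies release new versions. The Airflow project publishes constraint files for this; if you run into dependency resolution issues, a variation of the pip line above would look roughly like this (the URL follows the project's published constraints pattern for Airflow 2.0.0 on Python 3.8):

pip3 install "apache-airflow[celery,redis,postgres,crypto]==2.0.0" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.0.0/constraints-3.8.txt"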

Step 7: Define Entrypoint for Airflow Docker Images

We will use this parameterized shell script to start the webserver, worker nodes, and worker monitor through docker-compose.yml. Note the hard-coded Fernet key near the end; you should replace it with your own, as discussed after the script.

  • entrypoint.sh
#!/usr/bin/env bash

TRY_LOOP="20"

wait_for_port() {
  local name="$1" host="$2" port="$3"
  local j=0
  while ! nc -z "$host" "$port" >/dev/null 2>&1 < /dev/null; do
    j=$((j+1))
    if [ $j -ge $TRY_LOOP ]; then
      echo >&2 "$(date) - $host:$port still not reachable, giving up"
      exit 1
    fi
    echo "$(date) - waiting for $name $host... $j/$TRY_LOOP"
    sleep 5
  done
}

create_airflow_user() {
  airflow users create \
    --username "$AF_USER_NAME" \
    --firstname "$AF_USER_FIRST_NAME" \
    --lastname "$AF_USER_LAST_NAME" \
    --role "$AF_USER_ROLE" \
    --email "$AF_USER_EMAIL" \
    --password "$AF_USER_PASSWORD"
}

setup_airflow_variables() {
  if [ -e "variables.json" ]; then
    echo "Start importing Airflow variables"
    airflow variables import variables.json
  fi
}

setup_airflow_connections() {
  if [ -e "connections.yml" ]; then
    echo "Start setting up Airflow connections"
    python3 setup_connections.py
  fi
}

install_python_packages() {
  if [ -e "requirements.txt" ]; then
    echo "Installing additional Python packages"
    pip3 install -r requirements.txt
  fi
}

wait_for_port "Postgres" "$POSTGRES_HOST" "$POSTGRES_PORT"
wait_for_port "Redis" "$REDIS_HOST" "$REDIS_PORT"

export AIRFLOW__CORE__SQL_ALCHEMY_CONN \
  AIRFLOW__CELERY__RESULT_BACKEND \
  AIRFLOW__CORE__FERNET_KEY \
  AIRFLOW__CORE__LOAD_EXAMPLES \
  AIRFLOW__CORE__EXECUTOR

AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgres+psycopg2://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"
AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__FERNET_KEY="wVKXj_gUFi0scVsP-HARZYyxxihQCpj3B2gA_ERaIBE="

case "$1" in
  webserver)
    airflow db init
    sleep 10
    create_airflow_user
    setup_airflow_variables
    setup_airflow_connections
    install_python_packages
    airflow scheduler &
    exec airflow webserver
    ;;
  worker)
    airflow db init
    sleep 10
    install_python_packages
    exec airflow celery "$@" -q "$QUEUE_NAME"
    ;;
  flower)
    airflow db init
    sleep 10
    exec airflow celery "$@"
    ;;
  *)
    exec "$@"
    ;;
esac
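
The Fernet key above is hard-coded for convenience, which is fine for a local sandbox but not for anything shared or production-like. You can generate your own key with the cryptography package (available through the crypto extra in our Dockerfile) and feed it in via common.env instead. A minimal sketch:

python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"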

Step 8: Put Together into a Docker-Compose

This is the final step: packaging everything together before we can spin up our distributed Airflow environment. We have six different services, matching what I depicted graphically in Step 1. The Redis container runs on port 6379 and the Postgres container on port 5432. We also pass the required values to the Postgres image through our environment file, common.env. Both Redis and Postgres can start independently.

The Airflow webserver requires Postgres for the metadata database, so we declare that dependency with a depends_on entry in the docker-compose file. The webserver is exposed on port 8080. Each worker is exposed on its own port: the first on 8081 and the second on 8082. Each worker is associated with a queue, whose name is defined by an environment variable called QUEUE_NAME. Finally, the worker monitor, Celery Flower, will be available on port 5555. If you need more than two workers, a sketch of adding another one follows the file below.

  • docker-compose.yml
version: '3.7'

services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"

  postgres:
    image: postgres:9.6
    env_file:
      - config/common.env
    ports:
      - "5432:5432"
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready -U airflow" ]
      interval: 10s
      timeout: 5s
      retries: 5

  airflow-webserver:
    build: .
    restart: always
    command: webserver
    depends_on:
      postgres:
        condition: service_healthy
    ports:
      - "8080:8080"
    env_file:
      - config/common.env
    healthcheck:
      test: [ "CMD-SHELL", "[ -f /airflow/airflow-webserver.pid ]" ]
      interval: 30s
      timeout: 30s
      retries: 3

  airflow-worker-1:
    build: .
    command: worker
    restart: always
    depends_on:
      - airflow-webserver
    ports:
      - "8081:8080"
    env_file:
      - config/common.env
    environment:
      - QUEUE_NAME=queue_1

  airflow-worker-2:
    build: .
    command: worker
    restart: always
    depends_on:
      - airflow-webserver
    ports:
      - "8082:8080"
    env_file:
      - config/common.env
    environment:
      - QUEUE_NAME=queue_2

  airflow-flower:
    build: .
    command: flower
    restart: always
    depends_on:
      - airflow-webserver
    ports:
      - "5555:5555"
    env_file:
      - config/common.env
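
If you need more workers, the pattern is repeatable: copy one of the worker services under the services: key, pick a free host port, and give it its own queue. A sketch of a hypothetical third worker (queue_3 is illustrative; remember to route tasks to it from your DAGs):

  airflow-worker-3:
    build: .
    command: worker
    restart: always
    depends_on:
      - airflow-webserver
    ports:
      - "8083:8080"
    env_file:
      - config/common.env
    environment:
      - QUEUE_NAME=queue_3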

Step 9: Deploy Distributed Airflow

If everything is configured correctly, you should be able to execute the following command from the airflow-docker directory. If you run into issues starting the containers, check your configuration files; a couple of troubleshooting commands follow the build command below.

docker-compose up -d --build
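
If a container exits or keeps restarting, the container logs are usually the quickest way to find out why. A couple of commands I find handy (the service names match the docker-compose file above):

# Show the state of all services defined in docker-compose.yml
docker-compose ps

# Follow the logs of a specific service, e.g. the webserver
docker-compose logs -f airflow-webserver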

Let’s confirm all your containers are up and running by executing the following command:

docker ps
List of Running Containers

If you see all six containers running, then you have successfully deployed your distributed Airflow environment on your local machine. Now let’s verify that it works in distributed mode by executing our sample DAG. Navigate to the Airflow UI at http://localhost:8080 and log in with the username and password defined in your common.env file. Once you log in, you should see a DAG called “dist_example”. It will execute after a few seconds once you unpause it.

Airflow UI DAGs View

If it runs successfully, check the log for each of its tasks to confirm that the two tasks ran on different worker nodes. You should see a different hostname in each log, as indicated below.

Log for Task 1 (task_for_q1)
Log for Task 2 (task_for_q2)

You can also confirm this by navigating to the worker monitor (Celery Flower) at http://localhost:5555/dashboard. You should see output similar to the screenshot below.

If you made it this far, then you have successfully deployed a distributed Airflow environment. Congratulations!

Conclusion

In this article, I explained how to quickly stand up a distributed Airflow environment with Redis and Celery on your local machine. This setup will help you during workflow development and testing. I hope the scripts in this article serve as a starting point for quickly standing up an environment for your needs, and that you find these step-by-step instructions useful. Please leave a comment below.
