How to Set Up an Airflow Multi-Node Cluster with Celery & RabbitMQ

What is Airflow?

Airflow lets you programmatically author, schedule, and monitor workflows. It provides a functional abstraction in the form of an idempotent DAG (Directed Acyclic Graph) and acts as a service for executing tasks at scheduled intervals.
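As a plain-Python illustration (not Airflow's actual API), a DAG can be thought of as tasks plus dependencies, executed in topological order so every task runs only after its upstream tasks:

```python
# Illustrative sketch only: a toy DAG of task names, not Airflow's API.
# Each task lists the tasks it depends on; execution follows a
# topological order, so a task runs only after its upstream tasks.
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical ETL pipeline: extract -> transform -> load
dag = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

order = list(TopologicalSorter(dag).static_order())
print(order)  # upstream tasks come first
```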

Airflow Single Node Cluster

In a single-node Airflow cluster, all the components (worker, scheduler, webserver) are installed on the same node, known as the "Master Node". To scale a single-node cluster, Airflow has to be configured with the LocalExecutor mode. The worker pulls tasks to run from an IPC (Inter-Process Communication) queue; this scales well until the resources available on the Master Node are exhausted. To scale Airflow across multiple nodes, the CeleryExecutor has to be enabled.

Airflow Single Node Architecture

Airflow Multi-Node Cluster

In the multi-node Airflow architecture, daemon processes are distributed across all worker nodes. The webserver and scheduler are installed on the Master Node, while workers are installed on each worker node, so this setup scales well both horizontally and vertically. To use this mode of architecture, Airflow has to be configured with the CeleryExecutor.

A Celery backend needs to be configured to enable CeleryExecutor mode in the Airflow architecture. Popular frameworks for the Celery backend are Redis and RabbitMQ. RabbitMQ is a message broker: its job is to manage communication between multiple task services by operating message queues. Instead of the IPC channel used in the single-node architecture, RabbitMQ provides a publish-subscribe model to exchange messages across different queues. Task commands are published as messages to the queues, and Celery workers retrieve them from each queue and execute them in a truly distributed, concurrent way, which accelerates parallel task execution across the cluster.
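A toy sketch of that publish-subscribe flow in plain Python (queue.Queue and threads stand in for RabbitMQ and the Celery workers; this is an illustration of the idea, not how RabbitMQ is actually used):

```python
# Toy model of a message broker: queue.Queue stands in for a RabbitMQ
# queue, and threads stand in for Celery workers. Illustration only.
import queue
import threading

broker = queue.Queue()
results = []
lock = threading.Lock()

def worker():
    while True:
        command = broker.get()
        if command is None:          # sentinel: no more work
            broker.task_done()
            break
        with lock:
            results.append(f"ran {command}")
        broker.task_done()

# "Publish" task commands, then start two "workers" that consume them.
for cmd in ["task_a", "task_b", "task_c", "task_d"]:
    broker.put(cmd)

workers = [threading.Thread(target=worker) for _ in range(2)]
for t in workers:
    broker.put(None)                 # one sentinel per worker
    t.start()
for t in workers:
    t.join()

print(sorted(results))
```

The point of the broker is exactly this decoupling: the publisher never needs to know how many workers exist, and adding a worker just adds another consumer on the queue.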

Airflow Multi-Node Architecture

Celery:

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. Airflow uses it to achieve task-level concurrency on several worker nodes via multiprocessing. The multi-node Airflow architecture allows you to scale up Airflow easily by adding new workers.

Airflow Multi-Node Cluster with Celery: Installation and Configuration Steps

Note: We are using CentOS 7 Linux operating system.

1. Install RabbitMQ

yum install epel-release
yum install rabbitmq-server

2. Enable and start RabbitMQ Server

systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service

3. Enable RabbitMQ Web Management Console Interface

rabbitmq-plugins enable rabbitmq_management
RabbitMQ Web Management Console

The RabbitMQ web management console listens on port 15672 by default (the AMQP broker itself uses port 5672). The default username and password for the web management console are guest/guest; note that the guest account can only log in from localhost.

RabbitMQ Web management Interface

4. Install the pyamqp transport for RabbitMQ and the PostgreSQL adapter

pip install amqp

amqp:// is an alias that uses librabbitmq if available, or py-amqp if it's not. You'd use pyamqp:// or librabbitmq:// if you want to specify exactly which transport to use. The pyamqp:// transport uses the amqp library installed above (http://github.com/celery/py-amqp).

Install the PostgreSQL adapter: psycopg2

Psycopg is a PostgreSQL adapter for the Python programming language.

pip install psycopg2
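psycopg2 is only the driver; Airflow talks to PostgreSQL through a SQLAlchemy-style URI, which we will set as sql_alchemy_conn in step 8. A small sketch of composing that URI (the airflow/airflow credentials and localhost host below are placeholders):

```python
# Compose the sql_alchemy_conn URI used in airflow.cfg (step 8).
# The airflow/airflow credentials and localhost host are placeholders.
from urllib.parse import quote_plus

user, password = "airflow", "airflow"
host, db = "localhost", "airflow"

# quote_plus guards against special characters in the password.
uri = f"postgresql+psycopg2://{user}:{quote_plus(password)}@{host}/{db}"
print(uri)
```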

5. Install Airflow

pip install 'apache-airflow[all]'

Check the Airflow version

airflow version
Airflow Version check

We are using Airflow v1.10.0, the recommended stable release at the time of writing.

6. Initialize Database

After installation and configuration, you need to initialize the database before you can run DAGs and their tasks, so that the latest configuration changes are reflected in the Airflow metadata.

airflow initdb

7. Celery Installation

Celery should be installed on the master node and on all the worker nodes.

pip install celery==4.3.0

Check the version of Celery

celery --version
4.3.0 (rhubarb)

8. Changes in the airflow.cfg file for CeleryExecutor

executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow
dags_are_paused_at_creation = True
load_examples = False

Once you have made these changes in the configuration file airflow.cfg, you have to update the Airflow metadata with the command airflow initdb and then restart the Airflow services.
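Since airflow.cfg is a plain INI file, you can sanity-check the values you just edited with Python's configparser. A small sketch against a stand-in snippet (the keys below live under [core] in Airflow 1.10; your real config path, typically ~/airflow/airflow.cfg, may differ):

```python
# Quick sanity check that edited settings parse as expected. This reads
# a stand-in snippet instead of your real airflow.cfg; airflow.cfg is a
# standard INI file, so the same calls work on the real path.
import configparser

sample = """
[core]
executor = CeleryExecutor
dags_are_paused_at_creation = True
load_examples = False
"""

cfg = configparser.ConfigParser()
cfg.read_string(sample)
print(cfg["core"]["executor"])  # CeleryExecutor
```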

You can now start the Airflow webserver with the command below:

# default port is 8080
airflow webserver -p 8000

You can then start the scheduler:

# start the scheduler
airflow scheduler

You also have to start an Airflow worker on each worker node.

airflow worker

Once you have started all the Airflow services, you can check out the fantastic Airflow UI at:

http://<IP-ADDRESS/HOSTNAME>:8000

since we passed port 8000 in our webserver start command; otherwise the default port number is 8080.

Yes! We are done building the multi-node Airflow cluster. :)

More Blog posts:

1. Setup and Configure Multi-Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines.