Setting Up Celery, Flower, & RabbitMQ for Airflow

(A Prerequisite for Setting up Celery Executor in Airflow)

Mukesh Kumar
Accredian

--

Written in collaboration with Hiren Rupchandani

Preface

In my previous story, you discovered how to work with the Local Executor. Now that you know how to achieve parallel execution of tasks in Airflow, it’s time to advance to the Celery Executor to build production-ready pipelines. But before we start using the Celery Executor, we must set up Celery, Flower (a web server for monitoring Celery), and RabbitMQ, which will act as the message broker, on our system/cloud.

In this story, you will discover how to set up Celery Executor with Flower and RabbitMQ. However, before proceeding, please ensure that your virtual environment is activated.

Setting up a RabbitMQ Server

RabbitMQ is a message broker that enables communication between multiple services by operating message queues. Written in Erlang, it exposes an API through which services publish messages to queues and subscribe to them. The instructions to install RabbitMQ on your system are as follows:

1. Installation of Erlang

  • Firstly, we need to install Erlang on our system using the following code:
(airflow_env) username@desktopname:~$ sudo apt-get install erlang
  • Enter Y at the prompt to allow the installation, and grab a cup of coffee while it takes place…

2. Installation of RabbitMQ Server

  • Next, we will install the RabbitMQ server using the following command:
(airflow_env) username@desktopname:~$ sudo apt-get install rabbitmq-server
  • You may like to have a cup of coffee while the RabbitMQ server installs…

3. Initiation of the RabbitMQ Server

  • Start the server and check whether it is running using the following commands:
(airflow_env) username@desktopname:~$ sudo service rabbitmq-server start
OUTPUT: * Starting RabbitMQ Messaging Server rabbitmq-server
(airflow_env) username@desktopname:~$ service --status-all
OUTPUT:
[ - ] ...
[ + ] rabbitmq-server
[ - ] ...
  • You should see rabbitmq-server marked with a “+”, which means it is running as a service.

4. Enable the RabbitMQ Management Dashboard

  • Next, we will install some plugins using the following command:
(airflow_env) username@desktopname:~$ sudo rabbitmq-plugins enable rabbitmq_management
  • The management dashboard listens on port 15672 by default (the broker itself accepts AMQP connections on port 5672).
  • To open the dashboard (login page), go to http://localhost:15672/
  • The default user is “guest” with the password “guest”.
  • After logging in, you will be redirected to the dashboard.
RabbitMQ Server Home

5. Create a new user with administrator privileges

  • Create a new user named “admin” with the password “admin” by typing the following command:
(airflow_env) username@desktopname:~$ sudo rabbitmqctl add_user admin admin
  • We will set the “admin” user as an administrator. To do that:
(airflow_env) username@desktopname:~$ sudo rabbitmqctl set_user_tags admin administrator
  • Now we will give full permission to the “admin” user to read/write data:
(airflow_env) username@desktopname:~$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  • Now we can log in again using the “admin” account’s credentials.
RabbitMQ Login with admin
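The three “.*” arguments in the set_permissions command are regular expressions that RabbitMQ matches against resource names (queues and exchanges) to grant configure, write, and read access respectively. A rough illustration of that matching in Python (the queue names here are made up for the example):

```python
import re

# RabbitMQ matches each permission pattern (configure, write, read)
# against resource names such as queue or exchange names.
# ".*" therefore grants access to every resource in the vhost.
def is_allowed(pattern: str, resource: str) -> bool:
    """Return True if the permission pattern matches the resource name."""
    return re.fullmatch(pattern, resource) is not None

print(is_allowed(".*", "my_queue"))        # True  -- ".*" matches anything
print(is_allowed("^logs-.*", "my_queue"))  # False -- a narrower pattern would not
print(is_allowed("^logs-.*", "logs-app"))  # True
```

So granting “.*” for all three patterns gives the admin user full access to every resource in the default vhost.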

Setting up Celery and Flower

1. Install Celery and Flower

  • Firstly, we need to install Celery on our system.
  • Open a new terminal and type the following command in the virtual environment:
(airflow_env) username@desktopname:~$ sudo pip3 install 'apache-airflow[celery]'
  • The above command will install Celery, Flower, and all the dependencies required for Airflow to work with Celery. (The quotes around the package name stop the shell from interpreting the square brackets.)
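To confirm that the extra actually pulled in both packages, you can check the installed versions from Python — a quick sketch using only the standard library (assuming the distributions are named “celery” and “flower”):

```python
from importlib.metadata import version, PackageNotFoundError

def installed_version(package: str):
    """Return the installed version string, or None if the package is absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None

# After the pip install above, both should report a version number.
for pkg in ("celery", "flower"):
    print(pkg, installed_version(pkg))
```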

2. Check Flower UI

  • Flower is a web-based tool for monitoring and administering Celery clusters.
  • Type the following command to start the Celery Flower server:
(airflow_env) username@desktopname:~$ airflow celery flower
OUTPUT:

[2021-09-05 19:07:36,724] {command.py:135} INFO - Visit me at http://0.0.0.0:5555
[2021-09-05 19:07:36,733] {command.py:142} INFO - Broker: amqp://admin:**@localhost:5672//
[2021-09-05 19:07:36,735] {command.py:143} INFO - Registered tasks:
.
.
.
  • By default, Flower listens on port 5555, so you can go to http://localhost:5555/ and you should see the following home page:
Celery Flower Server Dashboard

3. Update Airflow configuration

  • Now that Flower is working, we will make some changes to the airflow.cfg file in the airflow directory.
  • Search for the following variables/parameters and update their values as shown:
sql_alchemy_conn = mysql+pymysql://sql_username:sql_password@localhost/airflow_db
executor = CeleryExecutor
broker_url = amqp://admin:admin@localhost/
result_backend = db+mysql+pymysql://sql_username:sql_password@localhost/airflow_db
  • Save and close airflow.cfg file.
  • Open a new terminal and start the MySQL server in your environment using the following command:
(airflow_env) username@desktopname:~$ sudo service mysql start
  • Make sure your rabbitmq-server is on as well. If not, run it using the following command:
(airflow_env) username@desktopname:~$ sudo service rabbitmq-server start
  • Now, go to your airflow directory and type:
(airflow_env) username@desktopname:~$ airflow db init
  • It will update the metadata database according to the changes made in the airflow.cfg file.
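The broker_url above packs the RabbitMQ credentials, host, and virtual host into one AMQP URL. If you want to sanity-check its parts before running `airflow db init`, you can pull it apart with Python’s standard urlsplit — a small sketch using the same URL as above:

```python
from urllib.parse import urlsplit

# broker_url from airflow.cfg: amqp://<user>:<password>@<host>[:port]/<vhost>
broker_url = "amqp://admin:admin@localhost/"
parts = urlsplit(broker_url)

print(parts.scheme)    # "amqp"
print(parts.username)  # "admin"  -- the RabbitMQ user created earlier
print(parts.password)  # "admin"
print(parts.hostname)  # "localhost"
# The trailing path segment selects the RabbitMQ virtual host
# (here the default vhost is used).
```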

Verification of the Setup via DAGRun

  • The hard part is over. Now, we will check whether Airflow is working.
  • To start airflow, the MySQL and RabbitMQ services should be running beforehand.
  • We will start four processes: airflow webserver, scheduler, celery worker, and celery flower.
  • You can type the following commands in different terminals:

1. Airflow Webserver

(airflow_env) username@desktop_name:~/mnt/c/Users/username/Documents/airflow$ airflow webserver

2. Airflow Scheduler

(airflow_env) username@desktop_name:~/mnt/c/Users/username/Documents/airflow$ airflow scheduler

3. Celery Worker

(airflow_env) username@desktop_name:~/mnt/c/Users/username/Documents/airflow$ airflow celery worker

4. Celery Flower

(airflow_env) username@desktop_name:~/mnt/c/Users/username/Documents/airflow$ airflow celery flower
Celery Flower with a Celery Worker’s Status
RabbitMQ Server Queues during execution
  • Finally, coming to the Airflow webserver, delete all the DAGs using the webserver UI so that the scheduler can automatically update them.
  • We recommend refreshing the screen 5–10 seconds after deleting a DAG from the UI.
  • Enable any of the updated DAGs from the UI, and you should see the DAG execute. You can check the logs, Gantt chart, graph, or tree view of the DAG.
Airflow Webserver UI
  • So, we have finally set up Airflow with Celery and RabbitMQ. The upcoming story will use multiple Celery workers to run an Airflow DAG.
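As a quick sanity check that the services came up, you can probe their default ports from Python (8080 for the Airflow webserver, 5555 for Flower, and 5672 for the RabbitMQ broker — these defaults are assumptions and may differ on your setup):

```python
import socket

def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Probe each service's default port and report its status.
for name, port in [("airflow webserver", 8080),
                   ("celery flower", 5555),
                   ("rabbitmq (amqp)", 5672)]:
    status = "up" if port_open("localhost", port) else "down"
    print(f"{name:18} port {port}: {status}")
```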

Final Thoughts and Closing Comments

There are some vital points many people fail to understand while pursuing their Data Science or AI journey. If you are one of them and are looking for a way to fill these gaps, check out the certification programs provided by INSAID on their website. If you liked this story, I recommend the Global Certificate in Data Science, as it covers the foundations plus machine learning algorithms (basic to advanced).

And that’s it. I hope you liked this explanation of setting up Celery, Flower, and RabbitMQ for Airflow and learned something valuable. Please let me know in the comment section if you have anything to share with me. I would love to know your thoughts.

What’s next?

--


Data Scientist, having a robust math background, skilled in predictive modeling, data processing, and mining strategies to solve challenging business problems.