Set Up Apache Airflow on Multiple Nodes in Google Cloud Platform
Apache Airflow is one of the most widely used platforms in the data engineering world for scheduling and monitoring data workflows. Since jobs are defined in Python, it is easy for data analysts, data scientists, and data engineers to create and maintain data pipelines.
I use Apache Airflow at work, and I have heard that many other companies use it for their data pipelines as well. Scalability, extensibility, support for complex workflows, and user friendliness are the reasons Airflow is popular in the data community.
Existing articles mainly focus on a single node, which made me wonder how to scale Airflow horizontally. This article explains how to install and set up multi-node Airflow on Ubuntu using Google Cloud Platform.
In this article we need two instances:
- airflow-main for the Airflow webserver, scheduler, Celery Flower, a Celery worker, and RabbitMQ.
- airflow-worker-1 for an additional Airflow Celery worker.
Now, create the airflow-main instance in Google Compute Engine.
Now, SSH into your instance: ssh username@<ip_address>
The instance name is different in my screenshots: I had already created an instance, called indo-large-text, before writing this tutorial. That shouldn't be a problem.
Don’t forget to add ssh key :)
Setup PostgreSQL
To avoid being repetitive, I followed the guidelines from this article to set up the main node, with some modifications. The Ubuntu commands below for setting up PostgreSQL and Airflow are similar to those in the cited article.
If you haven't installed pip:
sudo apt install python3-pip
In case it is not the latest version:
sudo pip install --upgrade pip
Install Airflow Database
In this article, PostgreSQL is used for the Airflow metadata database.
sudo apt-get install postgresql postgresql-contrib
Create User and Database
sudo adduser airflow
Add the new user to the sudo group:
sudo usermod -aG sudo airflow
Add the airflow user as a PostgreSQL role:
~$ su - postgres
~$ psql
Now, create the airflow user and grant privileges in psql:
postgres=# CREATE USER airflow PASSWORD 'a1rfl0w';
CREATE ROLE
postgres=# CREATE DATABASE airflow;
CREATE DATABASE
postgres=# GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
GRANT
postgres=# \du
Verify
~$ psql -d airflow
airflow=> \conninfo
Output:
You are connected to database "airflow" as user "airflow" via socket in "/var/run/postgresql" at port "5432".
Change PostgreSQL Configurations
sudo nano /etc/postgresql/12/main/pg_hba.conf
Change the IPv4 address to 0.0.0.0/0 and the method to trust. (Note: trust with 0.0.0.0/0 accepts any host without authentication; for anything beyond a test setup, restrict the address range or use md5/scram-sha-256 instead.)
# IPv4 local connections:
host    all    all    0.0.0.0/0    trust
Restart the service:
sudo service postgresql restart
Next, we configure postgresql.conf.
sudo nano /etc/postgresql/12/main/postgresql.conf
#------------------------------------------------------------------------------
# CONNECTIONS AND AUTHENTICATION
#------------------------------------------------------------------------------
# - Connection Settings -

#listen_addresses = 'localhost'    # what IP address(es) to listen on;
listen_addresses = '*'             # for Airflow connections
Restart the service:
sudo service postgresql restart
Setup RabbitMQ
RabbitMQ is a message broker that holds jobs in a queue and manages how they are distributed across nodes.
I followed this article to install RabbitMQ and set up the configuration:
sudo apt-get install erlang
sudo apt-get install rabbitmq-server
sudo service rabbitmq-server start
Enable the RabbitMQ management dashboard:
sudo rabbitmq-plugins enable rabbitmq_management
To open the dashboard, open a browser and go to localhost:15672 or <ip_addr>:15672
The default username and password are both guest. Note that this only applies on localhost; if you access the dashboard via the IP address, you need to create a user.
I encountered errors when following the article above, so I used the Celery documentation to set up RabbitMQ instead.
Create a user and password:
$ sudo rabbitmqctl add_user admin admin
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p myvhost admin ".*" ".*" ".*"
Setup Apache Airflow Main
Install Apache Airflow in Main Node
pip install 'apache-airflow[postgres, mysql, celery, rabbitmq]'
Initialize the Airflow DB Config and Generate airflow.cfg
airflow db init
Re-config airflow.cfg
Change executor to CeleryExecutor
executor = CeleryExecutor
Change DB connection to PostgreSQL
sql_alchemy_conn = postgresql+psycopg2://airflow@localhost:5432/airflow
Change result_backend to PostgreSQL
result_backend = db+postgresql://airflow@localhost/airflow
Change the broker_url to RabbitMQ, using the admin user and vhost created earlier
broker_url = amqp://admin:admin@localhost:5672/myvhost
Disable the examples (leave it set to True if you want to see the example DAGs)
load_examples = False
Save, then run airflow db init again
Create Airflow UI user
# create an admin user
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email spiderman@superhero.org
Run Airflow and friends in the background (remove -D to run in the foreground for debugging):
$ airflow webserver -D
$ airflow scheduler -D
$ airflow celery worker -D
$ airflow celery flower -D
Apache Airflow should be running by now; if you left load_examples = True, the example DAGs will show up.
Congratulations!!!
Now you can try your own DAGs and explore the Airflow UI.
Setup Apache Airflow Workers
Next, we need to set up Airflow workers on the other instances.
Create Google Compute Engine
Open the Google Cloud Console and go to Compute Engine.
Create worker instance:
- Name: airflow-worker-1
- Region: Singapore
- Machine Type: e2-medium
- Boot disk: Ubuntu 20.04 LTS, Persistent Disk 10 GB
- Firewall: Enable HTTP, HTTPS
You should now see it in the Compute Engine dashboard.
The steps are basically the same as above. We only need to install Airflow and change a few lines in airflow.cfg.
SSH into your instance and let's configure the Airflow Celery worker. Install Apache Airflow and its dependencies.
$ sudo apt install python3-pip
$ pip install 'apache-airflow[postgres, celery, rabbitmq]'
Verify installation
$ airflow version
2.2.3
Edit airflow.cfg
nano ~/airflow/airflow.cfg
Change three variables:
sql_alchemy_conn = postgresql+psycopg2://airflow@<ip_addr>:5432/airflow
broker_url = amqp://admin:admin@<ip_addr>:5672/myvhost
result_backend = db+postgresql://airflow@<ip_addr>/airflow
Save and close.
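Before starting the worker, a quick way to catch typos in these connection strings is to parse them with SQLAlchemy, which is already installed as an Airflow dependency. This is an optional sanity check of my own, not part of the original setup; the IP below is a placeholder for your airflow-main address.

```python
# Optional sanity check: parse the metadata DB URL you put in airflow.cfg.
from sqlalchemy.engine.url import make_url

MAIN_HOST = "10.148.0.2"  # placeholder: replace with your airflow-main IP

conn = f"postgresql+psycopg2://airflow@{MAIN_HOST}:5432/airflow"
url = make_url(conn)

# make_url() only validates the URL's shape; it does not open a connection.
print(url.drivername, url.host, url.database)
```

If the string is malformed, make_url raises an error immediately, which is easier to debug than a failing scheduler. An actual round trip still requires create_engine(conn).connect() with PostgreSQL reachable from the worker.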
Now, verify that the worker is connected to our main Airflow: open Celery Flower in a browser at localhost:5555 or <ip_addr>:5555. You should see something like this.
Testing Multi-nodes Airflow
Create DAG:
- dag_id = get_instance_v6
- start_date = 60 minutes ago
- interval = every 5 minutes
- 3 Tasks: get hostname, get ip address, sleep 2 seconds
Save this file as get_instance.py and place it in ~/airflow/dags/ on both the main and worker instances.
You will see it in the Airflow DAGs list.
DAG details:
To check whether each task runs on a different node, open the task logs: the get hostname task prints a different hostname depending on which worker picked it up.
Conclusion
Congratulations! You should now be able to run DAGs across multiple nodes.
I have been learning this Airflow stuff for about two weeks. As someone who just migrated from Machine Learning Engineer to SQL Query Maker, I deal with it often, frequently run into DAG errors, and used to wonder how on earth this thing works. I learned it on my own by googling, reading articles, asking colleagues who work as data engineers, and reading books.
If you are in a data role and want to connect with me, I am on Discord: andreaschandra#4851.
If you are a ✨ RECRUITER ✨ with an open position mainly dealing with machine learning, especially NLP and computer vision, drop me a message via LinkedIn.