Apache Airflow HA environment

WB Advanced Analytics
inganalytics.com/inganalytics
May 29, 2017

In our current environment we have one Hadoop cluster with Apache Airflow as the workflow scheduler and monitor. In the near future, we want to build two new Hadoop clusters to handle the production workloads. The Airflow instance that we currently use is a single node with everything installed on one server: web server, scheduler and worker. We use a PostgreSQL database for the metadata. We started investigating how to make our Airflow environment highly available across both data centers.

The new requirements for the HA solution are:

  • Utilize both Hadoop clusters.
  • Ensure HA for Airflow; if one of the master nodes goes down or is purposely taken offline, the Airflow cluster must remain operational (tasks still get executed).

In this post, I will describe our new ‘to-be’ Airflow architecture and the problems that we encountered during the installation and configuration phase.

To-be Airflow multi-node environment

We will use two Airflow master nodes in the new environment, one in each data center. Both of them will run the web server (GUI) and scheduler components, and will be able to schedule tasks independently of each other. The workers will listen to different queues depending on the role and location of the worker (see “routing the workload” for more detailed information, below).

The Airflow metadata will be stored in a multi-master MariaDB Galera Cluster. All Airflow nodes will point to this metastore, and the job results will also be stored in the MariaDB Galera Cluster.

In Airflow, tasks and dependencies are defined with a piece of Python code called a DAG (a Directed Acyclic Graph). Our Airflow Master A (site A) will pull the Airflow DAGs from our code repository system and distribute them with rsync to all the Airflow servers in both data centers. Any DAG that has been manually changed on Master B, or on its workers, will be overwritten. It’s uncertain whether this rsync solution is sustainable, because of the complex security constraints imposed on us. But for now, we will use it in our local lab environment.
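The distribution step could be sketched as follows. This is only an illustration under assumptions: the host names and the DAG directory are hypothetical placeholders, and the `--delete` flag is our own choice to make every target an exact mirror of Master A. The script prints the rsync commands instead of running them.

```python
import shlex

# Hypothetical values: host names and paths are placeholders, not from our setup.
DAG_DIR = "/opt/airflow/dags/"
TARGET_HOSTS = ["master-b", "worker-a1", "worker-b1"]


def build_rsync_command(source_dir, host, target_dir):
    """Return the rsync argument list that mirrors source_dir onto host."""
    # A trailing slash on the source copies the directory contents,
    # not the directory itself.
    source = source_dir.rstrip("/") + "/"
    return [
        "rsync",
        "--archive",  # preserve permissions, timestamps and symlinks
        "--delete",   # assumption: drop files that no longer exist on Master A
        source,
        "{}:{}".format(host, target_dir),
    ]


if __name__ == "__main__":
    for host in TARGET_HOSTS:
        command = build_rsync_command(DAG_DIR, host, DAG_DIR)
        # Print instead of executing, so the sketch is safe to run anywhere.
        print(" ".join(shlex.quote(arg) for arg in command))
```

Because rsync overwrites files that differ from the source, local edits on Master B or on the workers are lost on the next run, which matches the behaviour described above.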

Routing the workload

Workers

We use two types of workers: regular workers, and ETL workers. The first can access the Hadoop environments; the second has access to the internal data sources. Our environment is in a restricted security zone, so not every server has access to the internal data sources. That’s why we’ve had to introduce the ETL worker as a middleman. This system has a Hadoop client for accessing HDFS, and Sqoop for ingesting the data from the internal source systems.

Queues

We have defined five queues in our setup. The default queue can be used for tasks that aren’t bound to one cluster, where it doesn’t matter on which cluster the task is executed (read and report operations). All but the ETL workers will listen to the default queue. The next two queues are the a_queue and b_queue, mapped to the workers in the data centers at sites A and B, respectively.

This allows us to monitor in which data-center the Airflow task will create and modify data on HDFS/Hive so that we can replicate/sync the data to the other Hadoop cluster. The data-replication will be part of all DAGs that are creating and modifying data. For this we will probably create an Airflow operator.

1. Task

2. Data transformation (new table in Hive)

3. Replicate data

Airflow allows you to start the Airflow worker process with the specific queues to which it should listen. The worker will then only pick up tasks from that queue or list of queues. No additional Airflow configuration is needed.

Starting a worker that listens to specific queues only requires the queue names (here the default queue and the site queue) to be specified as a comma-separated list:

Command: airflow worker --queues=default,sitea
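As an illustration, the queue list for each worker could be derived from its role and site. This is a sketch under assumptions: a_queue and b_queue are the site queues named earlier, while the ETL queue names are hypothetical placeholders, since ETL workers do not listen to the default queue.

```python
# "a_queue" and "b_queue" are the site queues described above;
# the ETL queue names are hypothetical placeholders.
SITE_QUEUES = {"a": "a_queue", "b": "b_queue"}
ETL_QUEUES = {"a": "etl_a_queue", "b": "etl_b_queue"}


def worker_queues(role, site):
    """Return the queues a worker should listen to, given its role and site."""
    if site not in SITE_QUEUES:
        raise ValueError("unknown site: {!r}".format(site))
    if role == "regular":
        # Regular workers serve site-independent work plus their own site queue.
        return ["default", SITE_QUEUES[site]]
    if role == "etl":
        # ETL workers stay off the default queue.
        return [ETL_QUEUES[site]]
    raise ValueError("unknown role: {!r}".format(role))


def worker_command(role, site):
    """Build the `airflow worker` command line for this worker."""
    # The queue list is comma-separated, without spaces.
    return "airflow worker --queues=" + ",".join(worker_queues(role, site))


if __name__ == "__main__":
    print(worker_command("regular", "a"))
    print(worker_command("etl", "b"))
```

Keeping the mapping in one place makes it harder for a worker to end up listening to a queue that belongs to the other site.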

This will work perfectly in a greenfield situation. We can schedule a task for a specific queue and route it to one of the workers associated with that queue. But what if we configure all of our DAGs to use a specific site, say A? If that worker were to go down, or be purposely taken offline, we would have to change the target queue in the DAG to the site B queue in order to get the task executed again. This is less than ideal, and something we still need to investigate in our lab environment.

Possible solutions could be:

  • Create a check that verifies whether the worker for that queue is still online and, if not, switches to a secondary queue/worker node. Such a change can be made in the Airflow code or by building an appropriate check directly in the DAG (as a “check worker” operator).
  • Use the default queue, but add a return value to indicate which worker picked up the task (necessary for the data-sync)
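The first option could look roughly like the sketch below. It is not something we have built: the function name and the worker names are hypothetical, and the mapping of online workers to queues is passed in as plain data. In a real setup that mapping might be derived from Celery's inspect API, which is an assumption we have not tested.

```python
def choose_queue(preferred, fallback, active_queues):
    """Return `preferred` if an online worker consumes from it, else `fallback`.

    active_queues maps a worker name to the list of queue names it listens to,
    or is None when no worker replied at all.
    """
    consumed = set()
    for queues in (active_queues or {}).values():
        consumed.update(queues)
    if preferred in consumed:
        return preferred
    if fallback in consumed:
        return fallback
    raise RuntimeError(
        "no online worker for {!r} or {!r}".format(preferred, fallback)
    )


if __name__ == "__main__":
    # Hypothetical snapshot: the site A worker is offline.
    online = {"worker-b1": ["default", "b_queue"]}
    print(choose_queue("a_queue", "b_queue", online))
```

A “check worker” operator could call such a function and pass the result on as the target queue for the downstream tasks; the returned queue name also tells the data-sync step on which site the data was written.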

Here’s an example of a simple Bash operator DAG that uses the site A queue:
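A minimal sketch of such a DAG, assuming the Airflow 1.8 import paths. The DAG id, owner, dates, schedule and Bash command are hypothetical, and the queue name must match a queue that a site A worker was started with.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.8 import path

# Hypothetical defaults; adjust owner, dates and retries to your environment.
default_args = {
    "owner": "airflow",
    "start_date": datetime(2017, 5, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    dag_id="site_a_bash_example",  # hypothetical DAG id
    default_args=default_args,
    schedule_interval="@daily",
)

# The `queue` argument routes this task to workers listening to the site A queue.
print_hostname = BashOperator(
    task_id="print_hostname",
    bash_command="hostname",
    queue="a_queue",
    dag=dag,
)
```

Printing the host name is a convenient first test, because the task log then shows which worker actually picked up the task.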

Message service

We use Redis as a message queue service in this test setup. Airflow supports multiple messaging services, such as RabbitMQ and AWS SQS. Although Redis is fast and easy to set up, its lack of security will likely mean that our message service needs to change to RabbitMQ in our production environment, with SSL and Kerberos enabled. We already have a RabbitMQ cluster running (without SSL and Kerberos) in our lab environment, so the next steps would be to enable security and change the Airflow configuration to use RabbitMQ instead of Redis.

For our current setup we will continue with Redis as our message service.

Installation / Configuration

Note that in our lab environment we will install and configure the Galera cluster on the Airflow master nodes, whereas in the production environment we might use dedicated machines.

The current stable version of Airflow (1.8) doesn’t handle two active schedulers very well. In our first attempt we ran into some deadlock situations. With the help of my colleague Bolke de Bruin, who made some code adjustments, the multi-master solution now seems to work (though we need to run additional tests to check that it works in all scenarios and doesn’t interfere with other Airflow functions). For now you can pull the fix from Bolke’s own git repository (https://github.com/bolkedebruin/airflow/tree/fix_deadlock); once the code is fully tested, you can expect it to be merged into a subsequent version of Airflow.

Prerequisites

  • CentOS 7
  • A running message queuing service (Redis or RabbitMQ)
  • A running database server that will host the Airflow database (MySQL, MariaDB or Galera)

After the MariaDB Galera Cluster installation we need to create the Airflow database and user.

Create the airflow database if it doesn’t exist

CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;

Grant access to the airflow user

GRANT ALL ON airflow.* TO '{USERNAME}'@'%' IDENTIFIED BY '{PASSWORD}';

Steps

  1. Install Apache Airflow on ALL machines that will have a role in the Airflow environment (master/worker)

Install the required packages :

yum groupinstall "Development tools"

yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel python-devel wget cyrus-sasl-devel.x86_64

Install PIP and any required package

sudo yum -y install python-pip

Install the MariaDB repo and install the MariaDB client

Create a MariaDB.repo file for CentOS 7 and add it under /etc/yum.repos.d/

With the repo file in place you can now install MariaDB like so:

sudo yum install MariaDB-server MariaDB-client

Install Airflow:

pip install airflow
pip install airflow[celery]

2. Apply the Airflow configuration changes on all machines. The following changes go into the {AIRFLOW_HOME}/airflow/airflow.cfg file.

1. Change the Executor to CeleryExecutor

executor = CeleryExecutor

2. Point SQL Alchemy to the MetaStore

sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow

3. Setup the broker URL

For Redis

broker_url = redis://{REDIS_HOST}:6379/0

For RabbitMQ

broker_url = amqp://guest:guest@{RABBITMQ_HOST}:5672/

4. Point Celery to the MetaStore

celery_result_backend = db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
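Since these settings have to be identical on every machine, a small sanity check can catch typos before the components are started. The sketch below is an illustration only: the host names and credentials in the sample are placeholders, the checks are our own choice, and it assumes the Airflow 1.8 layout in which executor and sql_alchemy_conn live in the [core] section and the Celery settings in [celery].

```python
import configparser

# Sample values only: host names and credentials are placeholders.
SAMPLE_CFG = """
[core]
executor = CeleryExecutor
sql_alchemy_conn = mysql://airflow:secret@galera-host:3306/airflow

[celery]
broker_url = redis://redis-host:6379/0
celery_result_backend = db+mysql://airflow:secret@galera-host:3306/airflow
"""


def check_config(cfg_text):
    """Return a list of problems found in the given airflow.cfg text."""
    # Interpolation is disabled so that '%' characters in passwords are kept.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    problems = []
    if parser.get("core", "executor", fallback="") != "CeleryExecutor":
        problems.append("core.executor should be CeleryExecutor")
    if not parser.get("core", "sql_alchemy_conn", fallback="").startswith("mysql://"):
        problems.append("core.sql_alchemy_conn should point to the MySQL/MariaDB metastore")
    broker = parser.get("celery", "broker_url", fallback="")
    if not broker.startswith(("redis://", "amqp://")):
        problems.append("celery.broker_url should use redis:// or amqp://")
    if not parser.get("celery", "celery_result_backend", fallback=""):
        problems.append("celery.celery_result_backend is not set")
    return problems


if __name__ == "__main__":
    print(check_config(SAMPLE_CFG) or "configuration looks consistent")
```

To check a real machine, read the contents of its airflow.cfg into a string and pass that to check_config; a mistyped broker scheme or a forgotten executor change then shows up as an entry in the returned list.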

5. Create the Airflow directories and deploy your DAGs

Create the Airflow directories

cd {AIRFLOW_HOME}/airflow
mkdir dags
mkdir logs

Deploy your DAGs on your Airflow machines in

{AIRFLOW_HOME}/airflow/dags

6. Initialize the Airflow database

On Master A, initialize the Airflow database

airflow initdb

7. Start the Airflow components

On the Airflow Masters

airflow webserver 
airflow scheduler

On the Airflow Workers

airflow worker --queues={queue}
