When Airflow isn’t fast enough: Distributed orchestration of multiple small workloads with Celery
DISCLAIMER 1: If you already have a firm grasp of what orchestration, Apache Airflow and Celery are, go directly to the “An Airflow use case: managing an ETL for user data extraction, processing and storage” section to get a feel for the example use case, and then continue reading from “The problem with fast tasks and Airflow”.
DISCLAIMER 2: Seasoned data engineers might find this comparison a bit odd, since Airflow can and should work together with Celery. This is all addressed below, so do give me a chance! :)
What is orchestration and why you should care
When working with data of any kind, there are always three basic steps that need to be accomplished, known in the simplest scenarios as ETL (extract, transform, load) processes:
- Get the data from a given source.
- Process it so it gives us the insights we are looking for.
- Load/insert the data into a database or persistent storage of any kind, so interested parties can query it with ease.
In order to illustrate, let’s consider the following scenario:
- We have website user data in the form of logs that arrive at our server on an hourly basis, and we want to store them in a MySQL server table.
- We first need to extract the data from disk, process it so that the schema fits the existing table, and finally load the data into the MySQL server.
Now, if we were to design this as a DAG (directed acyclic graph) of tasks, just to get a feel for the steps we are dealing with, we would get something like this:
And finally, with a cron expression such as “0 * * * *” (minute 0 of every hour), we would be able to execute the ETL every hour.
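To make the scenario concrete, the hourly ETL can be modeled in a few lines of plain Python: the DAG as a mapping from each task to the tasks it depends on, ordered with the standard library's `graphlib` (Python 3.9+), and the cron schedule as "the next minute-0 boundary". The task names are hypothetical, and `next_run` only handles intervals that divide an hour evenly; it is a sketch of the semantics, not how cron or Airflow actually compute schedules.

```python
from datetime import datetime, timedelta
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical task names; each key maps to the tasks that must finish first.
ETL_DAG = {
    "extract_logs": set(),
    "transform_schema": {"extract_logs"},
    "load_mysql": {"transform_schema"},
}


def execution_order(dag):
    """Return one valid order in which the DAG's tasks can run."""
    return list(TopologicalSorter(dag).static_order())


def next_run(now, interval_minutes=60):
    """Next start time strictly after `now`, aligned to the interval:
    60 behaves like "0 * * * *", 30 like "*/30 * * * *".
    Only intervals that divide an hour evenly are supported."""
    if interval_minutes <= 0 or 60 % interval_minutes != 0:
        raise ValueError("interval_minutes must evenly divide 60")
    # Drop seconds, then jump forward to the next multiple of the interval.
    base = now.replace(second=0, microsecond=0)
    return base + timedelta(minutes=interval_minutes - base.minute % interval_minutes)


if __name__ == "__main__":
    print(execution_order(ETL_DAG))
    # ['extract_logs', 'transform_schema', 'load_mysql']
    print(next_run(datetime(2019, 5, 1, 14, 37)))
    # 2019-05-01 15:00:00
```

The same helper with `interval_minutes=30` describes the 30-minute schedule used in the use case below.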
Fairly simple, right? The problem, though, is that in a company/business context you rarely have one single workflow with a fixed set of tasks. You have multiple data sources, multiple workflows, tasks that depend on other tasks or workflows, a different schedule for each workflow, specific deadlines, different paths depending on task success or failure, etc.
So you can see very quickly that instead of a workflow like the one shown above, you end up with something like this:
Orchestration tools facilitate this coordination of tasks and workflows, and can even distribute it across multiple machines in a cluster if you have a lot of processes or a lot of data to process.
One of the most popular orchestrators right now is Apache Airflow, which I’m now going to talk about.
I really can’t define Airflow any more simply than the description in the documentation: “Airflow is a platform to programmatically author, schedule and monitor workflows”.
So in essence, Airflow is an orchestrator that allows for two important things, targeted for different groups of people:
- For developers, it makes it extremely simple to create complex ETL scheduling in a programmatic way, using Python scripts. Each workflow, much like the example in the previous section, is a DAG of tasks that need to be executed in a certain order and scheduled.
- Airflow also introduces the concepts of operators and hooks, which make it easy to program diverse tasks, from moving data from S3 to Hive to sending a message to a Slack channel.
- For product owners/leads, on the other hand, it provides a rich UI with which tasks can be monitored in real time, be it via graphical representations of the DAGs, their execution times and current status, or even through an SQL query interface in which standard queries return the state of a given workflow or task, as well as its history.
But let’s take a look at a practical example.
An Airflow use case: managing an ETL for user data extraction, processing and storage
Imagine we have a three-step ETL to extract user data from a raw JSON file, process it and load it into an SQL table. This ETL should run every 30 minutes.
This is an example of what the raw data looks like:
The goal now is to get the data into an SQL table like so:
The scripts we need to accomplish these three tasks are actually quite simple to implement, and you can find them all in the following Git repository: https://github.com/ManuelMourato25/airflow_celery_example_project/tree/master/airflow/working_dir
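The repository's scripts are not reproduced here, and the raw JSON sample is not recoverable in this text, so the field names below (`user_id`, `msg`, `timestamp`) and the one-object-per-line layout are hypothetical. As a minimal sketch of the three ETL steps under those assumptions, the example uses Python's built-in `sqlite3` as a stand-in for the MySQL server so it runs without any setup; the table name `processed_msg` is borrowed from the query later in this article, but its columns are assumed.

```python
import json
import sqlite3


def extract(raw_text):
    """Extract: parse one JSON object per non-empty line (assumed layout)."""
    return [json.loads(line) for line in raw_text.splitlines() if line.strip()]


def transform(records):
    """Transform: keep the assumed target columns, drop incomplete records."""
    return [
        (int(r["user_id"]), str(r["msg"]).strip(), r.get("timestamp"))
        for r in records
        if "user_id" in r and "msg" in r
    ]


def load(rows, conn):
    """Load: insert rows into processed_msg (sqlite3 stands in for MySQL)."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_msg "
        "(user_id INTEGER, msg TEXT, created_at TEXT)"
    )
    conn.executemany("INSERT INTO processed_msg VALUES (?, ?, ?)", rows)
    conn.commit()
    return len(rows)


if __name__ == "__main__":
    raw = '{"user_id": "1", "msg": " hello ", "timestamp": "2019-05-01T14:00:00"}\n'
    conn = sqlite3.connect(":memory:")
    print(load(transform(extract(raw)), conn))  # 1
    print(conn.execute("SELECT * FROM processed_msg").fetchall())
    # [(1, 'hello', '2019-05-01T14:00:00')]
```

Keeping each step a separate function is what allows an orchestrator to run them as separate tasks. With a MySQL driver such as PyMySQL or mysql-connector, the query placeholder is `%s` rather than sqlite3's `?`.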
In order to simply test Airflow, though, you don’t need to understand the ETL tasks; just follow the steps below to get your Airflow host/cluster up and running:
- Install Airflow: This, of course, is a given. You can find a fantastic installation guide for both Ubuntu and CentOS here: http://site.clairvoyantsoft.com/installing-and-configuring-apache-airflow/
Don’t forget to install RabbitMQ and Celery, as the instructions describe, and to set up your MySQL server.
- Getting Airflow scripts: As I mentioned above, Airflow is capable of scheduling and managing ETL tasks using simple Python scripts. You can obtain the scripts for this specific project by simply cloning the following git repository: https://github.com/ManuelMourato25/airflow_celery_example_project
- Placing scripts in their respective directories:
First, go to the airflow/ folder
Place working_dir/ in your root dir ( / )
Place data_dir/ in your root dir ( / )
Place the dags/ folder in your AIRFLOW_HOME dir
Place the airflow.cfg file in your AIRFLOW_HOME dir
- Tweak airflow.cfg: This file contains endpoints and URLs that you need to change to fit your own environment, so take your time examining it.
- Start Airflow services:
Run “airflow webserver” on your webserver host, or locally
Run “airflow scheduler” on your scheduler host, or locally
If everything went well, you should see the following UI by going to http://<webserver_hostname>:8080
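If the UI does not come up, or tasks never start, the settings from the “Tweak airflow.cfg” step are the first thing to double-check. The sketch below reads the file with Python's `configparser` and prints a few keys. The section and key names (`[core] executor`, `[core] sql_alchemy_conn`, `[celery] broker_url`, `[celery] result_backend`) are, to my knowledge, the Airflow 1.10 names; other releases moved or renamed some of them, so treat the list as an assumption to check against your own file. Note that the connection strings it prints may contain passwords.

```python
import configparser
import os

# Settings that most often need adapting; key names assumed from Airflow 1.10.
KEYS_TO_REVIEW = [
    ("core", "executor"),
    ("core", "sql_alchemy_conn"),
    ("celery", "broker_url"),
    ("celery", "result_backend"),
]


def review_settings(cfg_text):
    """Return {"section.key": value, or None if the key is missing}."""
    # interpolation=None returns values verbatim, so '%' characters
    # (e.g. in a password or the escaped '%%' log formats) are not interpreted.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return {
        f"{section}.{key}": parser.get(section, key, fallback=None)
        for section, key in KEYS_TO_REVIEW
    }


if __name__ == "__main__":
    # Airflow falls back to ~/airflow when AIRFLOW_HOME is not set.
    home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    path = os.path.join(home, "airflow.cfg")
    if os.path.exists(path):
        with open(path) as f:
            for name, value in review_settings(f.read()).items():
                print(f"{name} = {value}")
    else:
        print(f"No airflow.cfg found at {path}")
```

For the distributed setup described next, `core.executor` should read `CeleryExecutor`, and the broker URL should point at your RabbitMQ server.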
Running Airflow in a distributed fashion: Enter Celery
If you take a look at airflow.cfg, you will notice that there are several options for the “executor” field, that is, the engine that will execute your scheduled tasks. One of those options is “CeleryExecutor”.
This is where you might get a little confused, given the title of this article. Airflow can indeed use Celery workers as the engine that executes the scheduled tasks mentioned above (and this is in fact the preferred setup). But what exactly is Celery? It’s actually not that hard to explain.
“Celery is an asynchronous task queue/job queue based on distributed message passing”.
- Celery connects to a message broker, such as RabbitMQ.
- The RabbitMQ server, in turn, holds multiple queues, each of which receives messages either from Airflow or from code that calls a Celery task’s delay() method.
- The messages in each queue are ordered, and tell the Celery workers which task they need to execute next.
- Celery workers can consume task messages from one or multiple queues, so we can assign queues to workers based on type of task, type of ETL, etc.
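To illustrate the routing idea in the list above, here is a toy, in-process model built only on the standard library: a "broker" holds named FIFO queues, producers publish task messages to a queue by name, and each worker only drains the queues it was assigned. This is not Celery's or RabbitMQ's API; the class, queue and task names are hypothetical, and real workers would run on separate hosts and consume concurrently.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-process stand-in for a message broker holding named FIFO queues."""

    def __init__(self):
        self.queues = defaultdict(deque)

    def publish(self, queue_name, task_name, *args):
        # Messages keep their publication order within each queue.
        self.queues[queue_name].append((task_name, args))

    def consume(self, queue_names):
        """Pop the next message from the first non-empty queue in queue_names."""
        for name in queue_names:
            if self.queues[name]:
                return name, self.queues[name].popleft()
        return None


class ToyWorker:
    """Runs only messages from the queues it was assigned."""

    def __init__(self, broker, queue_names, registry):
        self.broker = broker
        self.queue_names = queue_names
        self.registry = registry  # task name -> Python callable

    def drain(self):
        """Execute every waiting message in this worker's queues, in order."""
        results = []
        while (msg := self.broker.consume(self.queue_names)) is not None:
            queue_name, (task_name, args) = msg
            results.append((queue_name, task_name, self.registry[task_name](*args)))
        return results


if __name__ == "__main__":
    tasks = {"double": lambda x: 2 * x, "shout": lambda s: s.upper()}
    broker = ToyBroker()
    broker.publish("etl_numbers", "double", 21)
    broker.publish("etl_text", "shout", "hi")
    broker.publish("etl_numbers", "double", 5)

    numbers_worker = ToyWorker(broker, ["etl_numbers"], tasks)
    text_worker = ToyWorker(broker, ["etl_text"], tasks)
    print(numbers_worker.drain())  # [('etl_numbers', 'double', 42), ('etl_numbers', 'double', 10)]
    print(text_worker.drain())     # [('etl_text', 'shout', 'HI')]
```

Each worker sees only its own queues, which is the property that lets queues be dedicated to a type of task or ETL.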
The great advantage of using Airflow and Celery together is that we can have Celery/Airflow workers running on multiple cluster hosts and distribute the execution of our ETL tasks in parallel, thus accelerating the execution of an ETL.
Now that we know how Celery works together with Airflow, we can execute the “rawFile_to_SQL” DAG by performing the following steps:
- Go to the hosts where you want to run your workers (or just do it locally), and run “airflow worker -q <NAME_OF_QUEUE>”.
This queue was specified in the ‘raw_file_to_sql.py’ script as ‘rawToSQL’.
- Go to the web UI, and turn the “rawFile_to_SQL” DAG on.
- If everything went well, the airflow worker should be showing execution logs for the current DAG.
Note: if you are running the airflow worker as the root user, go to “~/.bashrc” and add “export C_FORCE_ROOT=true” before running “airflow worker -q <NAME_OF_QUEUE>”.
By clicking on one of the triggered DAG executions, you should see the following in the UI:
Similarly, if you go to your MySQL server and run:
“USE user_db; SELECT * FROM processed_msg;”
You will see the following output table:
The problem with fast tasks and Airflow
After seeing how smoothly our “rawFile_to_SQL” ETL ran with Airflow orchestrating it (hopefully!), you might be wondering what the title of this article is all about. Well, rather than tell you, I should instead show you.
These are the Gantt charts (another Airflow UI feature) for three different runs of the ETL, with task durations shown in green.
Do you see the issue? The tasks themselves, as you can see, complete in less than one second, but the time between tasks is on the order of 4–5 seconds!
Let’s think about this a little bit: if you have a use case in which your tasks are long and complex, moving or transforming big data, then a few seconds between each task is something you can spare, and if so Airflow would be ideal for you.
But now consider a different scenario: you have IoT/financial/user data coming from thousands of different sources into your cluster. The goal is for your client to have that data as fast as possible, to detect failures/trends/etc. as soon as possible (within 10 or 30 seconds). In this situation, you have to launch an ETL process every time there is new data from one of those thousands of sources, so the amount of data each time is quite small.
So not only will you have a lot of concurrent ETL processes, but each of them will have some number of tasks that are run sequentially. In a situation like this, you can’t have an ETL process with 5 second breaks between tasks, especially if there are 5 or 10 or more tasks that run sequentially. The task scripts should be executed immediately, one after the other.
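A quick back-of-the-envelope calculation shows why those gaps matter. With n tasks run strictly one after another there are n − 1 hand-offs between them. Taking the 4 to 5 second gaps from the Gantt charts above, and 1 second per task as an upper bound (the tasks take less than that), end-to-end latency is n × task time + (n − 1) × gap:

```python
def pipeline_latency(n_tasks, gap_s, task_s=1.0):
    """Seconds from the first task's start to the last task's end when
    n_tasks run strictly one after another: n_tasks runtimes plus
    (n_tasks - 1) scheduling gaps between consecutive tasks."""
    if n_tasks < 1:
        raise ValueError("need at least one task")
    return n_tasks * task_s + (n_tasks - 1) * gap_s


if __name__ == "__main__":
    # Gaps of 4-5 s as in the Gantt charts; 1 s per task as an upper bound.
    for n in (5, 10):
        low, high = pipeline_latency(n, 4), pipeline_latency(n, 5)
        print(f"{n} tasks: {low:.0f}-{high:.0f} s")
    # 5 tasks: 21-25 s
    # 10 tasks: 46-55 s
```

Even 5 tasks overshoot a 10-second target, and 10 tasks go well past 30 seconds, almost entirely because of waiting rather than work: with no gaps (`gap_s=0`) the same 10 tasks would take at most 10 seconds.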
You may ask then: why not simply run the tasks on a single host? Since they are small and fast, we could just execute the scripts on that host, using something like cron for scheduling.
That could work, but if we have thousands of data sources and want their data as fast as possible, we will have thousands of tasks running concurrently, and a single host might not have the CPU/memory to support them all.
But you might still be asking: well, why don’t we simply take all of the tasks that run sequentially, and put them in the same script? That would at least solve the intervals between tasks issue.
This is actually the perfect approach for the use case presented in the sections above. However, there are many scenarios in which this isn’t possible: for example, when you have a task that you want to run even if the previous task has failed. If all tasks are in the same script and one step fails, the whole script fails, which is not ideal.
For all these reasons, running a massive number of fast/small tasks calls for fast distributed task management that does not require resources to be allocated in advance (as Airflow does), since each ETL task needs very few resources, and that allows tasks to be executed immediately one after the other.
Celery Alone: why less can be more
Apache Airflow, for the reasons outlined above, is clearly not fit for this use case. So what if we remove Airflow’s heavy machinery but keep the underlying task execution and management infrastructure? That is simply Celery.
With Celery, not only do sequential tasks execute immediately one after the other, but we also don’t have to allocate resources before their execution, as we did in Airflow.
Each task is simply a script execution, and uses only the memory it needs to run its code.
It is a lightweight, real-time solution that fits this type of use case, where ETLs require extremely low latency while still executing a massive number of small tasks.
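In Celery, back-to-back execution of dependent steps is commonly expressed with the `chain` canvas primitive, in which each task's return value is passed as the first argument of the next one (for instance `chain(extract.s(path), transform.s(), load.s(table))`, with hypothetical task names); whether this project's scripts use `chain` is not stated here. The stdlib-only model below mimics just that argument-passing rule so it runs without a broker; it does not reproduce Celery's queuing, retries or result backend, and the ETL functions and path are hypothetical.

```python
def run_chain(first, *rest):
    """Run steps back to back, mimicking Celery's chain argument passing.

    `first` is (func, args). Each later step is (func, extra_args) and is
    called as func(previous_result, *extra_args), the same way a chained
    Celery signature receives its parent's return value as the first argument.
    """
    func, args = first
    result = func(*args)
    for func, extra_args in rest:
        result = func(result, *extra_args)
    return result


# Hypothetical ETL steps, standing in for real Celery tasks.
def extract(path):
    return [" alice ", " bob "]  # pretend these lines were read from `path`


def transform(lines):
    return [line.strip() for line in lines]


def load(rows, table):
    return f"{len(rows)} rows -> {table}"


if __name__ == "__main__":
    print(run_chain((extract, ("/data_dir/raw.json",)),
                    (transform, ()),
                    (load, ("processed_msg",))))
    # 2 rows -> processed_msg
```

Because each step starts as soon as the previous one returns, there is no scheduler polling interval between them, which is the behavior the Celery worker logs below illustrate.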
Let’s see how Celery works in practice:
- Celery requires a result backend if you wish to store your task results.
As such, the first thing you need to do is create a celery database in your MySQL server and grant privileges so you can access it:
“CREATE DATABASE celery;
GRANT ALL PRIVILEGES ON celery.* TO <username>@'%' IDENTIFIED BY '<password>';”
- Next, go to the Git repository that you cloned for the Airflow test, and enter the “celery/celery_for_medium/” folder.
In this folder you will find a script called “celery.py”. Open it and replace the values inside <…> with your respective server names and user/password.
- Provided that you have already setup your directories and files as described in the “An Airflow use case: managing an ETL for user data extraction, processing and storage” section above, simply go back to the “celery/” directory and run the following command:
“celery worker --app celery_for_medium.celery --loglevel=INFO -n worker1@<WORKER_SERVER> -Q json_to_sql_celery”
- If everything goes as planned, you should see the celery worker starting like so:
- The steps above started the Celery worker.
Now it’s time to issue an execution order, so that tasks are sent to the “json_to_sql_celery” RabbitMQ queue; Celery will then read the task messages and execute them.
Simply open another terminal window and run “python execution_script.py” from the same folder.
- Again if everything goes well, you should immediately see the celery worker showing execution logs, like so:
- Notice also that tasks are executed immediately one after the other, with no delay between them, unlike in Airflow.
This can also be seen on the terminal where you executed “python execution_script.py”:
You may also notice that although we have only one worker, the log shows multiple worker instances like “Worker-10”. This is because, even though we launched a single worker, it still executes tasks concurrently using a pool of child processes (Celery’s default prefork pool). This concurrency level can be tuned with the worker’s --concurrency option.
- Finally, if you want to check your worker state, open another terminal and simply run “flower” (if it is not installed, run “pip install flower” first), and you should see its UI on port 5555:
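One practical note on the “celery.py” step above: the values it asks for typically end up in a broker URL and a result-backend URL. Celery accepts an `amqp://` URL for RabbitMQ and, for the database result backend, an SQLAlchemy URL prefixed with `db+` (for example `db+mysql://user:password@host/celery`). A password containing `@`, `:` or `/` breaks such URLs unless it is percent-encoded. The helper functions below are hypothetical (not taken from the repository) and build both strings with the standard library's `urllib.parse.quote`, assuming RabbitMQ's default virtual host and the `celery` database created earlier.

```python
from urllib.parse import quote


def _credentials(user, password):
    # Percent-encode both parts so characters like '@', ':' or '/' in a
    # password cannot be mistaken for URL delimiters.
    return f"{quote(user, safe='')}:{quote(password, safe='')}"


def rabbitmq_broker_url(user, password, host, port=5672):
    """AMQP URL for RabbitMQ's default virtual host '/', written as '//'."""
    return f"amqp://{_credentials(user, password)}@{host}:{port}//"


def mysql_result_backend_url(user, password, host, database="celery"):
    """Celery database result backend: an SQLAlchemy URL prefixed with 'db+'."""
    return f"db+mysql://{_credentials(user, password)}@{host}/{database}"


if __name__ == "__main__":
    print(rabbitmq_broker_url("guest", "guest", "localhost"))
    # amqp://guest:guest@localhost:5672//
    print(mysql_result_backend_url("celery_user", "p@ss:w/rd", "mysql-host"))
    # db+mysql://celery_user:p%40ss%3Aw%2Frd@mysql-host/celery
```

The resulting strings could be passed to the app object, e.g. `Celery("celery_for_medium", broker=..., backend=...)`; whether the repository's celery.py is laid out exactly that way is an assumption. The `mysql://` dialect also needs a MySQL driver for SQLAlchemy, such as mysqlclient.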
Of course, this comes with a few disadvantages:
- The intuitive Airflow UI, which is incredibly useful for monitoring purposes, is no longer available, and neither are many other Airflow features.
You can still use “flower” to monitor Celery workers, but it can’t compare to the Airflow UI.
- All the Airflow operators and hooks that made it extremely simple to interact with HDFS and Hive, execute bash commands, etc., are no longer available either.
Choosing between Apache Airflow on Celery and Celery alone depends on your use case.
For most scenarios Airflow is by far the most friendly tool, especially when you have big data ETLs in which tasks take a long time to execute. In these scenarios, a couple of seconds between tasks becomes negligible.
If, however, you have a special use case in which data comes at you really fast from multiple sources, and you need distributed computing but still very low latency, then I would say give Celery a shot.
I hope you enjoyed it!
Please leave your opinions and suggestions below!
Manuel Duarte Vilarinho Mourato @ Big Data Engineer