Airflow on Kubernetes: Data Pipelines

From Jenkins monolith towards Airflow on Kubernetes at Geoblink

Mario Fernández Martínez
Geoblink Tech blog
11 min read · Aug 25, 2020


Airflow ❤ Kubernetes

Data Pipelines at Geoblink

Here at Geoblink, we do love video games. We can’t deny it. We like them so much that we name each of our code sprints after a video game. Why does this matter? Well, maybe it doesn’t matter at all. Fair enough, we’re nerds. But we believe the ins and outs of our daily work are quite similar to an adventure: exploration and decision making.

As data engineers, it is quite common to see ourselves on an endless adventure. Sometimes we imagine ourselves as a nerdy Guybrush Threepwood seeking the secrets of Monkey Island. We’ve faced those “secrets” many times in the adventure of acquiring valuable new information from different kinds of sources and adding it to our most beloved treasure: our databases. As you might already imagine, Location Intelligence –that’s what we do– merges multiple datasets of different shapes, sizes, and timings into data our customers can understand.

With the growth of the company, data acquisition has evolved quite a bit since the very beginning. At first, obtaining a few public sources was enough for the use cases back then. Over the years, new sources, new kinds of information, and new countries have constantly challenged us.

This post shows how we evolved from a basic one-tool-for-everything set-up to a more sophisticated approach, taking advantage of different tools for the specific use cases they were designed for.

A bit of history at Geoblink: Jenkins

As in most startups, finding a tool that fits and solves a bunch of problems is key. Once you find something that works, you keep it and move on to the next problem. At Geoblink, life’s as tough as in any other startup: once we found something that worked for –almost– all of our problems, we kept it. In our case, that was Jenkins.

Jenkins is defined as a self-contained, open source automation server which can be used to automate all sorts of tasks related to building, testing, and delivering or deploying software. Although that’s true, and we have been quite happy using it for data pipelines, we realized that Jenkins isn’t the most suitable tool for that purpose.

While we added new pipelines carefully and managed to cover most of our problems with Jenkins, we ended up freezing the server multiple times by overloading it, and found it rather complex to restart a process from a failed step. In those cases, for example, we had to manually delete some of the data loaded in previous steps and then relaunch the whole pipeline. So we understood that even though our use cases can be solved with Jenkins, other tools might suit our data pipelines better. Meanwhile, Jenkins stays for CI/CD pipelines, a use case where it fits perfectly.

After deciding that Jenkins wasn’t the best option we had for data pipelines, we started a series of spikes –a product-testing method originating from Extreme Programming that uses the simplest possible program to explore potential solutions– with different technologies, trying to find a good-enough tool for our use cases. We experimented with Jenkins X, Luigi, and Airflow, among other tools our team discarded. In the end, we decided to go all-in with Airflow, since it was the tool that best fit our criteria.

Apache Airflow

According to its documentation, Airflow is a platform created by the community to programmatically author, schedule, and monitor workflows. It’s defined as a scalable, dynamic, extensible, elegant tool written in pure Python, with an intuitive user interface and plenty of integrations.

One thing our team fancied from the beginning was the number of integrations with external services –AWS, GCP, Postgres…– that Airflow provides out of the box. Almost every tool we used at Geoblink at the time was covered. On top of that, it’s written in Python, a language almost every single team member can code in.

Another facet we liked was the simplicity of the UI and how easy it is to deal with failing tasks. Although understanding the way Airflow handles the start date/execution date wasn’t trivial, most of the features (Operators, Hooks, …) were exactly what we were looking for.
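To give a rough idea of what this looks like, here is a minimal sketch of a DAG, not one of our real pipelines: the DAG id, connection id, table, and schedule are made up for illustration, and it assumes a 1.10-era Airflow installation with the Postgres hook available.

```python
# Minimal DAG sketch (illustrative only): one task that uses a Hook to query a
# database. "geoblink_db" and "some_table" are hypothetical names.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator


def count_rows():
    # The Hook wraps the connection details stored in Airflow, so the task
    # code only has to care about the query itself.
    hook = PostgresHook(postgres_conn_id="geoblink_db")
    print(hook.get_first("SELECT count(*) FROM some_table"))


with DAG(
    dag_id="example_pipeline",
    start_date=datetime(2020, 8, 1),  # the start date/execution date logic mentioned above
    schedule_interval="@daily",
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
) as dag:
    PythonOperator(task_id="count_rows", python_callable=count_rows)
```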

Kubernetes

At Geoblink we use Kubernetes for deploying our services. It gives us the flexibility and freedom that we need in order to deploy services and scale them satisfactorily.

Kubernetes is a portable, extensible, open-source platform for managing containerized workloads and services, that facilitates both declarative configuration and automation. It has a large, rapidly growing ecosystem. Kubernetes services, support, and tools are widely available.

It’s already within our tech culture, broadly used by all our teams.

Towards Monkey Island: Airflow on Kubernetes

Let’s get back to our mission. Crossroads, dead ends… Despite being cautious and taking baby steps since we left Mêlée Island –aka data pipelines on Jenkins– sooner rather than later the trip towards Monkey Island brought us new challenges. We developed big concerns about the way Airflow behaves when running on Kubernetes. Everything running on Kubernetes must be fault-tolerant, so that we can keep working as if nothing had happened when a Pod dies. And rest assured, there will be cases where that happens.

The very first challenge popped up as soon as we decided to deploy Airflow on Kubernetes. Keep in mind that our first tests with Airflow had always been executed on local machines running Docker containers. With little knowledge beyond the documentation, we had to decide which Executor –the mechanism by which Airflow task instances get run– would suit us best.

Since most of our crew members were fluent in Python, and given that Airflow is written in Python too, we decided to start with the LocalExecutor, the same one we had been using in our first local tests.

LocalExecutor

The scheduler does its periodic work and decides which tasks must run, spawning them as local processes, each with its own PID. The LocalExecutor treats the running pod as a single instance where everything can be executed in parallel, with the restriction of being parallelized by the OS –as regular processes– competing for CPU and/or memory. As you might already have guessed, this is far from optimal: several processes fighting for a piece of the CPU loot, locks, semaphores… Well, we could live with that. Nobody said it was easy.
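For reference, this is roughly how that choice is expressed. Airflow reads its settings from airflow.cfg or from environment variables of the form AIRFLOW__SECTION__KEY; the sketch below uses the latter, written as a Python snippet purely for illustration (in a real deployment these would live in the Kubernetes manifest, and all values here are made up).

```python
# Illustrative LocalExecutor configuration via Airflow's environment-variable
# overrides (AIRFLOW__<SECTION>__<KEY>). Values are placeholders, not our real ones.
import os

os.environ["AIRFLOW__CORE__EXECUTOR"] = "LocalExecutor"
# Upper bound on how many task processes the single Airflow pod forks at once;
# beyond that tasks queue up, and in any case they compete for the pod's CPU/memory.
os.environ["AIRFLOW__CORE__PARALLELISM"] = "8"
# External metadata database (hypothetical service name and credentials).
os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = "postgresql://airflow:***@svc-airflow-db/airflow"
```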

There are several problems attached to having the LocalExecutor in place. The first one we found is that all our pipelines (Airflow DAGs) were constrained by the Airflow version and its dependencies, causing trouble among the crew whenever someone wanted to upgrade or install a dependency. This is something we could handle for a while, even though we were aware that it would make improvements harder to achieve.

Another problem –this one a red flag– appeared while we were trying to keep the development flow active, with CI/CD helping us deploy new features as soon as the team decides the code is ready to be merged. When a new Airflow pod with the new features is ready, the running Airflow pod is removed from Kubernetes along with all its running processes. Even though our external Airflow database contains information about all running processes, all running tasks are killed and therefore end up in a failed state.

Example of use

Here’s an example of the red flag problem. We simulate a new deployment of Airflow at a given moment while a task is running, to determine whether the LocalExecutor matches our needs.

Note: we are using the KubernetesPodOperator –in the following sections we’ll explain why–, so every executed task creates a new pod, which lets us see what happens in the cluster.
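For context, the test DAG looks roughly like the sketch below. It is a reconstruction for illustration only: the image, namespace, timings, and task names are assumptions, chosen to match the pod names that appear in the screenshots (two sequential tasks that simply sleep, each spawning its own pod).

```python
# Rough sketch of the test DAG used in the experiments (illustrative values).
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

with DAG(
    dag_id="test_k8s_pod_op",
    start_date=datetime(2020, 8, 1),
    schedule_interval=None,  # triggered manually for the experiment
) as dag:
    test1 = KubernetesPodOperator(
        task_id="test1",
        name="test1",                 # pods show up as test1-<random suffix>
        namespace="airflow",          # hypothetical namespace
        image="alpine:3.12",          # hypothetical image
        cmds=["sleep"],
        arguments=["300"],            # the task only sleeps, as noted below
        in_cluster=True,
        is_delete_operator_pod=True,
    )
    test2 = KubernetesPodOperator(
        task_id="test2",
        name="test2",
        namespace="airflow",
        image="alpine:3.12",
        cmds=["sleep"],
        arguments=["300"],
        in_cluster=True,
        is_delete_operator_pod=True,
    )
    test1 >> test2
```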

First of all, we execute a DAG and check the running pods:

As expected, Airflow runs in a dedicated Pod named svc-airflow-master-79f8c4d87d-tm8s5, whereas the database runs in a separate instance named svc-airflow-db-76c744b454-qs7ds. This split lets us kill the master without touching the database. Finally, we have test1-06144c19, the actual task pod spawned by the KubernetesPodOperator.

Once the test1-06144c19 pod is in the Running status, we proceed to kill the Airflow master:

A few seconds later, Kubernetes respawns the master automatically:

While the master pod isn’t ready, the UI is unavailable:

Service unavailable 😕

After the master pod is ready, the UI shows the correct information, since the DB knows there was a running container, with a given PID:

Task running 😌

Here’s part of the task information where the PID is shown:

Task information 🗒

But unfortunately, after a while –once the unrelated process in the new Pod that happens to use the same PID finishes– the UI shows the task as failed, even though the task itself was successful –it only ran a sleep command–.

DAG Execution –1– 👆

At this point, we realized that our first raid had turned out to be a dead-end road: it works for some cases but increases the complexity of deploying changes automatically, e.g. forcing us to somehow block deployments while there are running DAGs/tasks. Nay! Since we want to keep our crew happy and motivated, we needed to find a better Executor, one that lets us deploy code without struggling with the above-mentioned problems.

KubernetesExecutor

Since version 1.10.0, Airflow offers a new kind of Executor that runs every single task in its own Kubernetes Pod: the KubernetesExecutor. Using it changes nothing in the way our crew works, since it runs the very same code written for the LocalExecutor, only in separate Pods: Aye! Airflow connects to the running task –the Pod– through the hostname created along with the Pod.

Nevertheless, it doesn’t fix the problem of being restricted to the dependencies defined in the Airflow repository. How come? Well, as mentioned, Airflow spawns a new Pod for every single task, but all tasks share the same Docker image, determined by the worker_container_repository setting in the Airflow configuration. Therefore, all running Pods share the same dependencies.
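To make the contrast with the previous set-up concrete, here is a sketch of the settings that change when switching to the KubernetesExecutor, again expressed as Airflow’s environment-variable overrides; the registry URL, tag, and namespace are placeholders, not our actual configuration.

```python
# Illustrative KubernetesExecutor configuration via environment-variable overrides.
import os

os.environ["AIRFLOW__CORE__EXECUTOR"] = "KubernetesExecutor"
# Every task runs in its own worker pod, but all worker pods are built from this
# single image, so they still share one set of Python dependencies.
os.environ["AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY"] = "registry.example.com/airflow-worker"
os.environ["AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG"] = "1.10.10"
os.environ["AIRFLOW__KUBERNETES__NAMESPACE"] = "airflow"
os.environ["AIRFLOW__KUBERNETES__DELETE_WORKER_PODS"] = "True"
```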

Now, we need to figure out whether the issue we had before –the red flag problem– is at least solved by using the KubernetesExecutor. We run the same simulation as above, i.e. kill the master and check how it behaves while a task is running.

Example of use

When starting a DAG, we find the following:

As we can see, two new Pods are created. One is the controller Pod for the task itself –testk8podoptestk8s1-e792...– generated by the KubernetesExecutor, while the other, test1-49c3bb06, is the actual task pod spawned by that controller –remember we are using the KubernetesPodOperator–.

Once the Airflow master is deleted,

both tasks keep running while the Airflow master is being respawned. A short while later, we can see the Airflow master up and running again

with the expected DAG execution in place

DAG execution –2– 👆

A few seconds later, the task is marked as completed in our CLI tool, with the second task running.

DAG execution –3– 👆

The main difference with the LocalExecutor is that the task status is retrieved via the above-mentioned hostname. After restarting the master, the hostname is still reachable, since Airflow keeps the controller Pod around until it has checked the status.

After the second task is done successfully, Airflow shows the DAG as a success

DAG success 👏

Yo Ho Ho, one step closer to Monkey Island! We’ve seen how the KubernetesExecutor solves our biggest problem. We can now deploy new features without fear, and our crew toasts to the new Executor while seeking the next challenge: fixing the dependency problem.

KubernetesPodOperator

When presenting the usage examples above, we mentioned that we use the KubernetesPodOperator. Besides, we still had an unsolved problem: the lack of flexibility around dependencies. This had been a sore point within the crew, almost turning into a mutiny when one of the members wanted to upgrade a library such as Pandas to a newer version.

The KubernetesPodOperator creates a new task –a Pod– that executes a Docker image pulled from a registry. This kind of operator allows us to have different images for different tasks, so that different tasks can use different versions of the same library. It also opens up the possibility of having multiple languages “running” on Airflow. We can then have multiple CI/CD flows –one per image/task– making us more flexible in the way we address issues, use the best libraries, or even pick the programming language depending on the problem to be tackled.
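A sketch of what that flexibility looks like in practice: two tasks in the same DAG, each running its own image. Everything here is hypothetical –the DAG id, image names, and commands are made up– but it shows the pattern of one task pinned to a newer Pandas and another written in a different language entirely.

```python
# Illustrative DAG mixing per-task images (all names/images are hypothetical).
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

with DAG(
    dag_id="mixed_images_pipeline",
    start_date=datetime(2020, 8, 1),
    schedule_interval="@weekly",
) as dag:
    # A Python task whose image ships a newer Pandas than the Airflow image does.
    enrich = KubernetesPodOperator(
        task_id="enrich_dataset",
        name="enrich-dataset",
        namespace="airflow",
        image="registry.example.com/enrich-dataset:pandas-1.1",
        cmds=["python", "enrich.py"],
        in_cluster=True,
    )
    # A task written in a completely different language, e.g. a Go binary.
    load = KubernetesPodOperator(
        task_id="load_dataset",
        name="load-dataset",
        namespace="airflow",
        image="registry.example.com/load-dataset:go-1.14",
        cmds=["/app/loader"],
        in_cluster=True,
    )
    enrich >> load
```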

Another benefit –but also a slight disadvantage– is that the DAGs become lighter, with less complexity and less code –it moves into the task code itself–, keeping Airflow as a pure data pipeline scheduler. The trade-off is that those images need to handle all connections to external data sources on their own, without the benefits of Airflow’s built-in Connections and Hooks.
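To make that trade-off concrete, here is a hedged sketch of the entrypoint that might live inside such a task image: it opens its own database connection from environment variables –which could be injected through the operator’s env_vars argument or a Kubernetes Secret– instead of relying on an Airflow Hook. The variable names and query are made up.

```python
# Sketch of a task image's entrypoint handling its own connection (illustrative).
import os

import psycopg2

conn = psycopg2.connect(
    host=os.environ["DB_HOST"],
    dbname=os.environ["DB_NAME"],
    user=os.environ["DB_USER"],
    password=os.environ["DB_PASSWORD"],
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT count(*) FROM some_table")
    print(cur.fetchone())
conn.close()
```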

Anyway, this is where each member can now decide whether to use Airflow with its Operators, Hooks, and all its functionality, or to use a different approach better suited to the problem to be solved. It gives us flexibility, and it solves the dependency/language lock-in.

Conclusions

After a few months of trying to find the best approach for our team to build data pipelines, we’ve decided to use Airflow on Kubernetes, with the KubernetesExecutor, fostering the use of the KubernetesPodOperator whenever something outside Airflow and/or the dependencies declared in its repository must be done. With this set-up, we embrace all kinds of technologies/languages thanks to Docker images, while also making our team’s life smoother and unlocking its potential to build new and better features.
