Migration to Airflow: one-year feedback
At Maisons Du Monde, we used to create and schedule our data pipelines with Rundeck, an automation server similar to Linux cron, for the veterans among us. Rundeck was already available on our company’s network, so we simply used it to start the pipelines.
At first, this solution was convenient, as long as workflows finished on time. But “winter is coming!” The platform kept growing, and the pipelines became bigger and much more complex. Maintaining Rundeck became much harder because of its inconvenient user experience and the performance limits of the local server.
That’s why we moved to Airflow!
In this article, the Maisons Du Monde data team shares its feedback after one year of using Airflow, compared with Rundeck.
In the first section, we discuss the main issues we encountered with Rundeck.
In the second section, we explain how we integrated Airflow to meet our needs. For us, the best solution is the one that is sustainable and user-friendly!
Why did we leave Rundeck?
In this automation system, our tasks were organized in a folder tree, like files in an operating system. The scheduler executes the jobs inside a folder sequentially, in depth-first order. So there are no real dependencies between tasks, only execution-order rules.
Separate folders could run in parallel, but with no dependencies between them. When one workflow depended on another, we shifted its start time accordingly, with a safety margin.
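To make the contrast with DAGs concrete, here is a minimal Python model of the execution rule described above. Jobs in a folder tree run one after another in depth-first order, so a folder’s total runtime is the sum of its jobs’ runtimes. The folder names and durations are hypothetical. This is a sketch of the behavior, not Rundeck’s code.

```python
from typing import Dict, List, Tuple, Union

# A folder maps names to either a sub-folder (dict) or a job duration in minutes.
Folder = Dict[str, Union["Folder", int]]


def depth_first_jobs(folder: Folder, path: str = "") -> List[Tuple[str, int]]:
    """List (job_path, duration) pairs in the depth-first order the folder is run."""
    jobs: List[Tuple[str, int]] = []
    for name, child in folder.items():  # dicts keep insertion order
        child_path = f"{path}/{name}"
        if isinstance(child, dict):
            jobs.extend(depth_first_jobs(child, child_path))
        else:
            jobs.append((child_path, child))
    return jobs


def sequential_runtime(folder: Folder) -> int:
    """Jobs run one after another, so the folder takes the sum of all durations."""
    return sum(duration for _, duration in depth_first_jobs(folder))


# Hypothetical folder: two collects followed by one transformation.
nightly = {"collect": {"crm": 20, "erp": 30}, "transform": {"users": 15}}
```

Here the two collects run one after the other even if they are independent, so the folder takes 65 minutes. A dependent workflow would then have to be scheduled at least 65 minutes later, plus a safety margin.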
Fernando here doesn’t know whether Michael has reached the infirmary. He is just assuming it and puts his faith in the plan, hoping to escape from the prison. It is 3 am now, so Fernando has to get started. Hopefully Michael didn’t get caught.
This made exploring and troubleshooting workflows harder. Catching up on failed jobs was a nightmare whenever time-dependent workflows had to be replayed.
There were many other reasons to leave Rundeck, for example:
- Time loss due to safety margins and sequential execution inside folders
- Frequent errors due to the overloaded Rundeck server
- Troubleshooting and incident recovery were too manual
- Difficulty adding new pipelines and dependencies
Airflow from our perspective
Apache Airflow is a popular open-source solution with an active community around it. It brings an approach to automation suited to data use cases, and it can be deployed on a scalable architecture.
Airflow could resolve all the issues mentioned above, and the first lines of its documentation already hint at how. To sum up, here are the main reasons we chose Airflow:
- it uses DAGs
- it has a rich user interface for monitoring and troubleshooting
- it can scale
- workflows are written in Python
- last but not least, Google Cloud Platform (our cloud provider) offers a managed Airflow service named Composer

Composer helped us a lot to quickly evaluate and deploy Airflow on our infrastructure.
Directed Acyclic Graphs (DAG)
Moving from folders to DAGs was a relief, simply because a DAG is less restrictive: it lets us declare real dependencies and run independent tasks in parallel.
We took advantage of this to rethink the existing pipelines and dependencies. As a result:
- Workflows accurately reflect reality
- Troubleshooting no longer requires background knowledge of task dependencies
- Time is saved thanks to parallel execution and accurate dependencies
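The time saved by parallel execution can be sketched with Python’s standard `graphlib` module (3.9+). Given each task’s upstream tasks, the code groups tasks into waves that could run concurrently. The task names are hypothetical. Grouping into waves is a simplification: Airflow’s scheduler starts each task as soon as its own upstream tasks succeed, without waiting for a whole wave.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def parallel_waves(upstreams: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into waves whose members have all their upstream tasks done."""
    sorter = TopologicalSorter(upstreams)
    sorter.prepare()  # also raises CycleError if the graph contains a cycle
    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


# Hypothetical dependencies: task -> set of upstream tasks.
upstreams = {
    "clean_orders": {"collect_orders"},
    "clean_users": {"collect_users"},
    "transactions": {"clean_orders", "clean_users"},
}
```

In a sequential folder, these five tasks would run one at a time. As a DAG, they fit into three waves: both collects, then both cleanings, then `transactions`.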
Airflow also has a rich user interface for troubleshooting and for manually catching up tasks, both across dates and along dependencies.
Extensible Python framework
This is a very attractive aspect of Airflow. Each Airflow DAG is defined in Python code, written in three steps:
- Instantiate a DAG
- Instantiate tasks: operators and sensors
- Make dependencies between tasks
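In a real DAG file, these three steps use Airflow’s `DAG` class, operator classes, and the `>>` operator to declare that one task runs before another. So that the example runs without Airflow installed, it uses a toy `ToyTask` class (hypothetical, not Airflow’s implementation). The class shows what the `>>` syntax records, including the list forms `a >> [b, c]` and `[a, b] >> c`.

```python
from typing import List, Set, Union


class ToyTask:
    """Toy stand-in for an Airflow operator: it only records downstream task ids."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: Set[str] = set()

    def __rshift__(self, other: Union["ToyTask", List["ToyTask"]]):
        # task >> other, or task >> [other1, other2]
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.add(target.task_id)
        return other  # returning the right-hand side allows chaining: a >> b >> c

    def __rrshift__(self, others: List["ToyTask"]) -> "ToyTask":
        # [task1, task2] >> task: Python calls this because lists don't define >>
        for upstream in others:
            upstream.downstream.add(self.task_id)
        return self


# Step 1 (creating the DAG object) is omitted in this toy model.
# Step 2: instantiate tasks.
extract, clean_a, clean_b, load = (ToyTask(t) for t in ["extract", "clean_a", "clean_b", "load"])
# Step 3: declare dependencies (fan-out, then fan-in).
extract >> [clean_a, clean_b]
[clean_a, clean_b] >> load
```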
And here is the best part: with Airflow plugins, you can create your own operators or sensors by subclassing BaseOperator or BaseSensorOperator.
Before discussing the Airflow integration, here are some useful operators and sensors: LatestOnlyOperator, BranchOperator, ExternalTaskSensor, KubernetesPodOperator and also DummyOperator, which is useful even though it does nothing.
We use Google Cloud Composer as a managed Airflow service.
Each Cloud Composer environment has a dedicated bucket that holds the Airflow code running in production. Deployment is therefore handled by GitLab CI/CD pipelines in two steps:
- Testing step: detect syntax issues in the code to be deployed
- Deployment step: push the code to the dedicated Google Cloud Storage bucket, which Airflow workers access through Cloud Storage FUSE.
Once the CI/CD pipeline has completed, the changes show up in the Airflow web server within a few seconds.
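The testing step can be as simple as parsing every DAG file. The sketch below assumes the DAG files sit under a single folder (the `dags` default path is hypothetical). It uses Python’s `ast` module to report syntax errors without executing anything. If Airflow is installed in the CI image, loading the folder with `DagBag` and checking its `import_errors` would also catch import-time failures.

```python
import ast
from pathlib import Path
from typing import Dict, List, Optional


def syntax_error(source: str, filename: str = "<dag>") -> Optional[str]:
    """Return a short description of the first syntax error, or None if the code parses."""
    try:
        ast.parse(source, filename=filename)
    except SyntaxError as exc:
        return f"line {exc.lineno}: {exc.msg}"
    return None


def find_syntax_errors(dag_folder: str) -> Dict[str, str]:
    """Map every .py file under dag_folder that fails to parse to its error."""
    errors: Dict[str, str] = {}
    for path in sorted(Path(dag_folder).rglob("*.py")):
        error = syntax_error(path.read_text(encoding="utf-8"), filename=str(path))
        if error is not None:
            errors[str(path)] = error
    return errors


def main(argv: List[str]) -> int:
    """Print the problems found and return a CI-friendly exit code."""
    problems = find_syntax_errors(argv[1] if len(argv) > 1 else "dags")
    for file_name, message in problems.items():
        print(f"{file_name}: {message}")
    return 1 if problems else 0  # a non-zero code fails the CI job
```

In the CI job, this could be invoked as `sys.exit(main(sys.argv))`, so that any unparsable DAG file blocks the deployment step.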
For local development, the team built an Airflow Docker image.
A volume is mounted so that the container can access the local clone of the Airflow Git repository.
The Airflow configuration inside the container mirrors the production configuration, so if a workflow works locally, it should in principle work in production as well.
We also developed a “dry run” feature for testing, which can be set to True or False through the UI. When “dry run” mode is active, execution is simulated: the steps are logged instead of being launched.
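A dry-run switch like this can be sketched as follows. How the flag reaches the task is an assumption here. One option would be an Airflow Variable editable in the UI, for example read with `Variable.get("dry_run", default_var="False")`, whose string value is then parsed into a boolean. The step names and the `run_steps` helper are hypothetical.

```python
import logging
from typing import Callable, List, Sequence, Tuple

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("dry_run_sketch")

Step = Tuple[str, Callable[[], None]]


def parse_flag(raw: str) -> bool:
    """Interpret a UI-provided string such as "True" or "false" as a boolean."""
    return raw.strip().lower() in {"true", "1", "yes"}


def run_steps(steps: Sequence[Step], dry_run: bool) -> List[str]:
    """Execute each step, or only log it when dry_run is on; return executed names."""
    executed: List[str] = []
    for name, action in steps:
        if dry_run:
            log.info("[dry run] would execute step %s", name)
            continue
        log.info("executing step %s", name)
        action()
        executed.append(name)
    return executed
```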
It is also possible to mount the Kubernetes context file (~/.kube/config) into the running container to test pipelines in a Kubernetes environment. This is useful for testing newly developed plugins before releasing them.
We use KubernetesPodOperator because most of our Python projects are containerized and can run in Composer’s GKE cluster. KubernetesPodOperator’s input parameters are quite similar to the fields of a Kubernetes Job’s YAML file. Here is an example of a program that runs a transformation in the datalake.
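To illustrate the similarity, the sketch below translates the container section of a Kubernetes Job manifest (already loaded as a Python dict) into keyword arguments for KubernetesPodOperator. The mapping is: `command` to `cmds`, `args` to `arguments`, and the `env` list to `env_vars`, which is a plain dict in the Airflow 1.10 contrib operator. The image URI, task id and namespace are hypothetical. The resulting dict is only built here, not passed to Airflow.

```python
from typing import Any, Dict


def job_container_to_operator_kwargs(
    container: Dict[str, Any], task_id: str, namespace: str = "default"
) -> Dict[str, Any]:
    """Map a Job's container spec onto KubernetesPodOperator keyword arguments."""
    return {
        "task_id": task_id,
        # Pod names must be DNS-compatible (lowercase, no underscores).
        "name": task_id.lower().replace("_", "-"),
        "namespace": namespace,
        "image": container["image"],
        "cmds": container.get("command", []),    # overrides the image ENTRYPOINT
        "arguments": container.get("args", []),  # overrides the image CMD
        # Assumes plain name/value entries (no valueFrom references).
        "env_vars": {e["name"]: e["value"] for e in container.get("env", [])},
    }


# Hypothetical container section of a Job manifest for a datalake transformation.
transform_container = {
    "name": "transform",
    "image": "eu.gcr.io/my-project/query-launcher:latest",
    "command": ["python", "main.py"],
    "args": ["--dataset_id", "my_dataset", "--table_id", "my_table"],
    "env": [{"name": "ENVIRONMENT", "value": "production"}],
}
kwargs = job_container_to_operator_kwargs(transform_container, task_id="transform_my_table")
# In a DAG file, this would become: KubernetesPodOperator(dag=dag, **kwargs)
```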
So deploying any job now requires not only Airflow skills but also Kubernetes skills. That’s why it was important to create custom plugins that keep the operators minimal.
Do we need to change all the Kubernetes parameters every time we reuse a specific program, such as a transform operation? Luckily, no!
In fact, a transform operation has only three changing parameters, which are passed to its Docker entrypoint:
- dataset_id: the BigQuery equivalent of a schema
- table_id: the table name in BigQuery
- reference_date: the contextual date by default
So an Airflow plugin at Maisons Du Monde exposes only the changing parameters as inputs and presets the environment configuration. For example, the preset contains a hard-coded Docker image URI and a predefined version value, “latest” by default. We also use plugins to define the task_id automatically by combining the required inputs, which gives a task_id like “OPERATOR_NAME-my_dataset.my_table”.
Let’s take a look under the hood!
As a reminder, the goal of our plugins is to require as few inputs as possible to use our custom operators. So every Docker image we use has its own plugin: a tiny class extending the abstract class MdmKubeOperator, which is itself a child class of the original KubernetesPodOperator. The middle class (MdmKubeOperator) is essential, because it defines the common methods and attributes reused by all the plugins.
Here is a class diagram representing, among others, two of our plugins: QueryLauncherOperator for transform operations and DatabaseCollectOperator for database collect operations.
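The plugin hierarchy described above can be sketched as follows. To keep the example runnable without Airflow, `KubernetesPodOperatorStub` stands in for the real KubernetesPodOperator. In an actual plugin, MdmKubeOperator would extend KubernetesPodOperator, and in Airflow 1.10 operator constructors are usually decorated with `apply_defaults`.

Some details are hypothetical: the image URIs, the `--flag=value` style of entrypoint arguments, and DatabaseCollectOperator’s inputs. Only the three transform inputs, the “latest” default and the task_id format come from the description above.

```python
from typing import Any, List, Optional


class KubernetesPodOperatorStub:
    """Stand-in for KubernetesPodOperator: stores the arguments it would receive."""

    def __init__(self, *, task_id: str, name: str, image: str,
                 arguments: List[str], namespace: str = "default", **kwargs: Any) -> None:
        self.task_id = task_id
        self.name = name
        self.image = image
        self.arguments = arguments
        self.namespace = namespace
        self.extra = kwargs


class MdmKubeOperator(KubernetesPodOperatorStub):
    """Abstract middle class: presets shared by every plugin."""

    OPERATOR_NAME: Optional[str] = None  # defined by each plugin
    IMAGE: Optional[str] = None          # hard-coded image URI, defined by each plugin

    def __init__(self, *, target: str, arguments: List[str],
                 version: str = "latest", **kwargs: Any) -> None:
        if self.OPERATOR_NAME is None or self.IMAGE is None:
            raise TypeError(f"{type(self).__name__} must set OPERATOR_NAME and IMAGE")
        task_id = f"{self.OPERATOR_NAME}-{target}"  # e.g. QUERY_LAUNCHER-my_dataset.my_table
        super().__init__(
            task_id=task_id,
            name=task_id.lower().replace("_", "-").replace(".", "-"),  # DNS-safe pod name
            image=f"{self.IMAGE}:{version}",
            arguments=arguments,
            **kwargs,
        )


class QueryLauncherOperator(MdmKubeOperator):
    """Transform operations: only the three changing parameters are required."""

    OPERATOR_NAME = "QUERY_LAUNCHER"
    IMAGE = "eu.gcr.io/my-project/query-launcher"  # hypothetical

    def __init__(self, *, dataset_id: str, table_id: str,
                 reference_date: str = "{{ ds }}", **kwargs: Any) -> None:
        # "{{ ds }}" would be rendered by Airflow's Jinja templating, since
        # arguments is a templated field of KubernetesPodOperator.
        super().__init__(
            target=f"{dataset_id}.{table_id}",
            arguments=[f"--dataset_id={dataset_id}", f"--table_id={table_id}",
                       f"--reference_date={reference_date}"],
            **kwargs,
        )


class DatabaseCollectOperator(MdmKubeOperator):
    """Database collect operations (inputs are hypothetical)."""

    OPERATOR_NAME = "DATABASE_COLLECT"
    IMAGE = "eu.gcr.io/my-project/database-collect"  # hypothetical

    def __init__(self, *, database: str, table_id: str, **kwargs: Any) -> None:
        super().__init__(target=f"{database}.{table_id}",
                         arguments=[f"--database={database}", f"--table_id={table_id}"],
                         **kwargs)
```

With this split, a DAG manager only writes `QueryLauncherOperator(dataset_id="my_dataset", table_id="my_table")`. A plugin creator adds a new image by declaring `OPERATOR_NAME`, `IMAGE` and the functional inputs.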
Now that we know how Airflow fits into our infrastructure, we still need to standardize how tasks are split between DAGs, with common grouping rules. In simple words: what defines a DAG?
We ended up choosing the following organization:
- Collect DAGs (level 1): the rule is one DAG per data source. Each DAG contains only collect tasks and cleaning transformations without external dependencies.
- Use case DAGs (level 2): these DAGs merge workflows coming from different sources to build a topic workflow. This is therefore usually the level where we compute our cleanest and most reusable tables. For example: Users, Transactions, Traffic, Emailing…
- Output DAGs (level 3): workflows that prepare data to be exposed outside the datalake. Output tasks can refresh an API, reload a dashboard, train a model or transfer data.
Since Airflow doesn’t allow grouping DAGs into folders, we prefix every DAG with its level, so that the UI lists the collect DAGs first, then the central tables, then the output DAGs.
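Because the level is a single-digit prefix, plain alphabetical ordering of DAG ids already groups collects, use cases and outputs. A small check like the one below could keep the convention enforced, for instance in the CI testing step (an assumption, not something described here). The DAG names in the example are hypothetical.

```python
import re
from typing import Iterable, List

# Prefix convention: 1_ collect, 2_ use case, 3_ output.
DAG_ID_PATTERN = re.compile(r"^([1-3])_[A-Za-z0-9_]+$")


def dag_level(dag_id: str) -> int:
    """Return the level encoded in a DAG id, rejecting ids without a valid prefix."""
    match = DAG_ID_PATTERN.match(dag_id)
    if match is None:
        raise ValueError(f"{dag_id!r} does not start with a level prefix such as '1_'")
    return int(match.group(1))


def ui_order(dag_ids: Iterable[str]) -> List[str]:
    """Sort DAG ids by level, then by name (equivalent to alphabetical order here)."""
    return sorted(dag_ids, key=lambda dag_id: (dag_level(dag_id), dag_id))
```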
We did not arrive at this solution right away. At first, we migrated only a few workflows from Rundeck, to test and learn. We used a “DAG per use case” split for a couple of months, but we quickly found that it wasn’t sustainable. For example, it was unclear where to place a collected table used by two use case DAGs. Hence the need to define what a DAG is in your company! Still, this transition period was crucial for building our knowledge of Airflow operators and tips.
Dependencies between DAGs can be implemented with the ExternalTaskSensor. It periodically checks another task’s execution state and turns green once that task succeeds. This wasn’t possible with Rundeck, remember?
Fernando now has a cell phone to check. He can start right after Scofield finishes the first job.
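Conceptually, a sensor repeatedly “pokes” a condition until it holds or a timeout expires. The loop below is a simplified model of that behavior, not Airflow’s implementation. For an ExternalTaskSensor, the condition would be “the external task’s state is success”. The `poke_interval` and `timeout` arguments mirror the sensor parameters of the same names.

```python
import time
from typing import Callable


def poke_until(condition: Callable[[], bool], poke_interval: float, timeout: float,
               clock: Callable[[], float] = time.monotonic,
               sleep: Callable[[float], None] = time.sleep) -> bool:
    """Return True as soon as condition() holds, or False once timeout seconds pass.

    Airflow's sensors fail (or skip, with soft_fail) on timeout instead of
    returning False; returning a boolean keeps this model simple.
    """
    deadline = clock() + timeout
    while True:
        if condition():
            return True
        if clock() >= deadline:
            return False
        sleep(poke_interval)
```

For example, if the upstream task is observed as running, running, then success, the function returns True on the third poke.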
However, we don’t recommend using sensors to listen to tasks one by one, because the dependencies quickly get tangled, like too many octopuses hugging each other.
Instead, we recommend adding dummy tasks as checkpoints to create task clusters inside each DAG. Sensors then listen to clusters instead of individual tasks. In our case, checkpoints are based on execution duration.
So we have agreed on 3 duration levels inside each DAG:
- level 1: “quick” dependencies
- level 2: “average” dependencies
- level 3: “slow” dependencies
In the second DAG, tasks A3, A4 and A5 depend on the first DAG’s level 1 tasks, that is, the quick dependencies! This separation lets urgent workflows run first.
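The checkpoint idea can be expressed as a small mapping. Each task of the upstream DAG is attached to the dummy checkpoint of its duration level. A downstream task then only needs a sensor on the checkpoints of the levels it depends on. Task and checkpoint names are hypothetical, and this sketch only computes the wiring. In Airflow, the checkpoints would be DummyOperator tasks and the sensors ExternalTaskSensor tasks.

```python
from typing import Dict, Iterable, List

# One dummy checkpoint task per duration level (hypothetical task ids).
CHECKPOINTS = {1: "checkpoint_quick", 2: "checkpoint_average", 3: "checkpoint_slow"}


def checkpoint_upstreams(task_levels: Dict[str, int]) -> Dict[str, List[str]]:
    """Map each checkpoint to the tasks that must finish before it turns green."""
    upstreams: Dict[str, List[str]] = {}
    for task_id, level in task_levels.items():
        upstreams.setdefault(CHECKPOINTS[level], []).append(task_id)
    return {checkpoint: sorted(tasks) for checkpoint, tasks in upstreams.items()}


def checkpoints_to_sense(needed_tasks: Iterable[str], task_levels: Dict[str, int]) -> List[str]:
    """Checkpoints a downstream task should sense instead of each upstream task."""
    return sorted({CHECKPOINTS[task_levels[task_id]] for task_id in needed_tasks})


# Hypothetical first DAG: two quick collects and one slow aggregation.
first_dag = {"collect_crm": 1, "collect_erp": 1, "aggregate_history": 3}
```

A downstream task that needs both collects senses only `checkpoint_quick`, so it needs one sensor instead of two and does not wait for `aggregate_history`.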
After one year of Airflow usage
Only time, through real scenarios, could prove the sustainability of the final solution, and more than one year has now passed since we integrated Airflow.
It was a success, because the current system met our initial requirements:
- It resolved the basic issues encountered with Rundeck.
- Workflows succeeded most of the time, and the most urgent workflows could finish before 8 am.
- Troubleshooting became practical, and catching up dependencies now takes only a few clicks
- Scheduling jobs in production (on the Kubernetes cluster) now requires only minimal Airflow skills and no Kubernetes skills
The key to a successful integration was making good decisions and anticipating scenarios while taking the infrastructure constraints into account. This required the whole team’s commitment, so that no scenario was overlooked.
After more than one year in production, the system has faced real situations that revealed both strengths and weaknesses.
New system strengths
Let’s look at a real scenario that illustrates the following key strengths: a trigger campaign, scheduled on Airflow, that sends a welcome email to new Maisons Du Monde clients.
Ease of scheduling: every team member knows how and where to schedule a task just by applying the rules. For the trigger campaign, the task belongs in an output DAG that depends on two DAGs:
- 2_Transactions: to check what the client bought
- 2_Users: to target only the contactable clients
The interesting thing is that selecting dependencies remains abstract and keeps workflows simple.
Troubleshooting convenience: identifying and managing workflow incidents is easy. Suppose our email campaign fails: we know it depends on 2_Transactions and 2_Users, so we can easily find and fix the root cause through the user interface, including in its dependencies. In practice, most incidents were related to network problems or to the data sources themselves, hence the value of having a separate DAG per collected source.
Separation of responsibilities: every team member can enrich the production pipelines by adding new tasks that run on Kubernetes, without needing Kubernetes skills. Responsibilities can thus be split between two types of users:
- Plugin creators: they handle the Airflow and Kubernetes preset parameters, such as environment variables, secrets and volumes, to create custom operators that only require functional parameters.
- DAG managers: for workflows monitoring and DAGs creation using the existing operators and plugins.
Time savings: the pipelines were optimized thanks to better-fitting workflows and sensors, which reduced the time spent waiting for dependencies. As a result, the trigger campaign is usually ready at 8 am instead of 5 pm: a 9-hour improvement.
The system weaknesses
It is true that Airflow has made our scheduling routines easier since we began using it. Nevertheless, some weaknesses are worth highlighting.
The scheduler: as you might know, the scheduler manages DAG runs and orchestrates task executions across the workers and pods. On every heartbeat, it loops over all DAG tasks and queries the Airflow database to orchestrate the executions. When the scheduler is down, Airflow is out of service while Composer tries to start a new scheduler to recover. We have two recommendations to minimize scheduler incidents:
- Reduce the scheduler’s heartbeat frequency
- Avoid executing code at the top level of DAG Python files, because that code runs on the scheduler pod, each time the file is parsed, instead of on the workers.
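The second recommendation is easy to underestimate: the scheduler re-parses DAG files periodically, and every top-level statement runs on each parse. The sketch below simulates this by executing a DAG file’s source several times with `exec`. The `expensive_lookup` function is a hypothetical stand-in for something like a database or API call. The counter shows that only the top-level version pays that cost at parse time.

```python
from typing import Dict, List

call_count = {"expensive_lookup": 0}


def expensive_lookup() -> List[str]:
    """Hypothetical slow call (database query, API request...)."""
    call_count["expensive_lookup"] += 1
    return ["users", "transactions"]


# Anti-pattern: the lookup runs whenever the file is parsed (on the scheduler).
TOP_LEVEL_SOURCE = "tables = expensive_lookup()\n"

# Better: the lookup is wrapped in a callable that only runs inside a task (on a worker).
DEFERRED_SOURCE = "def list_tables():\n    return expensive_lookup()\n"


def simulate_parses(source: str, parses: int) -> int:
    """Execute a DAG file's source `parses` times and count the expensive calls."""
    call_count["expensive_lookup"] = 0
    for _ in range(parses):
        namespace: Dict[str, object] = {"expensive_lookup": expensive_lookup}
        exec(source, namespace)
    return call_count["expensive_lookup"]
```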
Zombie tasks: some tasks remain in a running state in Airflow even though they have completed. This seems to happen for three reasons: when the Airflow database is overloaded, when an Airflow worker is down, or when we use a sub-DAG (which we strongly advise against). So far, the only way we have found to recover is to re-run the task manually.
Sub-DAGs: we used to encapsulate reusable workflows inside sub-DAGs. This simplified the pipelines, but it caused so many bugs that we abandoned the feature.
DAG folders: DAGs are shown in a paginated list, and they cannot be organized into folders for easier navigation in the UI.
Timezone handling: we ran into timezone issues, such as time inconsistencies between the UI and the execution context, or the switch to summer time. Although the timezone is configurable in “airflow.cfg”, this setting doesn’t work properly in our current version of Airflow (1.10.0). So we adjust the context’s timezone to Europe/Paris programmatically inside our plugins, in the parent class MdmKubeOperator. Fortunately, the community fixed this problem in Airflow 1.10.10.
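As an illustration of such an adjustment, the sketch below converts a timezone-aware UTC execution date to Europe/Paris with the standard `zoneinfo` module (Python 3.9+, which needs the system time zone database). It does not use Airflow’s own helpers. It assumes the execution date arrives as a timezone-aware datetime. The dates are arbitrary examples, chosen around the March 2020 switch to summer time.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

PARIS = ZoneInfo("Europe/Paris")


def to_paris(execution_date: datetime) -> datetime:
    """Convert an aware execution date to Paris local time (handles summer time)."""
    if execution_date.tzinfo is None:
        raise ValueError("execution_date must be timezone-aware")
    return execution_date.astimezone(PARIS)


def paris_reference_date(execution_date: datetime) -> str:
    """Calendar date in Paris, which can differ from the UTC date around midnight."""
    return to_paris(execution_date).date().isoformat()


winter_run = datetime(2020, 1, 15, 3, 0, tzinfo=timezone.utc)  # Paris is UTC+1
summer_run = datetime(2020, 7, 15, 3, 0, tzinfo=timezone.utc)  # Paris is UTC+2
late_run = datetime(2020, 3, 28, 23, 30, tzinfo=timezone.utc)  # still UTC+1 in Paris
```

The late run shows why this matters for a `reference_date`: the run falls on 2020-03-28 in UTC but on 2020-03-29 in Paris.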
Triggering a task with input parameters: we had this feature with Rundeck and missed it in Airflow. So we developed an internal Airflow page that launches a task with custom parameters entered in a form. For more details on this internal feature, and on how to build the same thing, see this Maisons Du Monde article: https://medium.com/maisonsdumonde/road-to-add-form-for-airflows-dag-1dcf2e7583ef
Beyond the features currently being developed in this project, we have a to-do list of new features to further improve the user experience. We also keep an eye on Airflow’s roadmap and on the changelogs of new versions, to prioritize better.
Sensor task’s redirection
We want to add a new button in the ExternalTaskSensor’s window, which will redirect to the corresponding DAG.
DAG dependencies overview
We want to automate the creation of a top-level DAG named “0_ALL_DAGS” that displays all the relationships between DAGs. Each task of “0_ALL_DAGS” would be one of the sensors already used for dependencies between DAGs. Dependencies between DAGs would then be represented concretely, which would help us avoid the pitfall of creating cycles between DAGs.
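Gathering every dependency sensor into one graph also makes cycles easy to detect automatically. This is a minimal sketch. It assumes the dependencies have already been collected into a mapping from each DAG to the DAGs its sensors wait on; how that mapping is extracted is not covered here. It uses `graphlib`, whose `CycleError` reports the offending cycle. The collect DAG names 1_CRM and 1_ERP are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set


def check_dag_graph(upstream_dags: Dict[str, Set[str]]) -> List[str]:
    """Return an order in which the DAGs can run, or raise ValueError on a cycle."""
    try:
        return list(TopologicalSorter(upstream_dags).static_order())
    except CycleError as err:
        # err.args[1] holds the nodes of the detected cycle
        raise ValueError(f"cycle between DAGs: {' -> '.join(err.args[1])}") from err
```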
In the end
Airflow is a really good tool that can fit almost any architecture. However, there is no common standard for integrating it. We also noticed that each company has its own integration logic, yet many of them run into the same pitfalls. I hope this feedback inspires you. Your solution will certainly be different, but remember: the best solution is the one that is “sustainable” and “user-friendly”.
Thank you for reading. Special thanks to the entire data team that made this success story happen, and to Benjamin Berriot and Guillaume Duflot for reviewing this article.
We look forward to your comments and hope this feedback was useful. Feel free to reach out to us for further discussion and experience sharing.