Why is Airflow an excellent fit for Rapido?

ChethanUK · Rapido Labs · Sep 3, 2020 · 8 min read

Back in 2019, we started building our data platform on Hadoop 2.8 and Apache Hive, managing our own HDFS. Managing workflows, whether data pipelines (ETLs), machine learning prediction pipelines, or other generic pipelines, was a core requirement, and each task was different.

Some tasks simply need to run on GCP at a scheduled time, such as creating a Cloud Run instance or a Dataproc Spark cluster, while others can only be scheduled based on data availability or Hive partition availability.

To sum up, the data platform requirements that called for a scheduling system at the time were:

  • ETL pipelines
  • Machine learning workflows
  • Maintenance: Database backups
  • API calls: for example, managing Kafka Connect connectors
  • Running naive cron-based jobs that spin up infrastructure in a public cloud, for example creating a new Dataproc cluster or scaling up an existing one, stopping instances, or scaling Cloud Run
  • Deployments: Git -> code deployments

ETL pipelines consist of a complex network of dependencies that are not just data dependencies, and these dependencies vary across use cases: metrics, creating canonical forms of datasets, model training, and so on.

To create immutable datasets (no update, upsert, or delete), we started with a stack of Apache Hive, Apache Hadoop (HDFS), Apache Druid, Apache Kafka, and Apache Spark, with the following requirements and goals:

  • Creating reproducible pipelines: pipeline output needs to be deterministic, as in functional programming, so that if a task is retried or retriggered the outcome is the same. In other words, pipelines and tasks need to be idempotent (see the sketch after this list).
  • Backfilling is a must, since data can evolve.
  • Robustness: easy changes to the configuration
  • Versioning of configuration, data, and tasks, i.e. the ability to add or remove tasks over time or update existing DAG code
  • Transparency in data flow: we discussed that something similar to Jaeger tracing for the data platform would be tremendous, and we looked at options such as Apache Atlas and Apache Falcon
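To make the idempotency goal concrete, the pattern we aimed for was overwriting a whole partition on every run rather than appending, so a retry produces exactly the same result. A minimal PySpark sketch of that pattern (the table and column names are hypothetical examples, not our actual schema):

```python
# Minimal sketch of an idempotent write: each run overwrites the target
# partition instead of appending, so retries and re-triggers are safe.
# Table and column names are hypothetical.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("idempotent-partition-load")
    # Overwrite only the partitions present in the output, not the whole table
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .enableHiveSupport()
    .getOrCreate()
)

rides = spark.table("raw.rides").where("dt = '2020-09-01'")

daily_metrics = (
    rides.groupBy("dt", "city").count()
    # insertInto is positional, so the partition column (dt) goes last
    .select("city", "count", "dt")
)

# Overwriting the partition makes the task idempotent: running it twice
# for the same date produces the same table state.
daily_metrics.write.insertInto("analytics.daily_ride_counts", overwrite=True)
```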

Journey with Cloud Scheduler (managed cron)

We started with Cloud Scheduler running shell and Python scripts, since it is fully managed by Google Cloud and setting up a cron job takes just a few clicks. Cloud Scheduler is a fully managed cron job scheduler.

The main reason to go with Cloud Scheduler was that, unlike a self-managed cron instance, there is no single point of failure, and it is designed to provide "at least once" delivery for jobs, from plain cron tasks to automating resource creation; we used it to run jobs that created virtual machines, Cloud Run services, and so on.
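As an illustration, a job like the ones we scheduled can also be created programmatically. The following is a rough sketch assuming the google-cloud-scheduler Python client (v2-style API); the project, region, endpoint, and job name are hypothetical placeholders:

```python
# Rough sketch: creating a Cloud Scheduler job that POSTs to an HTTP endpoint
# on a cron schedule. Project, region, endpoint, and job name are hypothetical.
from google.cloud import scheduler_v1

client = scheduler_v1.CloudSchedulerClient()
parent = "projects/my-project/locations/asia-south1"

job = scheduler_v1.Job(
    name=f"{parent}/jobs/scale-up-dataproc",
    schedule="0 6 * * *",                      # every day at 06:00
    time_zone="Asia/Kolkata",
    http_target=scheduler_v1.HttpTarget(
        uri="https://example.com/scale-up",    # endpoint that scales the cluster
        http_method=scheduler_v1.HttpMethod.POST,
    ),
)

created = client.create_job(parent=parent, job=job)
print(created.name)
```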

Cloud Scheduler web UI
Creating a Cloud Scheduler job via the UI

Cloud Scheduler, like cron, doesn't offer dependency management, so we had to hope that dependent pipelines finished in the correct order, and we had to start from scratch for each pipeline or task, which doesn't scale. Although Cloud Scheduler supports time zones, we faced a few timezone problems in Druid ingestions and the dependent tasks downstream, and because jobs were submitted manually via the UI, human error could creep into pipelines. Cloud Scheduler can retry on failure, which reduces manual toil and intervention, but there is no task dependency management, and complex workflows and backfilling are not available out of the box. So within a few weeks we decided it was not suitable for running data and ML pipelines, which involve a lot of backfilling, cross-DAG dependencies, and sometimes data sharing between tasks.

We then started looking into popular open-source workflow management platforms that can handle hundreds of tasks with failure handling and callback strategies, coded a few tasks, and deployed them in GCP to complete a POC. The projects considered were Apache Airflow, Apache Oozie, Luigi, Argo, and Azkaban.

Both Apache Oozie and Azkaban were mature projects with stable releases at the time. Oozie is a reliable workflow scheduler system for managing Apache Hadoop jobs. It is integrated with the rest of the Hadoop stack and supports several types of Hadoop jobs out of the box (such as Java MapReduce, streaming MapReduce, Pig, Hive, Sqoop, and DistCp) as well as system-specific tasks (such as Java programs and shell scripts).

Apache Oozie Job Browser

Still, with Oozie we had to deal with XML definitions or zip up a directory containing the task's dependencies, and the development workflow wasn't as convincing as Airflow's. Instead of managing multiple directories of XML configs and worrying about dependencies between them, Airflow lets us write Python code that can be tested, and since it is code, all the usual software engineering best practices apply (see the sketch below).
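For example, a dependency chain that would take several XML action blocks in Oozie is a few lines of Python in Airflow. A minimal sketch (the bash commands are placeholder examples, and imports follow the Airflow 1.10.x layout we were using at the time):

```python
# Minimal sketch of dependencies-as-code in Airflow. The bash commands are
# placeholders, not our actual tasks.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id="example_dependency_chain",
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    transform = BashOperator(task_id="transform", bash_command="echo transform")
    load = BashOperator(task_id="load", bash_command="echo load")

    # Dependencies are plain Python, so they can be reviewed and unit-tested
    extract >> transform >> load
```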

Azkaban Flow

Azkaban has a distributed multiple-executor mode, beautiful UI visualizations, the option to retry failed jobs, good alerting options, and auditing of user actions, but since its workflows are defined using property files, we ultimately didn't consider this option.

Luigi was promising, since deployment on Kubernetes was simple and it handles dependency resolution, workflow management, visualization, failure handling, command-line integration, and much more.

Luigi Task Status

But re-running old tasks or pipelines was not clear, i.e. there was no option to retrigger tasks. Since Luigi relies on cron for scheduling, we would have to wait for the scheduler to pick the job up again after updating the code, whereas Airflow has its own scheduler, so we can retrigger a DAG from the Airflow CLI or UI.

With Luigi being cron-driven, scaling seemed difficult, whereas Airflow's Kubernetes executor was very promising. Creating a task was also not as simple as in Airflow, and maintaining DAGs was harder since there was no support for DAG versioning.

Comparison Between Luigi, Airflow & Oozie on the basis of Features

Finally, it was down to Airflow and Argo:

  • Both are designed for batch workflows expressed as a directed acyclic graph (DAG) of tasks.
  • Both provide flow control for error handling and conditional logic based on the output of upstream steps.
  • Both have great communities and are actively maintained by contributors.

Why choose Airflow over Argo?

The main points at decision time were that Airflow is tightly coupled to the Python ecosystem and it is all about the DAG code, while Argo provides the flexibility to schedule steps as containers, which is very useful since anything that can run in a container can run with Argo. The problem, however, is longer development time, since we would have to build each task's Docker container and push it to Google Container Registry, our private Docker repository, via CI/CD.

Argo Workflow UI

Argo natively schedules steps to run in a Kubernetes cluster, potentially across several hosts or nodes. Airflow also has the KubernetesPodOperator and the Kubernetes executor, which sounded exciting since they create a new pod for every task instance, so there is no need to worry about scaling Celery worker pods.
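As an illustration of that model, a task can be declared so that Airflow launches it in its own pod. A rough sketch, using the KubernetesPodOperator as packaged in Airflow 1.10.x (the image, namespace, and command are hypothetical placeholders):

```python
# Rough sketch: an Airflow task that runs as its own Kubernetes pod.
# The image, namespace, and command are hypothetical placeholders.
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

with DAG(
    dag_id="example_k8s_pod_task",
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    crunch_numbers = KubernetesPodOperator(
        task_id="crunch_numbers",
        name="crunch-numbers",
        namespace="data-platform",
        image="gcr.io/my-project/crunch:latest",
        cmds=["python", "crunch.py"],
        is_delete_operator_pod=True,  # clean up the pod after the task finishes
    )
```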

The Airflow and Argo CLIs are equally good. Airflow DAGs are expressed in a Python-based DSL, while Argo DAGs are expressed in Kubernetes YAML, with Docker containers packaging all the task code.

Airflow DAG: an ETL job that runs a Spark job and updates a Hive table
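A rough sketch of what such a DAG looks like in code (the application path, connection IDs, table name, and HQL are hypothetical, and operator imports follow the 1.10.x layout):

```python
# Rough sketch of an ETL DAG: submit a Spark job, then refresh a Hive table.
# Application path, connection IDs, table name, and HQL are hypothetical.
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.hive_operator import HiveOperator

with DAG(
    dag_id="daily_ride_metrics",
    start_date=datetime(2020, 1, 1),
    schedule_interval="0 2 * * *",
    catchup=False,
) as dag:
    run_spark_job = SparkSubmitOperator(
        task_id="aggregate_rides",
        application="/opt/jobs/aggregate_rides.py",
        conn_id="spark_default",
    )

    update_hive_table = HiveOperator(
        task_id="refresh_partitions",
        hql="MSCK REPAIR TABLE analytics.daily_ride_counts",
        hive_cli_conn_id="hive_cli_default",
    )

    run_spark_job >> update_hive_table
```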

Airflow has colossal adoption, and hence a massive list of Operators and Hooks with support for other runtimes such as Bash, Spark, Hive, Druid, Pinot, etc. Airflow was the clear winner.

To sum up, Airflow provides:

  1. Reliability: Airflow provides retries, i.e. it can handle task failures by retrying them; if upstream dependencies succeed, downstream tasks can retry when things fail.
  2. Alerting: Airflow can report when a DAG fails or misses an SLA and notify us of any failure.
  3. Priority-based queue management, which ensures the most critical tasks are completed first
  4. Resource pools, which can be used to limit execution parallelism on arbitrary sets of tasks (items 1-4 are sketched in code after this list)
  5. Centralized configuration
  6. Centralized metrics of tasks
  7. Centralized performance views: with views like the Gantt chart we can look at the actual performance of a DAG and see, for example, that a specific DAG spent five or ten minutes waiting for data to land before triggering the Spark job that aggregates it. These views help us analyze performance over time.
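A rough sketch of how items 1-4 show up in DAG code (the pool name, alert email, and intervals are hypothetical; the pool itself has to be created beforehand via the Airflow UI or CLI):

```python
# Rough sketch of retries, SLA alerting, priority weights, and pools on a task.
# The pool name, alert email, and intervals are hypothetical placeholders.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "retries": 3,                              # reliability: retry failed tasks
    "retry_delay": timedelta(minutes=5),
    "email": ["data-alerts@example.com"],      # alerting on failure
    "email_on_failure": True,
    "sla": timedelta(hours=1),                 # alert if the task misses its SLA
}

with DAG(
    dag_id="example_guardrails",
    start_date=datetime(2020, 1, 1),
    schedule_interval="@hourly",
    default_args=default_args,
    catchup=False,
) as dag:
    critical_task = BashOperator(
        task_id="critical_task",
        bash_command="echo critical",
        priority_weight=10,   # scheduled ahead of lower-priority tasks
        pool="spark_jobs",    # limits parallelism across tasks sharing this pool
    )
```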

Future of Airflow:

Since we already have hundreds of DAGs running in production, and with Fluidity (our in-house Airflow DAG generator) we expect that number to double or triple in the next few months, one of the most-watched features of Airflow 2.0 is the separation of DAG parsing from DAG scheduling, which can reduce the time wasted waiting (time where no tasks are running) and shorten overall task time through faster follow-on scheduling of tasks on the workers.

Improving DAG versioning to avoid manually creating versioned DAGs (i.e. to add a new task we currently go from DAG_SCALE_UP to DAG_SCALE_UP_V1).

High availability of the scheduler, ideally with an active-active model, is a must for performance, scalability, and resiliency.

Development on Airflow is generally slow and involves a steep learning curve, so we built Fluidity (full blog coming soon), and we spin up a dev replica that exactly mirrors the production environment using Docker and Minikube.

Work on data evaluation, reports, and data lineage with Airflow is also planned.

If you enjoyed this blog post, check out what we've posted so far over here, and keep an eye on this space for some really cool upcoming blogs. If you have any questions about the problems we face on the Data Platform at Rapido, or about anything else, please reach out to chethan@rapido.bike; looking forward to answering any questions!

Special thanks to the Rapido Data Team for making this blog possible.


ChethanUK (Rapido Labs) · Big Data Engineer @rapidobikeapp · LinkedIn: @chethanuk