Complex tasks orchestration at Hurb with Apache Airflow

Vinícius Mello
Published in hurb.engineering
9 min read · Aug 30, 2021

How Apache Airflow is helping Hurb’s Data Engineering team to create and orchestrate workflows in an elegant way

This article will describe how we are leveraging Apache Airflow as our Task Orchestration Platform. If you haven’t seen our first post describing our Data Platform, check it out here.

Hurb is a fast-growing Brazilian tech company with a mission to optimize travel through technology. We’re focused on delivering the best experience for our customers and allowing them to explore the world at the best price, of course! The data team has a role in this mission: we’re constantly analyzing and joining our data, creating dashboards, and developing machine learning models to optimize processes or make recommendations, among other things. One of our previous posts describes an application that clusters hotels to better manage investments on metasearch engines.

We rely on several workflows that extract data from dozens of data sources and then process, combine, enrich, and deliver this data to our Analytics Platform.

“Without data, you are blind and deaf and in the middle of a freeway” — Geoffrey Moore.

Managing all of these workflows has its complexity. For example, when people who process data start to automate their processes, they inevitably write batch jobs. These jobs need to run as planned and usually depend on other existing data assets and other jobs. The figure below depicts one such workflow.

An example workflow that processes data

As the volume and complexity of data processing pipelines grow, you can simplify the process by dividing it into smaller jobs and coordinating their execution as part of a workflow. A workflow can be represented as a simple Pipeline, where each step has a single successor, or as a more complex directed acyclic graph (DAG), where steps can branch and join.

Comparison between Pipeline and DAG.
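The difference can be sketched in a few lines of plain Python. This is only an illustration, not how Airflow itself is implemented: it uses the standard library's `graphlib` module, and every task name is hypothetical. Each entry maps a task to the set of tasks it depends on.

```python
from graphlib import TopologicalSorter

# A Pipeline: every task has exactly one predecessor (hypothetical names).
pipeline = {
    "transform": {"extract"},
    "load": {"transform"},
}

# A DAG: tasks can fan out and join again (hypothetical names).
dag = {
    "clean_orders": {"extract_orders"},
    "clean_hotels": {"extract_hotels"},
    "join": {"clean_orders", "clean_hotels"},
    "report": {"join"},
}


def execution_order(graph):
    """Return one valid order in which the tasks can run."""
    return list(TopologicalSorter(graph).static_order())


def parallel_batches(graph):
    """Group tasks into batches whose members could run at the same time."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are done
        batches.append(ready)
        sorter.done(*ready)
    return batches
```

For the pipeline, the only possible order is extract, transform, load. For the DAG, the two extract tasks can run side by side, which is exactly what a scheduler with several workers can exploit.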

There are many ways in which Pipelines and DAGs could be implemented. For example, one could put all the steps into a single Python script and schedule it with a cron job, or with a tool like Rundeck. The problem with this approach is that when some step needs to be rerun, there is no easy way to do it without running the entire flow. Besides, some questions arise: Is this solution scalable? What about the management of dependencies?

Put several tasks together, even for a short time, and soon you will have an increasingly complex computational job graph. Things can become problematic as your team grows, too. How do you manage all the cron jobs easily?

Due to this, data engineers must take some concerns into account:

  • Scalability in managing: How do you manually manage scripts and cron expressions for hundreds of workflows?
  • Scalability in execution: You may want to run your jobs on multiple worker nodes for performance reasons. How do you manage them?
  • Environment Dependencies: Different jobs may have different dependencies, e.g., Spark, network proxy, resources, etc.
  • Connections to different systems: How do we manage credentials for RDBMS, GCP, Hive, HDFS, etc.? They all come with configurations like host address, port, id, password, schema, etc. So how do we manage them in a centralized fashion?
  • Observability: How do we monitor the status of each step? Which batch job failed? At which stage? For what reason?
  • Re-running: How can we re-run a specific step? Do it manually, or make ad-hoc changes to the script? Neither is ideal.
  • Code Duplication: As the number of workflows grows, so does the similarity between some of them. Shouldn’t we minimize code duplication by following the Don’t Repeat Yourself (DRY) principle?

Data Engineers can manage workflows with several tools. We chose Apache Airflow because of its rapidly growing community and because it best meets our requirements. As introduced in our first post describing our Data Platform Architecture, we believe in the open-source idea because we absorb entropy from many different companies to make the system more robust. Beyond that, Apache Airflow is well aligned with our beliefs, and it allows us to move fast while creating, monitoring, and transforming data pipelines. It started as an internal project at Airbnb and was later open-sourced, largely because the community had a clear need for it. The workflows are defined as DAGs and configured as Python scripts, which allows them to be generated dynamically: One code, several workflows! (Don’t repeat yourself!)

Apache Airflow is an open-source platform developed for workflow orchestration.

The steps within a DAG are called Tasks and can be defined with Operators. These are reusable components that execute a predefined script. The community has developed thousands of operators that make a Data Engineer’s life easier, and it’s also possible to define your own. The Airflow docs list the provider packages that data engineers can use to reduce duplicated code.

Airflow provider packages group Operators, Sensors, and Hooks that reduce code duplication (https://airflow.apache.org/docs/).

In this way, we can make a comparison between DAGs and LEGOs, where we only need to choose which type of block we are going to use in order to build something. Below there is an example of different Operators being used together.

A DAG can be constructed using different Operators developed by the community or custom operators developed in-house.

Airflow has the concept of a Task Lifecycle, which helps us understand which part of the DAG failed. A single Task can be executed again just by changing its status.

The Airflow UI has a Tree View in which all previous DAG Runs can be seen along with the status of their tasks.

One thing to note is that Airflow is not dedicated only to Data Workflows. Different teams could use it to orchestrate almost anything that a script could do. Our use cases consist of ETL Jobs and Reverse ETL (when we extract data from our Data Warehouse to external tools such as Google Ads, Salesforce, and others), Machine Learning model training, scheduled e-mail reporting, data quality checks, and many more — One tool, different use cases.

Executors

Airflow has several executors, the components responsible for executing scheduled tasks. We chose the KubernetesExecutor, which creates a Kubernetes Pod for each Task, leveraging our Kubernetes cluster resources. Moreover, this executor allows us to run tasks with different configuration specs based on their attributes. For workflows that need more resources, we customize this in executor_config.

The Airflow Scheduler makes requests to the Kubernetes API in order to launch new pods that will run tasks.

In our cluster, we have a different Node Pool for each use case. We leverage the Pod Mutation feature to set which configuration the pods will request based on their attributes. This mutation also helps keep resource-hungry workflows from being scheduled on the same nodes, especially Machine Learning workflows that consume a lot of CPU or need a GPU instance.
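A pod mutation hook of this kind might look roughly like the sketch below. In Airflow 2.x, a function named `pod_mutation_hook` defined in `airflow_local_settings.py` receives each worker pod before it is sent to Kubernetes. Everything else here is an assumption made for illustration: the `workload` label, the pool names, and the use of the GKE node pool label as the selector key are not taken from our actual setup.

```python
# Hypothetical mapping from a pod label value to a node pool name.
NODE_POOL_BY_WORKLOAD = {
    "ml-training": "high-cpu-pool",
    "etl": "default-pool",
}

# Node label used as the selector key; assumes a GKE cluster.
NODE_POOL_LABEL = "cloud.google.com/gke-nodepool"


def pod_mutation_hook(pod):
    """Route a worker pod to a node pool based on its 'workload' label."""
    labels = pod.metadata.labels or {}
    pool = NODE_POOL_BY_WORKLOAD.get(labels.get("workload"))
    if pool is None:
        return  # pods without a known workload label are left untouched

    # Copy the existing selector so other constraints are preserved.
    selector = dict(pod.spec.node_selector or {})
    selector[NODE_POOL_LABEL] = pool
    pod.spec.node_selector = selector
```

Keeping the routing rules in one mapping means a new node pool only requires a new entry, not a change to every DAG.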

DAGs Generators

Airflow allows DAGs to be created dynamically within a Python script. We leverage this by defining a DAG generator that receives a configuration file and creates multiple DAGs. To define a DAG Generator, we create a Python script with a function that receives a configuration and returns a DAG object that implements the workflow.

Then, we read all configuration files from local storage and call this function for each of them.
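The loading step could be sketched as follows. The JSON format, the directory layout, and the helper names `load_configs` and `register_dags` are assumptions; `build_dag` stands for any callable that turns one configuration into a DAG object. Airflow picks up DAG objects it finds in a module's top-level namespace, which is why each result is stored under its `dag_id`.

```python
import json
from pathlib import Path


def load_configs(config_dir):
    """Parse every *.json file in config_dir, in file-name order."""
    paths = sorted(Path(config_dir).glob("*.json"))
    return [json.loads(path.read_text()) for path in paths]


def register_dags(configs, build_dag, namespace):
    """Build one DAG per configuration and expose it under its dag_id."""
    for config in configs:
        namespace[config["dag_id"]] = build_dag(config)
    return namespace


# In a DAG file this would typically be called with the module's globals, e.g.:
# register_dags(load_configs("/path/to/configs"), build_dag, globals())
```

Passing the namespace in explicitly keeps the helper easy to test with a plain dictionary.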

These configurations could be fetched from an external Database, files on Google Cloud Storage, or any other place.

We have been using this pattern a lot since we identified many similar workflows. For example, one of our Reverse ETL workflows uploads files generated by queries run on BigQuery. Instead of writing specific code for each report that needs to be sent, we created a DAG Generator that creates a DAG for each upload. Besides, this lets us write unit tests for these DAGs, which reduces the number of bugs introduced.

One of our DAG Generators, “salesforce_sftp”, which can be used to upload files to Salesforce Marketing Cloud without rewriting or duplicating code.

Another DAG Generator we developed triggers Dataflow Batch Pipelines based on Google Pub/Sub messages. We created it because some data pipelines could be switched from streaming to batch mode, as the volume of messages received upstream was not constant.

DAGs created dynamically to execute Apache Beam Pipelines on Google Dataflow.

Since we started to use this concept of DAG Generator, we have increased our development throughput. It has also been useful for our Business Analysts and Machine Learning Engineers, who can easily leverage some of the common workflows to deploy their DAGs very fast without silos. This pattern also helps reduce the amount of duplicated code and prevents common workflows from being implemented in different ways.

One use case that our Business Analysts constantly rely on is Table Jobs orchestration. Table Jobs are workflows in which a table is created, or data is appended to an existing one, as the result of a query execution that must only run once some condition is met. A typical example is when Google Ads updates the Sessions Table on BigQuery: after that, we need to process the data using a table job. No code is necessary to declare a new one; one only needs to fill in a configuration file specifying execution conditions, such as the schedule interval or whether a sensor is used. There are also Table Jobs in which the query is created dynamically based on the execution date so that we can perform backfilling operations.
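To make the idea of a date-driven query concrete, here is a small standalone sketch. The configuration keys, table names, and helper functions are all hypothetical. Inside Airflow, the built-in Jinja templating (for example `{{ ds }}`) would usually fill in the execution date; `string.Template` is used here only so the example runs on its own.

```python
from datetime import date, timedelta
from string import Template

# Hypothetical Table Job configuration; $ds is replaced by the execution date.
TABLE_JOB = {
    "destination_table": "analytics.daily_sessions",
    "schedule_interval": "@daily",
    "query": "SELECT * FROM raw.sessions WHERE session_date = '$ds'",
}


def render_query(job, execution_date):
    """Fill the job's query template with one execution date (YYYY-MM-DD)."""
    return Template(job["query"]).substitute(ds=execution_date.isoformat())


def backfill_queries(job, start, end):
    """Render one query per day from start to end, both inclusive."""
    days = (end - start).days + 1
    return [render_query(job, start + timedelta(days=i)) for i in range(days)]
```

Since each query depends only on its execution date, re-running a past date produces the same statement, which is what makes backfilling safe to automate.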

Observability

As explained above, one concern of data pipeline management is understanding the health of a pipeline. Data Engineers must understand how their DAGs are performing, whether there are any failed tasks, and how long each task takes. One of the visualizations provided in the UI is the Gantt chart, which shows which tasks take the most time to complete.

Airflow Gantt Chart that shows how long each task took to complete.

Another feature recently introduced into Airflow is the Dependency Graph that allows us to know the dependencies between our DAGs.

The dependency Tree allows us to understand the dependencies among different DAGs.

Despite the tools provided in the UI, we found that more monitoring is needed to keep a close eye on our DAGs. We rely on Prometheus and StatsD to collect metrics for each DAG execution, and then we analyze these metrics in our Grafana instance.

On our dashboards, we combine information from different sources such as Prometheus, Airflow’s Database, and Google Stackdriver, so that we can better understand when a DAG is broken.

Grafana is our main Monitoring Tool.

Conclusions and takeaways

In this article, we looked at how Hurb.com has been using Apache Airflow to orchestrate complex tasks within its Data Platform. It shows how we leverage Airflow’s ability to define DAGs dynamically by creating DAG generators, which has shortened the time to deploy workflows similar to ones already implemented and reduced our duplicated code. It also shows how we have deployed Airflow on our Kubernetes Cluster, a stack battle-tested by many companies. We started with a very simple Airflow instance, without many of the things described here, and over the past two years we have been improving it so we can go even further.

Join the crew if you like what we are doing at Hurb! We are continually seeking sharp analytical people for our Data & Analytics Tribe. We have offices in Rio de Janeiro, Porto, and soon in Montreal.

Vinícius Mello
hurb.engineering

Head of Engineering & AI @ hurb.com | Passionate about technology, leadership, and martial arts