Orchestration of AWS EMR Clusters Using Airflow —The Insider Way

Omer Bayram
Insider Engineering
4 min read · Feb 21, 2022

Insider’s in-house Machine Learning Platform, Delphi, is a very busy piece of software. It has about 250 interdependent Spark jobs that process trillions of clickstream events and train about 2000 ML models every week.

All of these jobs must be completed within a week, and they all run on different schedules, while being dependent on each other. The dependency map of our ML jobs looks like the subway map of Tokyo.

Subway Map of Tokyo

With a dependency map like this, we needed a workflow orchestration tool, and we picked Airflow. It’s the de facto standard workflow orchestration tool for data engineering pipelines. After a few easy tutorials, we thought, “This is easy. We can move all of our architecture within a quarter.” We didn’t know we were about to walk into a dungeon with a raging dragon waiting for us.

This will be a series of blog posts about how we came to terms with that raging dragon. It wasn’t that bad after all. You just need to understand what it wants.

Overview of our DAG Architecture

The building blocks of our workflow orchestration are DAGs. We grouped the EMR jobs that need to run sequentially (like Labeling -> Dataset Preparation -> Training -> Evaluation) into separate DAGs. Each EMR job is represented by a TaskGroup in Airflow. Below is a screenshot of a simple DAG from our production Airflow.

An actual DAG view from our production Airflow

There are 4 lanes of TaskGroups here, because we have divided our customers into 4 groups for this particular DAG.

Each box here (like labeling_1) represents a TaskGroup, which consists of two tasks:

  1. start_cluster launches an EMR cluster using a PythonOperator. It is a Python function that builds the cluster configuration together with the cluster steps, which are the Spark jobs to be executed. The cluster is then created with boto3’s run_job_flow method.
  2. wait_cluster_termination monitors the cluster using our EmrClusterStateSensor, a customized version of Airflow’s EmrJobFlowSensor. It checks the status of the cluster with boto3’s describe_cluster method. We configure the clusters to auto-terminate when all the steps are done, but we still need to know whether the cluster finished its steps successfully or failed, so that Airflow can either continue with the next EMR jobs or retry the failed one.
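The two tasks above can be sketched roughly as follows. This is a minimal illustration and not our production code: the function names, release label, roles, and instance fleet settings are placeholder assumptions, and the EMR client is passed in as an argument (in a real task it would come from boto3.client("emr")). Treating only TERMINATED with the ALL_STEPS_COMPLETED reason code as success is also an assumption made for this sketch.

```python
from typing import Any, Dict, List


def build_job_flow_request(name: str, spark_steps: List[List[str]]) -> Dict[str, Any]:
    """Build keyword arguments for boto3's EMR run_job_flow call."""
    steps = [
        {
            "Name": f"{name}-step-{i}",
            # If a step fails, the whole cluster is terminated with errors.
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
        }
        for i, args in enumerate(spark_steps, start=1)
    ]
    return {
        "Name": name,
        "ReleaseLabel": "emr-6.5.0",  # placeholder value
        "Applications": [{"Name": "Spark"}],
        "Instances": {
            # Auto-terminate the cluster once all steps are done.
            "KeepJobFlowAliveWhenNoSteps": False,
            # Placeholder fleets; a real request also needs subnets etc.
            "InstanceFleets": [
                {"InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1,
                 "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]},
                {"InstanceFleetType": "CORE", "TargetSpotCapacity": 2,
                 "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]},
            ],
        },
        "Steps": steps,
        "JobFlowRole": "EMR_EC2_DefaultRole",  # placeholder value
        "ServiceRole": "EMR_DefaultRole",  # placeholder value
    }


def start_cluster(emr_client: Any, name: str, spark_steps: List[List[str]]) -> str:
    """Create the cluster and return its id (the JobFlowId)."""
    response = emr_client.run_job_flow(**build_job_flow_request(name, spark_steps))
    return response["JobFlowId"]


def cluster_outcome(emr_client: Any, cluster_id: str) -> str:
    """Classify a cluster as 'running', 'succeeded' or 'failed'."""
    status = emr_client.describe_cluster(ClusterId=cluster_id)["Cluster"]["Status"]
    state = status["State"]
    if state not in ("TERMINATED", "TERMINATED_WITH_ERRORS"):
        return "running"
    reason = status.get("StateChangeReason", {}).get("Code")
    if state == "TERMINATED" and reason == "ALL_STEPS_COMPLETED":
        return "succeeded"
    return "failed"
```

A sensor built on this idea would keep poking while the outcome is "running", finish on "succeeded", and raise on "failed" so that Airflow's retry handling takes over.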

If the cluster terminates with a failure, for example because its spot instances were interrupted, we retry the cluster after a delay. We use the retry_callback argument of the EmrJobFlowSensor together with Airflow’s clear_task_instances method to clear the start_cluster task, so that start_cluster is retried as well. Retries will be covered in more detail, with code snippets, in another post.

Waiting Clusters with Instance Failure

We have also encountered cases where all the spot instances of the Master or Core Instance Fleet are gone, and the cluster sits idle in WAITING status with an Instance Failure message. We wanted to treat this case as “failed” too, so that those clusters are retried. Airflow’s original EmrJobFlowSensor considers these clusters to be still running.

Screenshot from our production AWS Console for EMR showing Waiting clusters with Instance failure

We have added new functionality to our EmrClusterStateSensor to check the instance counts of the Master and Core Instance Fleets. We have defined two new custom states, ZERO_MASTER and ZERO_CORE. The sensor calls the list_instance_fleets method and assigns one of these states when the respective Instance Fleet does not have any running instances. When one of these two states is assigned, the sensor terminates the cluster and raises an Exception to Airflow, so that the retry callback is executed.
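A check along these lines could look like the sketch below. It is an illustration under assumptions, not our sensor's actual code: the function and exception names are hypothetical, "no running instance" is approximated as zero provisioned on-demand plus spot capacity in the list_instance_fleets response, and the check is only applied to clusters in WAITING state so that clusters that are still starting up are not flagged.

```python
from typing import Any, Optional

# Custom states keyed by the EMR instance fleet type they describe.
CUSTOM_STATES = {"MASTER": "ZERO_MASTER", "CORE": "ZERO_CORE"}


class EmptyFleetError(Exception):
    """Raised when a waiting cluster has lost its master or core fleet."""


def detect_empty_fleet(emr_client: Any, cluster_id: str) -> Optional[str]:
    """Return ZERO_MASTER or ZERO_CORE if that fleet has no capacity left."""
    response = emr_client.list_instance_fleets(ClusterId=cluster_id)
    provisioned = {}
    for fleet in response["InstanceFleets"]:
        provisioned[fleet["InstanceFleetType"]] = (
            fleet.get("ProvisionedOnDemandCapacity", 0)
            + fleet.get("ProvisionedSpotCapacity", 0)
        )
    # Check the master fleet first, then the core fleet.
    for fleet_type in ("MASTER", "CORE"):
        if provisioned.get(fleet_type) == 0:
            return CUSTOM_STATES[fleet_type]
    return None


def fail_if_fleet_empty(emr_client: Any, cluster_id: str, cluster_state: str) -> None:
    """Terminate the cluster and raise if a required fleet is empty."""
    if cluster_state != "WAITING":
        return
    custom_state = detect_empty_fleet(emr_client, cluster_id)
    if custom_state is not None:
        emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
        raise EmptyFleetError(f"Cluster {cluster_id} is in state {custom_state}")
```

Raising from the sensor's poke is what hands control back to Airflow, which then marks the task for retry and runs the retry callback.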

Workaround for EMR Endpoint Service Quotas

We have more than 20 clusters running at any given moment, and each of them has a sensor in Airflow that monitors it. When all of these sensors make a request to EMR at the same time, we occasionally hit the service quota of the EMR endpoints (e.g. describe_cluster). The full list of service quotas is available in the AWS EMR documentation. To work around this limit, we have configured a Lambda function that is triggered by the EMR Cluster State Change event from CloudWatch Events. The state change event carries the new state of the cluster, so there is no need to call describe_cluster, and the Lambda does not count towards the EMR endpoint quota.
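As a rough sketch, the Lambda only needs to pull a few fields out of the event. The field names under "detail" (clusterId, state, stateChangeReason) follow the EMR Cluster State Change event format as we understand it, where stateChangeReason arrives as a JSON-encoded string. The record layout and function names below are assumptions for illustration, and persisting the record is left out here.

```python
import json
from typing import Any, Dict


def record_from_event(event: Dict[str, Any]) -> Dict[str, str]:
    """Turn an EMR Cluster State Change event into a flat record."""
    detail = event["detail"]
    raw_reason = detail.get("stateChangeReason") or "{}"
    try:
        # The reason is delivered as a JSON string such as '{"code":"..."}'.
        reason = json.loads(raw_reason) if isinstance(raw_reason, str) else raw_reason
    except ValueError:
        reason = {}
    return {
        "cluster_id": detail["clusterId"],
        "state": detail["state"],
        "reason_code": reason.get("code", ""),
        "updated_at": event.get("time", ""),
    }


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, str]:
    """Lambda entry point; no EMR API call is needed to learn the new state."""
    record = record_from_event(event)
    # Persisting the record (for example to a database) would happen here.
    return record
```

An event rule matching source "aws.emr" and detail-type "EMR Cluster State Change" is one way to route these events to such a function.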

The Lambda function writes the states of all clusters to a dedicated table in DynamoDB, and we have configured our EmrClusterStateSensor to check this table as well. In fact, the table proved so reliable that we configured the sensor to read the cluster state from DynamoDB with 90% probability and from the EMR endpoint with 10% probability.
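The write and the probabilistic read could be sketched as follows. The table object is assumed to behave like a boto3 DynamoDB Table resource (put_item and get_item), the key name cluster_id and the function names are hypothetical, and falling back to EMR when the table has no entry for a cluster is an assumption added for this sketch.

```python
import random
from typing import Any, Callable, Dict


def save_state(table: Any, record: Dict[str, str]) -> None:
    """Lambda side: store the latest known state of a cluster."""
    table.put_item(Item=record)


def read_state(
    cluster_id: str,
    table: Any,
    emr_client: Any,
    dynamo_probability: float = 0.9,
    rng: Callable[[], float] = random.random,
) -> str:
    """Sensor side: read the state from DynamoDB most of the time."""
    if rng() < dynamo_probability:
        item = table.get_item(Key={"cluster_id": cluster_id}).get("Item")
        if item:
            return item["state"]
    # Roughly 10% of the calls, or when the table has no entry yet.
    response = emr_client.describe_cluster(ClusterId=cluster_id)
    return response["Cluster"]["Status"]["State"]
```

Keeping a small share of direct EMR calls means a stale or missing table entry is eventually noticed, while most polls never touch the rate-limited endpoint.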

This was a broad overview of how we use Airflow to orchestrate our Spark jobs running on AWS EMR. Follow the Insider Engineering Blog to be notified about the more detailed posts, which will explain, with code snippets, how we customized EmrJobFlowSensor and how we use retry_callback to retry failed clusters.
