Reducing AWS EMR data processing costs
Cloud Infrastructure and Managed Services are a game-changer. These give us flexibility and autonomy to focus on the business. By leveraging them we can reduce time to market, go fast in implementing new features and even change orientation or drop components without having to carry a heavy legacy.
However, as we grow, our cloud costs can explode without being noticed. It’s important for infrastructure and development teams to regularly monitor the cost of their services and analyze cost patterns. Many good practices do not require big efforts but can lead to substantial savings.
In this article, we describe 3 measures that helped us to significantly lower our data processing costs on EMR, the managed Hadoop service from AWS:
- Run our workloads on Spot instances.
- Leverage the EMR pricing of bigger EC2 instances.
- Automatically detect idle clusters (and terminate them ASAP).
We mainly use EMR to launch Apache Spark jobs and it brings a lot of useful features:
- The ability to start/stop Hadoop clusters programmatically (using the AWS SDK API).
- Managed cluster maintenance (automatic replacement of bad nodes, etc.).
- Its flexibility, for upgrading Hadoop versions, switching instance types, resizing clusters, etc.
- Monitoring capabilities using CloudWatch (Cluster, nodes, IO).
It is worth noting that EMR provides a packaged Spark. However, we prefer to have control over the Spark and EMR version independently and use the official open-source Spark package over the provided one. We have been using Spark 2.4.4 with Scala 2.12 for a while now, whereas this version has just been introduced to EMR in the 6.0.0 version (March 2020).
At Teads we use EMR with Spark for 3 use cases:
- Permanent clusters: Always-on resources to launch hourly jobs.
- Transient clusters: Created on the fly to launch daily jobs and machine learning trainings, they are terminated as soon as the job is finished.
- Ad hoc clusters: Created by developers or data scientists to explore/analyze data, perform some tests or machine learning experiments. We have 3 jobs to manage those clusters (`cluster-start`, `cluster-resize`, `cluster-stop`)
Some teams at Teads also use EMR to run Flink streaming jobs.
1 — Run our workloads on Spot instances
The cost of our EMR clusters is mainly divided in two with:
- EC2 costs: the actual machines the cluster runs on,
- EMR costs: the Hadoop service installed on the machines including all tools that make managing Hadoop clusters easier.
Our first instinct to reduce costs is to use Spot instances for EC2 which are in general 7 times cheaper than the on-demand equivalent. In return, there is no guarantee of their availability. If AWS runs out of capacity for a specific instance type in a specific Availability Zone, these machines can be reclaimed and allocated to another customer that is paying for on-demand instances.
YARN (Hadoop’s resource manager and job scheduler) and Spark architectures are fault-tolerant by design, losing some nodes should not affect running jobs since failed tasks are automatically rescheduled. And as we store our jobs’ input and output data in S3, our clusters are stateless. Even if we lose an entire cluster we are able to rerun all failed jobs later.
The only scenario that could cause a problem is running out of a specific instance type over a long period of time. The length of this period depends on the context. There are a lot of great resources around the use of Spot instances. We found these detailed feedback from several companies particularly interesting.
Spot strategies for more resilience
In order to make our clusters more resilient to instance shortages, we can:
- Mix on-demand instances with Spots, and especially have the master node running on on-demand.
- Mix different instance types in the same cluster, as a simultaneous shortage is less likely.
- If a Spot cluster fails to launch, have a fallback mechanism able to switch instance types, move them to on-demand, switch the AWS Region or the Availability Zone (an AWS region is composed of many AZs).
- EMR Instance Fleet configuration, the best solution for a more robust spot instance usage within EMR. It’s a native feature supporting most of the previous points (mixing spot and on-demand, multiple AZ, configuring up to 5 instance types with EMR selecting the best one according to Spot availability and price…). Note: EMR will automatically replenish a terminated instance from a different instance type configured in the Fleet if the first one runs out of capacity.
Monitoring Spot capacity
AWS has a tool called the Spot Instance Advisor to compare the reclaim frequency for different instance types. This could be an interesting proxy to decide which instance type to use but unfortunately:
- The provided ranges are too large (< 5%, 5–10%,10–15%,15–20% and >20%).
- Stats are only available by region and not at the AZ level.
- The time range is fixed (last 30 days).
Spot interruption does not always mean we have to take some action. EMR will replace the lost node by a new one as soon as there is enough capacity again. However, it’s recommended to monitor if your cluster went through this. The first option is to configure an event-based notification on CloudWatch to receive a Slack message or an email each time an interruption happens.
Alternatively, we can check the lost node graph, available under the Monitoring tab of the EMR interface:
In the case of lost nodes, we can check the Hardware tab of the same UI and see the Last state change reason for the terminated instances to see if it’s linked to Spot capacity. When that’s the case we should see something like: “Waiting to provision Core Instance Group as EC2 has insufficient capacity for instance_type in the az_name availability zone”.
We created some additional graphs to detect cluster creation issues, including Spot unavailability. It’s important to project on the region availability-zone since this could be very different from one availability-zone to another:
We particularly look at:
- Cluster launch durations: ~8 minutes on average. When we are close to reaching the capacity limit, the launch duration is less stable. We can compare this metric between jobs using different instance types when we are migrating from one instance type to another.
- Success rate: which is the rate of Clusters successfully starting, this metric is analyzed on a specified moving-window (2 hours by default).
- Failed launches: when hovering over this graph, the name of the concerned job will be displayed. We then explore the logs to see the actual reason for this failure.
- Subnet capacity: Even if it’s not really related to Spot capacity, it’s interesting to monitor subnet capacity to troubleshoot cluster creation related issues and anticipate them. We can have some difficulties creating new clusters even if there are enough Spots when our configured subnets are undersized.
Important note: Older instance generations are normally less requested than newer ones, so they usually are more available in Spot.
2 — Leverage the EMR pricing of bigger instances
A couple of important criteria are to be considered when choosing the right instance type for our jobs:
- The optimized resources of the machines (compute, memory or storage).
- The frequency of interruption if we use spot instances, as discussed previously.
- Most importantly: the EMR / EC2 ratio price as it doesn’t increase linearly with EC2 computing capacity, more on that below.
Playing with instance sizes
For each machine generation, AWS provides multiple instance types. These instances are available in several sizes that have a fixed factor on all resources between them: CPU, memory, and storage (it’s less true for network).
Let’s take the r5d generation as an example. An r5d.24xlarge instance has 24 times more CPU, memory, and disk capacity compared to a r5d.xlarge.
If we want to keep the same cluster capacity with bigger instances, we will need 24 times fewer machines using r5d.24xlarge instances. Now if we compare EC2 and EMR prices for these different sizes a r5d.24xlarge EC2 price is 24 times higher, no surprise. But the EMR price is only 3.75 higher. Moving to bigger instances can help save EMR costs.
There is a tradeoff to make with bigger instances:
- We are paying for the EC2 during the startup time of the cluster (~8 minutes), even if no processing is launched yet. So this isn’t very profitable for small/fast jobs.
- We have less granularity in the cluster capacity as the unit is one node. However, knowing that the billing is a per-second rate, oversizing a cluster shouldn’t be a big issue as it will be running for a shorter period.
At Teads, we chose to move to an instance that is 8 times bigger and observed a 30% cost reduction for our EMR clusters.
Analyzing the impact of bigger instances on Spark jobs
From a Spark point of view, we kept the same executors’ properties (memory, cores, count, etc.) when we moved to bigger instances. Only the number of executors per node (x8) and the number of nodes in a cluster (divided by 8) have changed. When we create a new processing job we only reason in terms of executors, regardless of the underlying instance type. Switching to a bigger or a smaller instance is transparent and this makes it easy to mix multiple instance sizes in the same cluster.
Something we did not expect when we moved to bigger instances was a performance improvement for our Spark jobs that run on our permanent and shared clusters. This can be explained by the fact that, when a cluster is not fully used, executors leverage idle resources available on the same machine. Hence with bigger instances, there are more chances to have extra-resources for a given executor.
Let’s take an example with 2 different cluster configurations:
- Cluster A: 48 instances of r5d.xlarge
- Cluster B: 2 instances of r5d.24xlarge
Let’s say at a given instant only one application requesting 2 executors is running. Assuming that the size of one executor fills one r5d.xlarge, a r5d.24xlarge instance could host up to 24 executors.
On cluster A, only 2 instances would be used, and the rest of the cluster will be idle:
On cluster B, we would have one executor in each big instance:
By default, YARN only considers memory to schedule and control containers. Once a container is launched on a node, YARN will only check its memory usage and make sure it won’t exceed the allocated one. However, CPU resources are directly negotiated by the executor with the host’s kernel. So the executors have access to all remaining cores on the machine to perform multi-threaded tasks (e.g. reading from S3). The same applies to network and disk capacity so we can say that using bigger instances optimizes our clusters usage when they are not fully loaded.
To verify this assumption, we defined a simple job that reads a dataset from S3, adds a timestamp column and writes it back to S3. We run this job on two clusters using the same instance type T. An instance T can host up to 7 executors with our default executors’ configuration. Our two clusters are:
- Cluster A: 1 instance of type T, the 7 executors will run on the same node.
- Cluster B: 7 instances of type T, so hopefully YARN will put one executor per node (which I was able to confirm after running the job via the Spark or YARN UI).
The results of this small experiment showed an important gain in processing time on cluster B which was 32% faster!
We were initially expecting a performance increase due to the reduction of inter-node shuffle operations, but we didn’t observe any significant gain related to this. In fact, we saw the same overall execution time reduction in jobs that do a lot of shuffles and those that do not.
3 — Automatically detect idle clusters
At Teads, we have a self-service job called `cluster-start` that developers and data scientists can use to create ad hoc EMR clusters. We sometimes happen to forget about our clusters, and in that case, they might remain inactive for many days. Less often, and due to some technical bugs, we can have the same situation for production jobs.
When one application freezes between the submission of two jobs or when a problem occurs during the termination phase of a cluster used for transient jobs, or for whatever reason, we end up with inactive clusters, which is a waste of resources and money, hence we need to detect them.
To solve this, our `cluster-start` job creates a Cloudwatch alarm which is then cleaned by another job when the cluster is stopped.
Note: for transient jobs, the cluster could be configured to be automatically terminated once the job has finished. However, these alarms are still useful when an application hangs or for long-running (ad hoc) clusters.
Finding the right alarm metric
We want to use the inactivity of our resources as a metric to trigger the alarm. We defined the minimum period of time to consider a cluster as idle depending on the use case:
- Ad hoc clusters: 24 hours
- Production jobs 1 hour
A first candidate is the
IsIdle metric available in Cloudwatch. It tracks the number of allocated containers on the cluster:
But before going further it’s useful to review some of YARN’s concepts:
YARN Containers: A YARN container is a result of a successful resource allocation, meaning that the ResourceManager has granted an application a lease to use a specific set of resources in certain amounts on a specific node. A container could run either an ApplicationMaster or an Executor.
ApplicationMaster (AM): The ApplicationMaster negotiates resources from the YARN ResourceManager to allocate them to the application executors. There is one AM per application.
IsIdle metric is limited because our ad hoc (analysis) clusters are used through notebooks doing a remote Spark-Shell. This setup keeps a long-running application alive. It means that an AM will be allocated on YARN even if no Spark job is running. This won’t work either for production jobs that freeze due to some technical bugs because their AMs would be also there. So, unfortunately, tracking if there are any allocated containers on YARN, like
IsIdle does, is not enough.
Instead, we track the number of running executors (not any containers). We can calculate it using existing metrics in Cloudwatch. As there is always one AM per Spark application:
number of running executors = number of allocated containers — the number of running applications (which is the number of AMs)
We consider a cluster as active if this metric > 0.
Although we think that the number of running executors better reflects the inactivity of a cluster, this is still not perfect. There are two cases where we can’t detect inactive resources:
- If dynamic resource allocation is not enabled in the Spark application, the executors are not released at the end of the job execution unless all the application is terminated.
- If some data is cached in the executors, they won’t be released when the job is finished even if dynamic resource allocation is enabled (note: this could be handled by setting:
The ideal metric would be the number of running jobs. But this is a little bit more complicated to implement since it requires to:
- Request the YARN API to fetch running applications,
- Then request the API of each Spark Driver to ask them about the last job that ran.
We might try it one day but for now, our alarms rely on running executors with good enough results.
Obsolete alarm management
We configure the alarm to consider missing points as OK — the alarm still exists but no data is received — since this would mean that the cluster has crashed or was terminated manually. And we periodically clean up obsolete alarms, defined as the ones attached to clusters that don’t exist anymore.
When an alarm is triggered, we choose to not terminate the cluster automatically. We simply send a notification with the cluster name and id in a Slack channel so its owner can see it and decide to stop it or resize it.
We have a job that runs at around 8 pm every day to terminate all analysis clusters unless they have a `keepCluster` tag, which can be added when we create a cluster. The described alarms remain useful for these tagged clusters and those of prod jobs.
To sum up, the main ideas to keep in mind are:
- Spot instances are on average 7 times cheaper than their on-demand counterparts. Their availability is not guaranteed but it’s still relevant to use them within an EMR cluster where the software is distributed and fault-tolerant by design, especially if your clusters are stateless (not used for the permanent storage of data).
- Consider bigger instances and try to evaluate new instance types and sizes regularly. The cost of the EMR service normalized by the machine capacity usually decreases with bigger instances. Besides cost, job performance and spot availability are also to be considered.
- Automatic detection of idle clusters is a must-have to protect you from technical bugs on cluster termination or oversights. The challenge is to choose the best metric to detect them. For our case, the one exposed by Cloudwatch was not enough, we rely instead on the number of running executors.
We were quite happy with our results after implementing these tips. However, in a big data context, usages, patterns, and volume evolve rapidly and require continuous efforts to:
- monitor/browse costs
- invest in understanding the graphs
- identify expensive services/usages and try to reduce their costs.
And that’s just the tip of the iceberg! For our processing jobs, the costs of data transfer from S3 to the clusters represent twice as much as the actual processing costs (EMR + EC2). This will be the subject of a future article, so stay tuned!
Thanks to Alban Perillat-Merceroz, Damien Pacaud, Yann Moisan, Robert Dupuy, Cyrille Dubarry and especially Benjamin Davy who have helped me writing this post.