Running Spark Pipelines on EMR Using Spot Instances

A compilation of good practices and lessons learned in a production environment

Louis Fruleux
Teads Engineering
8 min read · Feb 22, 2022


This post was co-written with Nicolas Vizern 👋

At Teads, we extensively use Spark with AWS EMR for our data pipelines. With new features and new data pipelines developed on a regular basis, it is important to control and optimize the setup of our clusters in order to avoid dramatic cost increases as the business grows.

Spot instances are an effective way to keep these costs under control. They are spare EC2 capacity not currently in use by other customers, so AWS offers them at a significant discount.

We have been using spot instances for several years now, and in this article, we share some of our learnings:

  • We will first quickly go through the basics of how spot instances work and how to choose instance types.
  • Then, we will review specific features from YARN that can be used to make critical jobs more resilient.
  • Finally, we will see some Spark 3.1.X features that can help mitigate the impact of lost executors.

If you are already familiar with Spot mechanics and EMR pricing, you can jump directly to the YARN node labels section.

Spot instances and EMR basics

Spot Instances Pricing and Constraints

According to AWS’ documentation, spot instances have a discount of up to 90% compared to on-demand instances. In practice, we observe a 60% discount, which is still better than the 37% discount you can get for reserved instances (1-year contracts).

However, this discounted price comes with a few constraints:

  • First of all, spot instances can be reclaimed anytime by AWS with a 2-minute notice. AWS usually reclaims these instances when they need to provide on-demand resources to other customers.
  • We also lose some visibility regarding performance tuning of our jobs. Since the instance types differ between runs, it becomes harder to compare job durations directly.
  • There is also an extra development and maintenance cost that is hard to quantify when using spot instances, such as additional monitoring.

EMR Pricing

When using EMR, the pricing has two components:

  • the EMR agent
  • the underlying EC2 infrastructure (mentioned above)

The EMR agent fee does not grow linearly with instance size, so it is not linear with the number of executors per machine either. For example, if we want eight executors and choose eight r3.xlarge machines, we pay $0.09 × 8 = $0.72 per hour in EMR fees, against $0.27 per hour for a single r3.8xlarge machine, a roughly 60% saving.
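
To make the arithmetic explicit, here is the comparison as a short Python snippet (the fees are the illustrative figures quoted above, not current AWS prices):

```python
# Per-instance EMR fees in $/hour, as quoted in this article (illustrative).
emr_fee = {"r3.xlarge": 0.09, "r3.8xlarge": 0.27}

# Two ways to host eight executors of identical size:
eight_small = 8 * emr_fee["r3.xlarge"]  # 8 machines, 1 executor each -> $0.72/h
one_big = 1 * emr_fee["r3.8xlarge"]     # 1 machine, 8 executors      -> $0.27/h

saving = 1 - one_big / eight_small
print(f"EMR fee saving with a single r3.8xlarge: {saving:.0%}")  # ~62%
```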

This is why at Teads, we tend to use big instances for our workflows.
However, as you can guess, the major drawback of using larger instances is that reclaims have a higher impact: when an r3.8xlarge is reclaimed, you lose eight times as many executors as with an r3.xlarge reclaim.

💡 If you want to learn more about best practices we have put in place to reduce EMR costs, you can check out this article:

Choosing Instances

You should not put all your eggs in one basket, and this also holds for Spot instances. You don't want to bet all your workflows on a single instance type because it can run out of capacity or get reclaimed. To mitigate this risk, AWS allows diversifying the instance types, letting us fall back to other instance types in case of reclaims.

Luckily for us Spark users, diversification is not that hard. When we design a Spark job, because of Spark's execution paradigm, we only need to specify the number of executors and the resources per executor (memory and vCPUs). Then we have to choose the instances for our cluster. To maximize resource usage, we only look for instances with the same memory-per-vCPU ratio.
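
As an illustration, here is a minimal Python sketch of this ratio-based screening. The catalog and the target ratio are hand-written examples; actual specs should be checked against the AWS documentation:

```python
# Hand-written catalog for illustration: instance type -> (vCPUs, memory in GiB).
CATALOG = {
    "r3.xlarge":  (4, 30.5),
    "r3.8xlarge": (32, 244.0),
    "r5.xlarge":  (4, 32.0),
    "r5.4xlarge": (16, 128.0),
    "c5.4xlarge": (16, 32.0),  # compute-optimized: different ratio
}

# Target shape: e.g. executors with 4 vCPUs and ~32 GiB of memory.
TARGET_GIB_PER_VCPU = 8.0

candidates = [
    name
    for name, (vcpus, mem_gib) in CATALOG.items()
    if abs(mem_gib / vcpus - TARGET_GIB_PER_VCPU) < 1.0  # tolerate small gaps
]
print(candidates)  # ['r3.xlarge', 'r3.8xlarge', 'r5.xlarge', 'r5.4xlarge']
```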

A common pitfall is to diversify only within a single instance family. It is tempting because vCPU, memory, and cost generally grow linearly with machine size: r3.8xlarge is eight times bigger than r3.xlarge in terms of memory and vCPUs, hence can host eight times more executors.

However, the diversification strategy should also include several instance families. If a hardware issue occurs, all the instances from the same family might be impacted: a problem affecting r5 instances may hit r5.8xlarge, r5.4xlarge, and so on, all at once.

Most of the workflows at Teads tend to use older instance generations, mainly because of the trade-off between reclaim rate and cost: older generations tend to be a bit more expensive but have a lower reclaim rate.

To sum up, choosing instances requires taking into account different factors:

  • The instance hardware,
  • the reclaim rate,
  • and the cost.

If you have no idea where to start when choosing your instances, you can begin with a selection based on the hardware specifications that suit your usage. AWS provides a dedicated CLI tool for this, the Amazon EC2 Instance Selector.
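
If you prefer scripting to the CLI, the same hardware screening can be done against the EC2 API. Here is a minimal boto3 sketch (the region and instance types are arbitrary examples, and AWS credentials are assumed to be configured):

```python
import boto3

# Query the hardware specs of a few candidate instance types.
ec2 = boto3.client("ec2", region_name="eu-west-1")
resp = ec2.describe_instance_types(
    InstanceTypes=["r5.xlarge", "r5.4xlarge", "r4.4xlarge"]
)

for it in resp["InstanceTypes"]:
    vcpus = it["VCpuInfo"]["DefaultVCpus"]
    mem_gib = it["MemoryInfo"]["SizeInMiB"] / 1024
    print(f"{it['InstanceType']}: {vcpus} vCPUs, {mem_gib / vcpus:.1f} GiB/vCPU")
```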

Enable YARN’s Node Labels Feature

Spark applications run on clusters that contain a few typical components. To understand YARN's node labels feature, a few definitions are necessary. From the Spark documentation:

  • The Cluster Manager is the service in charge of acquiring resources on the cluster. For our use case, we use YARN.
  • The Driver node is where the main() function of the Spark job runs and where the SparkContext is created.
  • The Worker nodes are any nodes that can run application code; they are in charge of executing the Spark tasks.

Going back to the main drawback of spot instances: reclaims can happen at any time on those instances. Although a Spark cluster can be resilient to task failures, it cannot withstand the loss of its Driver node or of the node running the cluster manager.

When launching an EMR cluster, you are asked to provision three main categories of nodes:

  • Master Instance
  • Core Instances
  • Task Instances

Understanding where each component is running is key to understanding the failures due to node reclaims.

The EMR master instance group has only one node. This is where the cluster manager is installed; for our use case, it is where the YARN application manager and scheduler run. More details on its role can be found in the YARN documentation.

Core and Task instances are the nodes where the Spark application runs. By default in EMR 6.X, both the Spark Driver and the executors can run on either Core or Task nodes.

This leads to an issue: if the Spark Driver runs on a node that gets reclaimed, the whole job fails, adding another point of failure to the job execution. In contrast, if an executor runs on a reclaimed node, Spark is fault-tolerant enough to reprocess only the subset of work lost in the failure without stopping the job.

Taking this into consideration, EMR allows using the YARN node labels feature to prevent job failures caused by spot reclaims. In a nutshell, the aim is to force the Spark Driver to run on CORE nodes only and to provision those instances as On-Demand.

As such, we will have the following configuration:

  • Master Node: On-Demand or Reserved, with a small instance. In our case, we use m3.xlarge instances.
  • Core Nodes: On-Demand or Reserved, mostly to run the Spark Drivers. The remaining resources are used to run executor tasks.
  • Task Nodes: Spot instances that run Spark executor tasks only.

This setup has the advantage of limiting On-Demand or Reserved instances to the critical components only, i.e., the YARN master and the Spark Driver, while leaving the fault-tolerant components on Spot instances.
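
As a sketch, the corresponding EMR configuration looks like the following (the property names follow the AWS documentation on YARN node labels; double-check them against your EMR release):

```python
# EMR configuration pinning YARN application masters (and thus Spark drivers
# in cluster mode) to CORE nodes. To be passed at cluster creation time, e.g.
# with boto3: emr.run_job_flow(..., Configurations=NODE_LABEL_CONFIGURATIONS).
NODE_LABEL_CONFIGURATIONS = [
    {
        "Classification": "yarn-site",
        "Properties": {
            "yarn.node-labels.enabled": "true",
            "yarn.node-labels.am.default-node-label-expression": "CORE",
        },
    }
]
```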

We have now been testing this setup for several months, and it has been satisfactory for us. But it comes with a non-negligible cost and should only be used when fully informed of the benefits and costs.

Use Spark 3.1.X

Starting from Spark 3.1.X (available from EMR 6.3), there is an effort from the Spark community to make Spark resilient to the loss of executors. A design doc (here and here) was initiated by Holden Karau. Executor decommissioning is becoming an increasingly important feature in modern clouds.

Several use cases can be pointed out:

  • AWS spot reclaims
  • GCP VM loss
  • YARN overcommitment

In all these cases, executors might be killed, and it is a waste of time and resources (money) to recompute tasks due to executor loss.

We won’t go into details here on how this feature works, but if you want to there is this great article that illustrates the full workflow. So Spark 3.1.X introduced several features to mitigate the loss of executors which can be tuned once the option spark.decommission.enabledhas been set.

Block migrations

This feature can be activated via options such as spark.storage.decommission.shuffleBlocks.enabled (the kinds of blocks that are handled can be found here). The idea behind this feature is to migrate computed blocks of data to surviving executors to avoid recomputing tasks.

In case the other executors do not have enough memory to receive the migrated blocks, a fallback storage (spark.storage.decommission.fallbackStorage.path) can be used.
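
Put together, the block migration options look like this (a sketch; the S3 path is a hypothetical bucket, and the fallback location must be reachable by all executors):

```python
# Options to add on top of spark.decommission.enabled (Spark >= 3.1).
block_migration_conf = {
    # Migrate shuffle blocks from decommissioning executors to surviving ones.
    "spark.storage.decommission.shuffleBlocks.enabled": "true",
    # Do the same for cached RDD blocks.
    "spark.storage.decommission.rddBlocks.enabled": "true",
    # Last-resort destination when no executor has room for the blocks.
    "spark.storage.decommission.fallbackStorage.path": "s3://my-bucket/spark-fallback/",
}
```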

Speculative task scheduling

In the case of spot reclaims, the time to live of the instance is known (2 minutes at the time of writing). Hence Spark can use this information to speculatively reschedule tasks.

This can be configured via spark.executor.decommission.killInterval.
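
For example, a value slightly under the 2-minute notice leaves Spark time to reschedule (a one-line sketch):

```python
# Tell Spark that a decommissioned executor will be killed ~110s after the
# notice, so tasks still running on it can be speculatively rescheduled.
speculative_conf = {"spark.executor.decommission.killInterval": "110s"}
```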

In Spark 3.2.X, node decommissioning will be improved; we suggest following this ticket if you work with spot clusters.

At Teads, we have already seen this feature in action in production, and it reduced the delays induced by spot reclaims.

Conclusion

Our EMR workflows have been running on Spot instances for several years now, and the efforts we put into monitoring and development were definitely worth it.

Since we moved to Spot instances, we saved around 60% of the costs related to our EMR workloads.

You might consider moving to Spot and saving on your EMR cluster’s billing if:

  • you have a bit of time to add monitoring (example of monitoring pipeline),
  • your workloads are not delay-sensitive.

If you plan to move to Spot clusters, you need to consider several important things:

  1. First of all, diversify your instances. As previously said, you don't want your whole cluster to go down just because there is a capacity issue on one instance type.
  2. You might want to consider using YARN's node labeling feature to reduce delays and failures. Be aware that you should weigh the extra cost of On-Demand instances against the impact of delays on your business.
  3. Lastly, you might want to leverage the options introduced in Spark 3.1 to mitigate the cost of lost executors.

What’s next?

We still have several ideas to keep reducing the bill for our EMR workloads while running on Spot instances.

We could benchmark instance types and compare the speed gains against the price increase. We could also try to cap the spot price at a fixed amount to avoid billing increases when the spot market evolves.

Finally, we cannot wait to see where the EMR Serverless feature will take us. With luck, it could also use Spot instances underneath and offer fast cluster bootstraps, which would remove our need for permanent clusters.
