Make Spark resilient against spot interruptions on Kubernetes

Niels Claeys
datamindedbe
Published in
7 min readJul 25, 2022

We gained a lot of experience running Spark on Kubernetes while developing and running Conveyor at our customers over the past 2 years. Conveyor assists in the delivery of data products from ideation to production. It allows customers focus on solving their business problems instead of worrying about the infrastructure, support, cross cutting concerns (security, scaling,…) etc.

In this blogpost, I want to focus on 3 ways how we improved the reliability of Spark on Kubernetes. For each approach I will elaborate on how to use it with Conveyor as well as how you can implement something similar yourself. Improving the reliability of Spark was also discussed in detail in one of our previous webinars called “Spark on Kubernetes in the real world”, which you can watch here.

Why would you use spot instances

A spot instance is an instance that uses spare capacity of a cloud provider and is typically made available at a cheaper price compared to on-demand instances.

The price reduction can vary depending on the cloud provider, but typically it lies between 60%-90% compared to on-demand nodes of the same size. Your cloud provider can reclaim the node at any point in time (called a spot interruption), which is why you should be resilient to failures when using spot instances. A spot interruption happens when your cloud provider has no more spare capacity to run the on-demand workloads of other customers.

If you can make your Spark application resilient against these interruptions then you can run the same workload at only 10%–40% of the cost of running on on-demand instances.

How to make Spark on Kubernetes resilient

Over the past 2 years, Spark has significantly improved how they run jobs on Kubernetes. I believe that Kubernetes is the way forward for Spark instead of the legacy Yarn resource manager. Conveyor runs Spark jobs on Kubernetes using spot nodes by default, such that our customers can benefit from the lower price. To make this work well, we implemented the following improvements:

Do not kill the spark driver

When you submit your spark application on Kubernetes, it will launch a driver process, which is responsible for running your application. The driver will split your application into multiple jobs and schedule them on 1 or more executors in parallel. The driver is also responsible for all the book keeping: which tasks have completed, where are the intermediary results stored,…

High level components when running Spark on Kubernetes

When the driver process is killed, the whole spark application fails. For critical application this is unacceptable, which is why we advice our customers to run the driver on an on-demand node and the executors on spot nodes.

You can enable this behavior in Conveyor by setting the life-cycle property of your spark job to driver-on-demand-executors-spot.

To implement something similar without Conveyor, you could use the following approach:

  • First create separate autoscaling groups/node pools for on-demand vs spot nodes. Make sure to identify them based on a specific taint, like node.kubernetes.io/lifecycle with their corresponding value.
  • Secondly you could create and register a custom admission webhook in Kubernetes, which will be called whenever a pod is created in your cluster. In the admission webhook you can add the necessary tolerations for the taints that you have created depending on the settings for the driver and executors.
  • Alternatively, you could use the pod template file feature from Spark, which also allows you to specify tolerations for both driver and executor pods. However, it is not really flexible when you want to use different configurations since you always need to define the full pod specification.

Having only the driver running on an on-demand node will still significantly reduce the total cost of your workload because:

  • The driver is typically a smaller instance compared to the executors. This is because the driver does not process any data and acts more like a coordinator.
  • There is only 1 driver for 1–100 executors. Depending on the amount of executors this can make a huge difference.

Reduce the impact of spot interruptions

The cloud providers typically warn you about 2 minutes before they will reclaim your spot node. Node decommissioning was introduced in Spark 3.1.1, which uses the 2 minute window to copy shuffle files, rdd blocks and intermediary results to another executor. This way Spark can reuse the existing information instead of recalculating it from scratch. When running the TPC-DS benchmark, we encounter up to 10% performance improvements when enabling decommissioning in case of spot terminations.

In Conveyor this is automatically enabled for you when you use a recent Spark image that supports node decommissioning.

If you want to do this without Conveyor, you could do the following:

  • Listen for the spot interruption events and cordon and drain the respective node. For AWS this functionality is implemented by the aws-node-termination-handler. This ensures that all applications running on that node get 2 minutes to gracefully shutdown.
  • Configure your Spark application to enable decommissioning in case an executor gets killed:

Some gotcha’s when using the decommissioning feature:

  • There is no need to enable it when you request only request 1 executor. In this case the feature cannot do anything as there are no executors to copy data towards.
  • Be careful about enabling it when you have a small number of executors (2–3) and a lot of shuffle files. The remaining nodes will receive more data and can run out of memory when running additional tasks. This happens rarely but is possible when your node was already running close to the memory limit. I encountered this issue when testing the feature out on query 64 of the TPC-DS benchmark with 1Tb of data and 2 executors with (4 cores, 16Gb RAM).

Reduce the likeliness of spot interruptions

We wanted to do more since some of our customers encountered more and more spot interruptions which could not only be explained by running more jobs.

Not all instances are equal when it comes to spot interruptions

Some instances are more scarce than others, typically the larger and more specialized ones, and thus are more likely to get spot interruptions compared to a (smaller) general purpose VM (i.e. 4 cores and 16 GB RAM). One way to mitigate this is to add multiple instance types to your autoscaling group and let AWS pick the instance type for which it has the most capacity (i.e: allow m6i.xlarge, m5.xlarge,… as they provide similar resources).

You can do this by configuring your autoscaling groups to use spot instances with the “capacity optimized” instead of the default “lowest price” allocation strategy. Apart from changing the allocation strategy, you should add as many override instance types as you want to your autoscaling group.

Determine the best Availability Zone to use for your workload

A last improvement that we made in Conveyor, was to incorporate the AWS spot placement score API. This API shows you how likely your workload will get a spot interruption based on the instance type, region, Availability Zone (AZ), and the requested number of instances. The response returns a score between 1 and 10, which we use to decide in which AZ to run your workload.

Conveyor comes out of the box with support for selecting the best AZ for your workload.

If you want to do this without Conveyor on AWS, you can do the following:

  • Create auto scaling groups for every AZ in your region, instance type in your region and add the necessary taints.
  • Create a component that queries the Spot placement API on a regular basis and caches the results such that you know the best AZ for each instance type.
  • Customize your spark operator to add the necessary tolerations to your pods such that they run in your preferred availability zone.

Gotcha’s using the spot placement API:

  • You can only request a limited amount of configurations within 24 hours. You cannot request too many different request configurations because otherwise you will be blocked by AWS.
  • You get throttled if you do too many requests to the API, so make sure that you cache the results for a while instead of calling the API with every new job.

Both improvements had a significant impact since the average spot interruptions per day were reduced by 60% as well as the standard deviation was reduced by 66% compared to the situation before making both changes.

Conclusion

In this blogpost I elaborated on 3 different approaches to make your Spark application more resilient, namely: run the driver node on-demand, use the new decommissioning feature from Spark 3.1.1 and choose the availability zone which is less likely to have spot interruptions. We implemented all approaches in Conveyor, which significantly improved the robustness of Spark applications. We are constantly looking for new ways to do even better, stay tuned for future updates on this topic.

Want to know more about Conveyor

Do you want to benefit from running Spark workloads in a robust manner while getting the cost benefits of using spot instances, take a look at the Conveyor website or our 📕 documentation.

If you want to try it out, use the following link. From there you can get started for free in your own AWS account with a few easy steps.

--

--

Niels Claeys
datamindedbe

Data (platform) engineer @Data Minded with an fondness for distributed systems. Loves: AWS, K8s, Spark, Duckdb