Spark on Kubernetes in 2022
How we run hundreds of jobs, and how we migrated from EMR
In 2021, we migrated all Spark jobs at Joom from Spark 2 on Yarn/EMR to Spark 3 on Kubernetes/EKS. We met our goals, but it was a lonely journey — while “compute 𝛑 with Spark on Kubernetes” was an easy hike, the next day we found ourselves in the fresh snow with nobody around and no direction signs. In this post, I’ll outline the major things we’ve learned.
Goals and results
AWS EMR was a wise choice at the start. For a team with zero Hadoop experience, a managed solution was rather helpful — we had HDFS, Yarn, Hive, Spark, HBase, and Zeppelin all configured for us. However, as we gained experience and standardized on Spark as the primary technology, the benefits became less important, while some downsides emerged. Our migration had three goals.
Reducing costs
The first goal was cost reduction. With AWS EMR you pay for the EC2 instances and for the EMR itself. You can use spot instances to reduce EC2 costs, but then the EMR surcharge can add 50% to the total bill. Further, an EMR cluster should always have persistent master and core nodes that increase cost without helping much with performance. With open-source Spark, we’d only pay for the necessary EC2 instances.
In practice, the largest improvement happened for interactive workloads. The chart below shows daily costs (units removed), which went down by half.
The overall compute costs look less dramatic, but also show material reductions, both relative and absolute.
Better integration
The second goal was Spark integration with data science tools. We used Zeppelin and wished to expand our Jupyter usage. However, if you use EMR, where do you run these tools?
- If you run them outside EMR, you can’t directly create Spark sessions, which reduces usability. (The technical reasons are explained later.)
- If you run them inside EMR, the setup becomes very convoluted, and autoscaling is almost impossible.
As of today, we have both Zeppelin and Jupyter running in Kubernetes, autoscaling as necessary, and directly creating Spark sessions. For example, in Jupyter one can run a Spark query, turn its result into a Pandas dataframe, and proceed to train ML models, all in one cell, and ML engineers love that.
Up-to-date Spark
The third goal was access to up-to-date Spark. For example, we wanted to use Delta Lake, and it only became fully usable in Spark 3. We were also eager to try a feature called “nested schema pruning” that brings a dramatic speedup when reading a few columns from Parquet files with a nested schema. Waiting for an EMR release with Spark 3 was far from ideal.
Minor releases and fixes are also important. The same nested schema pruning feature is complex and was gradually improving over several releases. Waiting first for the official Spark release, and then for the matching EMR release, could add up to months.
In our current solution, we build Spark images from source. We use a release branch, so important fixes tend to be available immediately.
Overall Architecture
Let’s start with a rough picture of key things we need for Spark on Kubernetes to work. The most important parts are on the top, and we’ll discuss them in detail below.
Spark Operator
If you read my previous post about Kubernetes, you know that Kubernetes is all about active controllers that take YAML descriptions of things and bring them into existence. In the case of Spark, we use Spark Operator, which takes descriptions of Spark applications and creates driver pods.
By itself, this is a minor convenience over running spark-submit by hand. More importantly, the Spark Operator can intercept the creation of Spark executor pods and modify them. For example, you can apply scheduling properties such as affinity and tolerations. These properties cannot be specified through the Spark configuration directly, and the pod template is rather inconvenient, so Spark Operator proved helpful.
Our setup of Spark Operator is not notable in any way.
Airflow
The Spark Operator has an Airflow integration, but it’s absolutely minimal — it allows you to specify a YAML file and replace template variables in it. In fact, it does not have anything specific to Spark. Therefore, we wrote our own high-level Airflow integration that:
- Exposes most of the Spark options as named parameters
- Contains shared per-team options that individual Airflow nodes can apply all at once. For example, we use them to set affinities, tolerations, and permissions.
- Collects and reports the final status of all Spark executors, for example, to clearly report out-of-memory failures.
Here’s a usage example:
from joom.utils import sparkK8sJob
...
load_dim_region = sparkK8sJob(task_id='platform.dim-region-job',
                              file='@jar.location@',
                              job_type="platform_default",
                              class_name="[...].DimRegionJob",
                              args=['production', '{{ ts }}'],
                              dag=dag)
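To illustrate what such a wrapper might do under the hood, here is a minimal sketch that expands named parameters and a shared per-team profile into a SparkApplication manifest for the Spark Operator. It is not our actual implementation: the function build_spark_application, the JOB_PROFILES table, and every value in it (service account, toleration, image, Spark version, resource sizes) are hypothetical and exist only for illustration.

```python
from copy import deepcopy

# Hypothetical per-team profiles; names and values are illustrative only.
JOB_PROFILES = {
    "platform_default": {
        "service_account": "spark-platform",
        "tolerations": [
            {"key": "team", "operator": "Equal",
             "value": "platform", "effect": "NoSchedule"},
        ],
        "executor": {"cores": 4, "memory": "16g", "instances": 10},
    },
}


def build_spark_application(task_id, file, job_type, class_name, args,
                            image="example/spark:3", spark_version="3.2.0"):
    """Expand named parameters plus a shared profile into a manifest dict."""
    profile = deepcopy(JOB_PROFILES[job_type])
    tolerations = profile["tolerations"]
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        # Replace dots and lowercase so the name is usable in pod names.
        "metadata": {"name": task_id.replace(".", "-").lower()},
        "spec": {
            "type": "Scala",
            "mode": "cluster",
            "image": image,
            "sparkVersion": spark_version,
            "mainClass": class_name,
            "mainApplicationFile": file,
            "arguments": list(args),
            "driver": {
                "cores": 1,
                "memory": "4g",
                "serviceAccount": profile["service_account"],
                "tolerations": tolerations,
            },
            # Shared scheduling options are applied to every executor.
            "executor": dict(profile["executor"], tolerations=tolerations),
        },
    }
```

The point of the design is that a job author only names a profile such as "platform_default", and all scheduling and permission details come from one shared place.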
The icing on the cake is direct links from Airflow UI to relevant logs and monitoring dashboards. The last three buttons on the screenshot did improve usability a lot.
Logging
When running Spark on EMR, monitoring and logging are simple:
- While the app is running, YARN provides the link to Spark UI. After the application exits, the link changes to the Spark history server.
- While the app is running, executor logs are available from the physical nodes, through YARN UI. After the application exits, the logs are collected by YARN to HDFS and are also easily available.
Without YARN, most of this must be redone.
- Deploying Spark History Server in Kubernetes is easy. Just make sure it is not resource-starved and you are good.
- In Kubernetes, log storage is up to you. For service logging, we use Loki with Grafana, but for reviewing Spark logs, we found that raw log files provide more information density than any UI. Therefore, we use Filebeat to send pod logs to Kafka, and a small Spark job to store them in S3.
- Finally, since Spark UI and logs are located in different places for running and finished apps, a small custom service is used to provide stable URLs and redirect to the current locations.
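The redirect logic in the last item can be sketched roughly as follows. This is an assumption-laden illustration, not our service: the function name resolve_spark_ui_url, the running_drivers lookup, and the history server hostname are all hypothetical. The only facts relied on are that a live driver serves its UI on port 4040 by default and that the Spark History Server exposes finished apps under /history/<app id>/.

```python
def resolve_spark_ui_url(app_id, running_drivers,
                         history_server="http://spark-history:18080"):
    """Return the current Spark UI location for an application.

    running_drivers maps application IDs to the driver host of every
    application that is still running (hypothetical lookup table).
    """
    driver_host = running_drivers.get(app_id)
    if driver_host is not None:
        # A live driver serves its own UI, by default on port 4040.
        return f"http://{driver_host}:4040/jobs/"
    # Finished applications are served by the history server.
    return f"{history_server}/history/{app_id}/jobs/"
```

A stable URL such as /spark-ui/<app id> could then simply answer with an HTTP redirect to whatever this function returns at request time.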
Adhoc jobs
During development, it is convenient to run a job without first creating an Airflow DAG. To support this use case, we extended the Spark manager service mentioned above with a run API. It accepts a jar and job parameters, creates a Spark application Kubernetes object, and streams its output back. We also added a custom Gradle task to submit a jar through this API. The resulting user experience is very convenient.
Zeppelin on Kubernetes
Zeppelin is an open-source notebook system. Its primary strengths are:
- Support for multiple interpreters inside a single notebook: you can have Scala Spark, PySpark, Clickhouse, and Athena together
- Automatic visualization of data
- Dynamic input forms that you can change to rerun queries
Our Zeppelin setup on AWS EMR was decidedly ugly. We had a large node inside an EMR cluster, its config was hand-edited to disallow any Spark jobs, and it had the Zeppelin server configured through bash scripts, Ansible, and duct tape.
The custom setup for Zeppelin was needed to use the most recent version and to apply local fixes. It had to be run inside the EMR cluster for a licensing reason: Zeppelin needs to create Spark sessions, but AWS EMR uses a modified Spark and does not allow copying it outside of EMR clusters. Finally, we had to use bash scripts because Zeppelin does not offer an easy install approach.
As soon as we had Spark on Kubernetes running, we rushed to integrate it with Zeppelin, which has supported Kubernetes since version 0.9. Sadly, that version also changed how the note hierarchy is represented in storage, and that change came with a lot of UI bugs. Imagine 10000 notebooks from 100 users that were arranged in a nice hierarchy and are now shown as a flat list in half of the cases. We were forced to set Spark aside, migrate to Zeppelin 0.9 on EMR, and fix the UI bugs, in an Angular 1 codebase no less. In a sense, it was the most unexpected and disappointing part of the entire migration.
The actual Kubernetes migration went smoothly. We used placeholder pods to make sure new user sessions are quick to start, fine-tuned Spark dynamic allocation, and in the end gave users more compute resources, faster execution, and reduced costs by half.
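For readers unfamiliar with dynamic allocation on Kubernetes, the sketch below lists the standard Spark settings involved. The keys are real Spark configuration properties, but the values are placeholders chosen for illustration, not the ones we settled on, and the helper to_submit_args is hypothetical.

```python
# Illustrative values only; the actual tuned settings are not shown here.
DYNAMIC_ALLOCATION_CONF = {
    "spark.dynamicAllocation.enabled": "true",
    # Kubernetes has no external shuffle service, so shuffle tracking
    # keeps executors alive while they still hold shuffle data.
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "0",
    "spark.dynamicAllocation.maxExecutors": "20",
    # How long an idle executor lingers before being released.
    "spark.dynamicAllocation.executorIdleTimeout": "120s",
}


def to_submit_args(conf):
    """Render a config dict as a flat list of --conf key=value arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args
```

For interactive sessions the trade-off is mostly between the idle timeout (shorter releases nodes sooner) and the latency users see when a query needs executors again.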
Jupyter
We already had JupyterHub up and running in Kubernetes. However, for the same license reason as Zeppelin, it could not directly create Spark sessions. Instead, one could use SparkMagic to run requests via Livy and then, in another cell, copy data from the EMR cluster to Python. In practice, this friction was enough to prevent wide adoption.
After the migration, we can directly run PySpark code and collect the results into a Pandas dataframe. However, we have a lot of custom code on top of Spark, for example, to atomically save tables or read data from Mongo. These were not available from Python, and we had to find a way to import them, which is described in a separate post by my colleague Sergey Ivanychev.
Tuning Kubernetes
If you read my previous blog post, you know that for data science services such as Jupyter, our experience with Kubernetes was fairly positive. The initial experience in the Spark context was also positive. For example, the cost reduction comes in part from the efficiency of the Kubernetes Cluster Autoscaler. However, after moving all jobs we found a couple of issues related to spot instances.
Fragmentation
The easiest problem is fragmentation. If you use different sizes of instances, you can end up in a situation where an 8xlarge instance with 32 vCPUs is kept alive because of a single late executor using 4 vCPUs. This problem is exacerbated by the default behavior of the Kubernetes scheduler, which picks nodes in a random fashion. To alleviate it, we used pod affinity to request that pods from a given Spark job are preferentially scheduled together. For example, it can look like this:
podAffinity:
  preferredDuringSchedulingIgnoredDuringExecution:
    - weight: 1
      podAffinityTerm:
        labelSelector:
          matchExpressions:
            - key: spark/app-name
              operator: In
              values:
                - {application_name}
        topologyKey: kubernetes.io/hostname
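Since {application_name} in the snippet above is a template placeholder, the affinity has to be generated per job. A minimal sketch of doing that in Python follows; the function name spark_pod_affinity is hypothetical, while the structure and the spark/app-name label mirror the YAML above.

```python
def spark_pod_affinity(application_name, weight=1):
    """Build a soft pod affinity that co-locates pods of one Spark job."""
    return {
        "podAffinity": {
            "preferredDuringSchedulingIgnoredDuringExecution": [
                {
                    "weight": weight,
                    "podAffinityTerm": {
                        "labelSelector": {
                            "matchExpressions": [
                                {
                                    "key": "spark/app-name",
                                    "operator": "In",
                                    "values": [application_name],
                                }
                            ]
                        },
                        # Prefer the same node, not merely the same zone.
                        "topologyKey": "kubernetes.io/hostname",
                    },
                }
            ]
        }
    }
```

Because the rule is "preferred" rather than "required", the scheduler can still place an executor elsewhere when the preferred node is full, so jobs are never blocked by the affinity.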
Interruptions and launch failures
Recall that spot instances are AWS spare capacity. They are available at a large discount, but the exact price tends to fluctuate, launching an instance may fail, and an instance may be terminated with little notice. The details, understandably, are secret: if everybody knew exactly what spot capacity is and how interruptions are made, then everybody would just use spot nodes for everything. The key point is that beyond a certain scale, interruptions and launch failures can happen often. Below is a screenshot of our spot monitoring dashboard on a good day.
If a spot node with a Spark executor is interrupted, any data on it is lost, and Spark has to rerun the corresponding tasks. EC2 provides a 2-minute notice prior to interruption, and in theory, Spark 3 has a feature called executor decommissioning that can move all data to other executors or to S3. In practice, it does not work:
- Enabling this feature results in frequent data corruption. It is a known issue that has so far resisted all attempts to fix it.
- In the optimistic case, an executor has 120 seconds to handle decommission and can move data at 150MB/s. This adds up to 18GB of data, while we often use above 200GB of disk, so data loss will still happen.
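The arithmetic behind the second point can be spelled out in a few lines. The numbers are the ones quoted above (120 seconds, 150MB/s, 200GB of disk); the function name is just for illustration.

```python
def decommission_budget_gb(notice_seconds=120, throughput_mb_per_s=150):
    """Upper bound on data an executor can move before it is terminated."""
    return notice_seconds * throughput_mb_per_s / 1000  # MB -> GB


movable_gb = decommission_budget_gb()          # 120 s * 150 MB/s = 18 GB
disk_used_gb = 200                             # typical executor disk usage
fraction_saved = movable_gb / disk_used_gb     # under a tenth of the data
print(f"{movable_gb:.0f} GB movable, {fraction_saved:.0%} of {disk_used_gb} GB")
```

Even with perfect throughput for the entire notice window, more than 90% of the data on such an executor would still be lost.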
In the end, accepting that spot interruption happens and can result in task retries is the only option. It is sometimes annoying, but we can live with it.
Launch failures proved to be more problematic. Suppose that several teams run several large jobs on their own node groups, but use the same instance type in a single availability zone. If you run out of spot instances, each job might get too few resources to successfully finish, and some will fail. One would hope, though, that as jobs finish, node groups get downscaled and spot nodes are returned to EC2, ready to be allocated to other node groups.
Sadly, the default Kubernetes autoscaler does not work this way. As soon as node group A requests a node and gets a launch failure, the autoscaler disables downscaling in all other node groups. In other words, even if node group B has nothing to do, its nodes won’t be released and no resources will return to EC2 while A is still waiting.
To address this problem, we were forced to create several instances of Cluster Autoscaler, each managing its own node group. This also means that having two dozen per-team node groups became unmanageable, and we switched to having just 3 node groups for Spark, one per availability zone. This is clearly not optimal, and we plan to look at more advanced autoscalers, such as Karpenter.
Conclusion
In this post, I described our migration from Spark on Yarn/EMR to Spark on Kubernetes. It turned out to be a large project with some setbacks. Even with the basic setup in place, moving a job from Spark 2 to Spark 3 takes time and can reveal issues. In our case, with hundreds of jobs, there were quite a few one-off fixes to make. However, in the end, we reduced costs, improved performance, made Jupyter/PySpark a pleasure to use, and are enjoying the most recent version of Spark.
Most of the foundational work was done by my colleague Sergey Kotlov. I am also thankful to many other colleagues who directly contributed, migrated their jobs, and patiently waited while we figured things out.
Last, but not least: if you also use or plan to use Spark on Kubernetes, and have a different experience or would like to discuss the challenges, I’d be happy to talk. This is a free offer: I am not selling anything and would simply be pleased to chat with fellow data engineers. Feel free to pick any convenient time on my calendar.