Revamping the Spark ecosystem at Quora

A journey of cost savings, tech debt reduction, and improved reliability for thousands of apps

Akshesh Doshi
Analytics Vidhya
10 min read · May 22, 2024


Co-author: Nafiz Chowdhury

At Quora, we rely heavily on data to enhance user experiences and drive business growth. At the heart of our data-driven culture sits Apache Spark, which enables us to analyze vast amounts of information and make informed decisions. It powers use cases such as feed and digest personalization, near real-time streaming, and ML model training.

Over the years, as our Spark usage grew to about ten thousand apps running daily, we started seeing a rapid uptick in cost and operational burden, along with a continuous increase in technical debt. Recognizing the need for a comprehensive upgrade, we recently embarked on a journey to revamp our Spark ecosystem, a project that resulted in significant improvements across various metrics and redefined our data processing capabilities. In this blog post, we would like to take you through the why, what, and how of our Spark system overhaul, sharing key insights and learnings that could benefit the wider tech community.

Our Spark ecosystem comprises various parts. The Spark apps themselves run on YARN. We use AWS S3 as our primary data storage (for offline data) and rely on Hive Metastore for table metadata. The majority of the apps use PySpark, but some streaming use cases are written in Java.

Spark & its surrounding systems

Why we needed a change

We faced several challenges that necessitated an upgrade.

Technical debt

Over time, we had accumulated enormous tech debt. We were running versions of open-source software that were 3–5 years old. Consequently, we often found ourselves patching their code to circumvent bugs that had already been fixed in newer versions. This band-aid approach forced us to maintain internal forks of the open-source repos. Using older versions also meant we were missing the latest security updates.

Upgrading would allow us to reduce this tech debt and improve the overall maintainability of our system.

Cost

A few years ago, our YARN cluster (running Spark apps) was one of the costliest components of Quora’s infrastructure. To make things worse, its expenses were growing quickly. This trend was unsustainable and demanded our immediate focus.

Luckily, we could identify a plethora of opportunities to improve application performance through both hardware and software enhancements in this system. Since we use AWS EC2 cloud instances, improved performance translated to expenditure reduction.

Cost of running our Spark apps — historical view

New features

Simply put, advancements in Spark & Hadoop’s capabilities were too good to overlook. New features such as zstd compression, a unified memory model, and the new S3 committers were pivotal in our decision to upgrade.

We wanted to leverage these new features to improve the reliability & efficiency of our Spark jobs.

Spark 3 has also introduced several performance enhancements to its engine. Adaptive Query Execution (AQE) re-optimizes query execution plans on the fly based on runtime statistics. Because the data in our systems varies widely, runtime adaptivity can have a major impact on execution performance. AQE can dynamically adjust the number of shuffle partitions, switch join types, and even optimize skewed joins, leading to faster and more efficient queries.
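To make the AQE behaviors above concrete, here is a minimal sketch of the Spark 3 settings that control them. The configuration keys are standard Spark SQL options, but the chosen values and the `to_submit_args` helper are illustrative assumptions, not the settings we actually run with.

```python
# Spark 3 configuration keys related to Adaptive Query Execution (AQE).
# The values below are illustrative, not Quora's production settings.
AQE_CONF = {
    # Master switch: re-optimize plans using runtime statistics.
    "spark.sql.adaptive.enabled": "true",
    # Merge small shuffle partitions after each stage.
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Split oversized partitions in sort-merge joins on skewed keys.
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def to_submit_args(conf):
    """Render a config dict as a flat list of spark-submit arguments.

    Hypothetical helper, used here only to show how the keys would be passed.
    """
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    print(" ".join(to_submit_args(AQE_CONF)))
```

The same keys could equally be set on a session builder or in spark-defaults.conf; switching join types needs no separate flag once AQE itself is enabled.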

Other issues

We had also accumulated a list of things that did not make sense to us anymore. Most notably, we were maintaining a legacy HDFS cluster as Spark’s default file system, although we had already migrated our data lake to S3. Having an operationally heavy HDFS cluster for this use case was not justifiable for our lean team, hence we wanted to completely get rid of it to save precious dev time and hardware costs.

The Transformations

Software and hardware changes

Aside from upgrading our Spark libraries from v2.2 to v3.3, we also upgraded our underlying YARN cluster (from Hadoop 2.8 to 3.3). Due to incompatibility issues, we also had to bump our Hive Metastore version from 1.2.1 to 2.3.9. Because the Metastore is a stateful application, this upgrade required careful planning.

Beyond these, we took the opportunity to change a few other cogs in the wheel that promised better efficiency. We went from Java 8 to Java 17 for YARN NodeManagers and our applications, which brought a plethora of optimizations related to speed, garbage collection, etc. Another major decision was to move from Intel-based processors to ARM-based processors for our servers: we adopted AWS Graviton EC2 instances, which had provided a much better price-to-performance ratio in other systems at Quora.

Notably, other adopters in the industry have published benchmarks showing 20–40% improvements for each of the Spark, Java, and hardware upgrades. The gains vary widely with the workload type, the bottlenecks of the app, etc.

System design changes

The most notable change was getting rid of an HDFS cluster which was used as our Hadoop’s default filesystem and stored some temp data for Spark apps to work. This was a legacy cluster that used to hold our data lake. Since we had already migrated our data lake to S3, we consolidated the remaining use-cases to S3 and got rid of HDFS.

Secondly, we took this opportunity to substantially improve our visibility over the system and each Spark app. Above all, we added several executor-level metrics which allow us to know exactly what’s going on with any app, at any time. Some of these metrics were further leveraged to identify apps running with sub-optimal configurations & report resource (cost) wastage, helping us optimize 30+ apps so far with minimal effort. We are currently working on automating this optimization tuning. Below is an image showing one of our sub-optimal apps from production.

Some of our dashboards around Spark apps’ cost & efficiency
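As a rough illustration of how executor-level metrics can surface sub-optimal apps, the sketch below flags apps whose peak memory usage stays far below their allocation. The input shape, the function names, and the 50% threshold are all assumptions made for this example; our real dashboards use more signals than this.

```python
def memory_waste(allocated_mb, peak_used_mb):
    """Fraction of allocated executor memory that was never used at peak."""
    if allocated_mb <= 0:
        raise ValueError("allocated_mb must be positive")
    return max(0.0, 1.0 - peak_used_mb / allocated_mb)


def flag_wasteful_apps(apps, threshold=0.5):
    """Return (app name, waste fraction) pairs above `threshold`, worst first.

    `apps` maps an app name to (allocated_mb, peak_used_mb). Both the shape
    and the threshold are hypothetical choices for this sketch.
    """
    flagged = []
    for name, (allocated_mb, peak_used_mb) in apps.items():
        waste = memory_waste(allocated_mb, peak_used_mb)
        if waste > threshold:
            flagged.append((name, round(waste, 2)))
    return sorted(flagged, key=lambda item: item[1], reverse=True)


if __name__ == "__main__":
    sample = {
        "feed_ranking": (10000, 2000),   # uses a fifth of its allocation
        "digest_builder": (8000, 7000),  # well utilized
    }
    print(flag_wasteful_apps(sample))
```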

Lastly, we also added a custom log server to view executor logs from the History server. This eased debugging for completed apps and boosted dev productivity.

Strategy of the migration

This upgrade was a massive change in a framework that impacted our production tremendously. With so many components being altered at the same time, we felt the need to build confidence in the system’s accuracy and reliability, along with ensuring that it had no regressions.

To achieve this, we set up a separate new cluster (with all our new components) and ran it along with the old one. On the client side, we supported ways to run Spark jobs on both clusters, along with maintaining a pyspark2 library that was just a clone of pyspark==2 with pyspark renamed inside its code. This was especially useful as we could now have client boxes run both Spark2 and Spark3 jobs from the same (monolith) code repo - we’ll explain how this was useful later in this article.

We then migrated our apps in 2 major phases:

1. Manual Migrations

In this phase, we migrated apps with careful shadowing and data validation. We built a duplicate of each existing app that we wanted to migrate. This duplicate job would use our new Spark3-based system without fiddling with any of our production data (for example, if the job writes data to S3, we would write it to a non-production “shadow” S3 location). We would then run a validation job to check if the outputs of the production and shadow jobs matched. In cases of output mismatches, we would investigate root causes and configure our new system to behave like the old one. This approach helped us identify and eliminate a lot of unknown incompatibilities at a very early stage — we have described a few important ones later in this article.
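The validation step can be sketched as a multiset comparison between the production and shadow outputs. This is a simplified, in-memory stand-in for a real validation job: the function name and the tuple-based row representation are assumptions for illustration, and on actual DataFrames a similar check could be expressed with PySpark's `DataFrame.exceptAll`.

```python
from collections import Counter


def compare_outputs(prod_rows, shadow_rows):
    """Compare two job outputs as multisets of rows, ignoring row order.

    Rows are assumed to be hashable, sortable tuples. Duplicate rows count,
    so a row written twice in one output and once in the other is a mismatch.
    """
    prod, shadow = Counter(prod_rows), Counter(shadow_rows)
    return {
        "match": prod == shadow,
        "only_in_prod": sorted((prod - shadow).elements()),
        "only_in_shadow": sorted((shadow - prod).elements()),
    }


if __name__ == "__main__":
    prod = [(1, "a"), (2, "b"), (2, "b")]
    shadow = [(2, "b"), (1, "a"), (2, "c")]
    print(compare_outputs(prod, shadow))
```

Reporting the rows unique to each side, rather than just a pass/fail flag, makes it easier to trace a mismatch back to a specific behavior change.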

This approach bolstered our trust in the new system but came with its fair share of complexities. Firstly, this approach required carefully understanding the logic of the app and manually duplicating it in a way that doesn’t impact production. This was very risky, as any manual errors could be catastrophic and hence warranted stricter testing & code reviews. Another challenge with the approach was that it required additional efforts from our product and data science teams to build validation pipelines and evaluate that the shadow data was accurate.

Overall, this placed an intensive burden on various teams. With thousands of jobs running in production, using this approach for all of them would have required a huge engineering commitment. Hence, we decided to take this approach only with a few select apps. We chose our costliest and most complex apps for this phase, as they had the highest ROI. Since these apps were costly to run, we didn’t want to hamper their reliability even for a single run, so a manual approach tailored specifically for them made sense.

2. Bulk Migrations

Once we had established more confidence in the new system, we wanted to increase the pace of our migration, so we decided to migrate larger batches (hundreds) of apps at a time.

To execute this reliably, we implemented a migration tool that we called SparkSwitch — a wrapper on top of our Spark2 and Spark3 clients. This tool gave us 2 important capabilities:

  1. Choose the Spark version for an app at runtime, by configuring it in an external storage (Zookeeper in our case)
  2. Detect failures in an app when it ran with Spark3 and automatically override Zookeeper to run the app on Spark2 in future retries

With these two features, we were able to roll out multiple apps reliably, knowing that they would fall back to the old system if they failed on Spark3.
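The two capabilities can be sketched roughly as follows. This is not SparkSwitch's actual implementation: the `VersionStore` class is an in-memory stand-in for Zookeeper, and `run_app`, the runner callables, and the `notify` hook are hypothetical names introduced for illustration.

```python
class VersionStore:
    """In-memory stand-in for the external store (Zookeeper in our case)."""

    def __init__(self, default="spark2"):
        self._versions = {}
        self._default = default

    def get(self, app):
        return self._versions.get(app, self._default)

    def set(self, app, version):
        self._versions[app] = version


def run_app(app, store, runners, notify=print):
    """Run `app` on its configured Spark version.

    If the app fails on Spark3, pin it to Spark2 so that the scheduler's
    next retry uses the old system, then re-raise the original error.
    """
    version = store.get(app)  # capability 1: version chosen at runtime
    try:
        return runners[version](app)
    except Exception as exc:
        if version == "spark3":  # capability 2: automatic fallback
            store.set(app, "spark2")
            notify(f"{app} failed on spark3 ({exc}); future retries will use spark2")
        raise


if __name__ == "__main__":
    def spark2_runner(app):
        return f"{app} ok on spark2"

    def spark3_runner(app):
        raise RuntimeError("incompatible config")

    store = VersionStore()
    store.set("daily_digest", "spark3")
    runners = {"spark2": spark2_runner, "spark3": spark3_runner}
    try:
        run_app("daily_digest", store, runners)
    except RuntimeError:
        pass  # the scheduler would retry the app here
    print(run_app("daily_digest", store, runners))
```

Keeping the version choice in an external store, rather than in code, means a fallback takes effect on the very next retry without a deploy.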

The tool would also notify us on Slack about any such failures. Since the apps automatically fell back to Spark2, the team could investigate failures the next day, without adding to the on-call burden. The implementation details of this tool could be an article of its own.

Slack bot informing us about an app’s failure & automatic fallback

To further reduce the blast radius of these changes we divided all our apps into multiple batches and migrated one batch at a time. This in-house client switching capability enabled us to onboard our apps to Spark3 smoothly, with minimal breaking changes. As we gradually onboarded more and more apps to Spark3, we also monitored both systems’ resource usages to judiciously upscale our new cluster and downscale the old one.

Challenges

While our automated migration tool proved to be invaluable, transitioning to Spark 3 wasn’t without its hurdles. We encountered several breaking changes and regressions that demanded careful attention.

Changes in JDBC/MySQL interactions

Our pipelines rely on JDBC connections to query online MySQL tables. However, Spark 3’s interaction with the JDBC connector presented unforeseen challenges:

  • Slow Queries and Memory Consumption: Queries became significantly slower and more memory-intensive. It turned out that in Spark 2, we used fetchsize to limit the number of rows returned at once; in Spark 3, however, we also had to set useCursorFetch=true for fetchsize to take effect.
  • Data Type Discrepancies: Spark 2 returned MySQL bigint data as Long (int in Python), while Spark 3 returned it as decimal.Decimal. This type mismatch broke certain use cases expecting integers. We passed a customSchema argument to enforce the desired data type.
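Both fixes come down to JDBC reader options, which the sketch below assembles into a single dictionary. `url`, `dbtable`, `fetchsize`, and `customSchema` are standard Spark JDBC options and `useCursorFetch` is a MySQL Connector/J URL property; the helper name, host, table, and default fetch size are hypothetical values chosen for the example.

```python
def mysql_jdbc_options(host, database, table, fetch_size=10000, custom_schema=None):
    """Build Spark JDBC reader options for a MySQL table.

    Hypothetical helper: the fetch size default is illustrative only.
    """
    # useCursorFetch=true lets the driver honor fetchsize instead of
    # buffering the whole result set in memory.
    url = f"jdbc:mysql://{host}/{database}?useCursorFetch=true"
    options = {"url": url, "dbtable": table, "fetchsize": str(fetch_size)}
    if custom_schema:
        # e.g. "id BIGINT" forces the column back to a long/int type.
        options["customSchema"] = custom_schema
    return options


if __name__ == "__main__":
    opts = mysql_jdbc_options(
        "mysql.example.internal:3306", "prod", "answers",
        custom_schema="id BIGINT",
    )
    print(opts)
    # With a SparkSession named `spark`, the options would be used as:
    #   df = spark.read.format("jdbc").options(**opts).load()
```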

Changes to the Memory Model

Spark 3 introduced a change in the memory model that impacted our resource allocation. In Spark 2, spark.executor.memoryOverhead included off-heap dataframes. This is no longer the case in Spark 3. Our newly created executor metrics dashboards for the Spark 3 cluster revealed that a significant portion of the allocated memoryOverhead remained unused. We implemented an override to lower this value and add it to spark.executor.memory (automatically within our Spark client itself) to handle this change.
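The override amounts to shifting memory from the overhead budget into the executor heap while keeping the total container size unchanged. Here is a minimal sketch of that arithmetic; the function names and the way the target overhead is chosen are assumptions, and the real logic inside our Spark client is not shown here.

```python
def rebalance_memory(executor_memory_mb, overhead_mb, target_overhead_mb):
    """Move overhead above the target into spark.executor.memory.

    The sum of the two values (the container request) stays constant.
    If the overhead is already at or below the target, nothing changes.
    """
    if target_overhead_mb >= overhead_mb:
        return executor_memory_mb, overhead_mb
    moved = overhead_mb - target_overhead_mb
    return executor_memory_mb + moved, target_overhead_mb


def as_conf(executor_memory_mb, overhead_mb):
    """Render the two values as Spark configuration entries (sizes in MiB)."""
    return {
        "spark.executor.memory": f"{executor_memory_mb}m",
        "spark.executor.memoryOverhead": f"{overhead_mb}m",
    }


if __name__ == "__main__":
    # An app that requested 8 GiB heap plus 4 GiB overhead under Spark 2,
    # rebalanced to a hypothetical 1 GiB overhead target.
    memory_mb, overhead_mb = rebalance_memory(8192, 4096, 1024)
    print(as_conf(memory_mb, overhead_mb))
```

For example, 8192 MiB of heap with 4096 MiB of overhead and a 1024 MiB target becomes 11264 MiB of heap with 1024 MiB of overhead, the same 12288 MiB in total.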

TensorFlow 2 Support

Our move to Graviton machines necessitated an upgrade to TensorFlow 2, as Graviton lacks TensorFlow 1 support. This required migrating some pipelines and adapting to changes in how Spark reads TFRecords. We had to refactor instances of sc.read.format("tfrecords") to sc.read.format("tfrecord") to ensure compatibility with the new TensorFlow version.

Other legacy behaviors

To maintain backward compatibility and address unexpected behavior changes in Spark 3, we implemented several configuration adjustments. These primarily involved turning “ON” legacy configurations and mitigating unforeseen issues. A comprehensive list of these configuration modifications is available in the image below for reference.

Result

Our efforts culminated in the successful migration of all applications to the new Spark3 system, allowing us to decommission the previous one. Despite needing to migrate thousands of applications, our automated tooling & batch rollout strategy facilitated a swift & efficient transition. Additionally, we bid farewell to our small HDFS cluster, streamlining our architecture and reducing maintenance overhead.

The migration unlocked a cascade of benefits:

  • Cost Efficiency: Leveraging Spark 3’s performance enhancements and the cost-effective Graviton instances, we achieved a remarkable ~35% reduction in hardware costs.
  • Enhanced Performance: Even with reduced spending, our pipelines now complete significantly faster, thanks to Spark3’s optimizations and the improved processing power of the new infrastructure. Overall, our daily workloads finish 15–20% faster now.
  • Optimized Resource Utilization: The enhanced visibility into executor memory metrics paved the way for further cost savings. As a result, our team developed another automated tool to fine-tune memory configurations, maximizing efficiency and minimizing waste.

Final thoughts

This upgrade has been a significant milestone for the Data Platform team at Quora. After working with so many Data & ML teams for more than a year on this initiative, we are glad to finally reap the benefits.

This upgrade has opened new avenues for us too. With Spark3’s ability to run on K8s, we want to explore whether that could improve our dev velocity by relieving some pains we have with our monolith repo. We also plan to build further upon our recently added visibility to improve our efficiency and reduce costs.

Lastly, we have learned that a “multiple small upgrades” strategy could be way more efficient than the “one huge upgrade all at once” strategy. Hence, we plan to invest in making our future upgrades easier & faster.
