Mastering Spark’s Performance Puzzle: Top 5 Challenges

Hariesh
7 min readOct 27, 2023

--

Introduction

Greetings, fellow data enthusiasts and tech voyagers! As we start the journey through the realms of Spark performance, let me proudly put on my lab coat and introduce myself as your trusty “Data Doctor.” Just like a medical professional diagnoses ailment and provides remedies, I’m here to dissect the challenges that often plague the world of Apache Spark and prescribe some invigorating solutions. (Uff, it’s hard to write a catchy introduction)

In my experience as a Data Doctor, I have came across many Data Patients with different problems like Spark’s sluggish query syndrome, Memory overload syndrome and many more. Don’t worry, I got you! As a Data Doctor, it’s my duty to explain you the root cause of it and provide ways to cure them.

Understanding the performance root causes will help you solve data performance problems and aid you to improve memory management, lower query latency and reduce cost. These are 5 challenges we will talk about.

  • Skew
  • Spill
  • Storage
  • Shuffle
  • Serialization

SKEW:

Let me explain What Data Skew is with simple example. Consider we have 5 equal sized buckets which can hold 5L each. I’m filling each bucket with some amount of water (ref to diagram), but the amount of water filled is not same across the buckets. As per image, totally we need to occupy 11L, which can be done with 3buckets (Data Storage). If I want to transfer the water or fetch them (Data transfer), I need to lift 5buckets as well.

Relating it to Data, Buckets are Partitions and data are water. “Imbalance size of Partitions causes Data Skewness”.

Typically, when data is transformed, there are possibilities of creating more files in one partition than others. It is usual in Spark. If the amount of skew is less, there will not be much of a performance issues. But if its higher, it may lead to data spill and cause memory failure and performance issues.

Impacts

Let’s see how the data skew affect the performance. Assume we have 5 buckets (partitions) and each buckets have different volume of data in it. It is clearly visible as per chart that data is skewed.

Bucket 5 is almost 10X larger than lowest size bucket (Bucket 3). When we do transformation on Bucket column,

· It will take almost 10X time to process.

· 10X RAM to process.

So, the entire stage or task will be paused waiting for slowest task to finish. Meaning the entire stage will be 10X slower. And we may left with no RAM to process skewed partition.

Mitigation:

· Select Proper Partitioning column — While partitioning table select columns which has high cardinality and even distribution. Don’t select Primary key as partition column.

· Salting — Salt the skewed column with random numbers to create even partitions. This might be costliest process as it takes extra processing.

· Data Repartitioning — Use partition functions like repartition() & coalesce() to increase or decrease partitions as required.

· Databricks offers few inbuilt strategies to fix skewness.

o Use skew hint option to employ Skew join optimization (attach sample query)

· Enable Adaptive Query Execution (Spark 3) — This will use runtime statistics and provide optimized query plan.

SPILL:

Simple example to demonstrate data spill — Let’s assume we have a mug full of coffee. We need to serve it to one person. Let’s fill a cup with the mug of coffee we have. While filling, the cup is full, but we still have coffee in the mug. The simple solution is, take another cup and fill the remaining coffee. Once the person drinks enough amount of coffee in first cup, we can refill the first cup with coffee in second cup.

First cup — RAM

Second cup — Disk

Coffee — Data

Spill represents the process of moving data partition from RAM to Disk and again back to RAM. This data movement occurs when a partition is too large to fit on Executor RAM. Data spill is itself a performance tuning step which happens to avoid Out of Memory issues. But too much of data spill will lead to performance problems.

“Writing of temporary files to Disk due to lack of memory”

Identification:

To identify if Data Spill has occurred in the processing, one can refer to Spark UI Stage section.

This will provide the detail on amount of data is spilled to Disk and amount processed on RAM.

Main root cause of Spill would be Data Skewness. If there is data skew, there will be data spill.

Mitigation:

· Allocate more memory to per executors / workers — so that partitions can fit inside one and wont spill.

· Mitigate the data skewness if any.

· Workaround — Decrease the size of partitions by increasing the number of partitions (might lead to small files problem)

· Manage default spark settings

  • spark.sql.shuffle.partitions
  • reparition()
  • spark.sql.files.maxParitionBytes

SHUFFLE:

Shuffle is an essential malice which are caused while using wide transformation (join(), distinct(), groupby()) or some actions. I called it an essential malice, because we need shuffle one way or the other to aggregate or transfer data between partitions. Shuffle is an expensive operation as it involves writing data to disk and network IO.

Following things can trigger shuffle in Spark Application:

· Using Wide Transformation like

  • groupby()
  • reduceByKey(), aggregateByKey()
  • orderby(), sortby()

· Join Operation — join()

· Partitioning — repartition(), coalesce()

· Window Functions

Mitigation:

· Use worker will high memory configuration & limit number of workers — to reduce network IO.

· Limit number of columns & rows when necessary — to reduce amount of data being shuffled.

· Shuffle the data & bucket it (using any column as key) before doing costly join

· Broadcast the table if they are small enough. broadcast(tableName)

· Use Spark’s Cost-Based Optimizer

NOTE: Shuffles are possibility negated by Spark 3 & Adaptive Query Engine features.

STORAGE:

We know amount of data transferred through will have great impact on speed and cost of processing. Number of data files transferred have significant impact on processing as well. Large number of small files is one of the things which needs to be under your microscope when solving Spark performance issues. Reading too many files from cloud storage can lead to I/O bottleneck. As both number of files & size of data have equal importance, thus striking the right balance is quite challenging.

Mitigation:

· Compacting — Compact the files to produce high volume data. Small files are combined to create single or fewer large file.

  • Set the output file size using spark config — spark.databricks.delta.optimize.maxFileSize.

· Use Bin-Packing Optimization (majorly available in databricks) — Apply ZORDER & OPTIMIZE on specific column which produce evenly balanced data files

· Set Table properties to Auto Optimize — delta.autoOptimize.optimizeWrite = true setting this option would compact small files during table writes

Set Auto Compaction to True — spark.databricks.delta.autoCompact .enabled = true setting this true will compact small files after write is successful, this is alternate approach of Auto Optimize if writes are not happening in schedule

SERIALIZATION:

When data needs to be transmitted over a network or from one device to other, there is mechanism called serialization happens. It basically converts the data into a format which is easy to transmit and that can also be reconstructed to original form (Deserialization).

We know that, Spark is built using Scala language which in turn uses JVM as the core concept. When a Spark codes executes, it gets converted to native Scala and then Java Bytecode. The code submitted will be serialized then transferred to executors and then deserialized before execution, to make sure efficient transfer of data between nodes over network.

UDFs needs to be discussed when Serialization is the topic. UDFs written on any language other than Java & Scala needs to be serialized & interpreted locally using Spark in each node. There is a need of communication between JVM & Python while Python UDFs are used which causes performance issues. UDFs are not optimized by Spark’s Catalyst Optimizer but python arrow package can be helpful.

Mitigations:

· First step would be to stay away from using UDFs, Vectorized UDFs or custom transformations.

· Use Higher-order functions & other inbuilt functions Spark & SQL offers.

· If there is still need of UDFs

  • Use Databrick’s Photon Engine Vectorized Python UDFs — which will convert the code to C++ and will avoid the impacts made by normal UDFs
  • Code in Scala or Java instead of Python

Conclusion:

Throughout this journey, we’ve explored various common performance problems and their corresponding solutions, equipping you with the knowledge and tools needed to excel in this role. We saw five major problems all spark developer face and how to mitigate and prevent them. Just as a doctor’s dedication leads to healthier lives, your dedication to Spark performance optimization will lead to more robust, efficient, and impactful data processing, ultimately benefiting your team and your organization as a whole. So, embrace your role as a data doctor, continuously diagnose and treat Spark’s performance ailments, and watch your data infrastructure thrive.

--

--