Spark Performance Tuning

Rupesh Malkar
Published in Analytics Vidhya
7 min read · Mar 14, 2020

Apache Spark, the unified analytics engine, has seen rapid adoption by enterprises across a wide range of industries.

Its ability to process data in memory makes it up to 10 times faster than processing the same data with MapReduce. However, a Spark job that is not properly tuned can result in job failures.

Before we move on to Spark tuning, let's recap its architecture.

Spark Architecture

Spark has a driver program and many worker nodes, each of which hosts executors that run tasks. The input data is divided into partitions, and each partition is processed as one task by an executor.

Because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked by any resource in the cluster: CPU, network bandwidth, or memory.

There are a variety of techniques to optimise Spark jobs:

  • Memory Tuning/Garbage Collection Tuning
  • Data Serialization
  • Cluster configuration
  • Level of Parallelism
  • Broadcasting
  • Data at Rest
  • Data Locality

Memory Problem

There are three considerations in tuning memory issues:

  • The amount of memory used by an object
  • The cost of accessing those objects
  • The overhead of garbage collections

By default, Java objects are fast to access, but they can easily consume 2–5x more space than the raw data inside their fields.

Each distinct Java object has an “object header”, which is about 16 bytes and contains information such as a pointer to its class. For an object with very little data in it (say one Int field), this can be bigger than the data.

Java Strings have about 40 bytes of overhead over the raw string data (since they store it in an array of Chars and keep extra data such as the length), and store each character as two bytes due to String’s internal usage of UTF-16 encoding. Thus a 10-character string can easily consume 60 bytes.

Common collection classes, such as HashMap and LinkedList, use linked data structures, where there is a “wrapper” object for each entry (e.g. Map.Entry). This object not only has a header, but also pointers (typically 8 bytes each) to the next object in the list.

Memory Management Overview

  • Memory usage in Spark mostly falls under two categories: execution and storage. Execution memory is used for computation such as shuffles, joins, aggregations and sorts, while storage memory is used for caching.
  • In Spark, execution and storage share a unified region (M). When no execution memory is used, storage can acquire all the available memory, and vice versa.
  • Execution memory can evict storage memory.
  • There is a sub-region (R) within M where execution memory cannot evict storage memory.
  • Storage memory cannot evict execution memory.
  • spark.memory.fraction expresses the size of M as a fraction of the JVM heap (default 0.6).
  • spark.memory.storageFraction expresses the size of R as a fraction of M (default 0.5); it is the storage space within M where cached blocks are immune to being evicted by execution.
  • If your project uses very little caching, consider decreasing the value of spark.memory.storageFraction, as in the configuration sketch below.
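
As a rough illustration of these two properties, here is a minimal sketch, assuming an application that caches very little and therefore lowers the storage fraction; the exact values are illustrative, not recommendations:

    import org.apache.spark.sql.SparkSession

    // A minimal sketch of adjusting the unified memory region.
    val spark = SparkSession.builder()
      .appName("memory-tuning-sketch")
      .config("spark.memory.fraction", "0.6")        // size of M as a fraction of the heap (default 0.6)
      .config("spark.memory.storageFraction", "0.3") // size of R within M, lowered from the 0.5 default
      .getOrCreate()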

Tuning Data Structures

The first way to reduce memory consumption is to avoid Java features that add overhead, such as pointer-based data structures and wrapper objects.

  • Design your data structures to prefer primitive types over collection classes like HashMap
  • Consider using numeric IDs instead of strings for keys
  • If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers four bytes instead of eight. You can add this option in spark-env.sh
  • When objects are still too large, a much simpler way to reduce memory usage is to store them in serialized form, for example with the MEMORY_ONLY_SER storage level (see the sketch below)
  • The only downside of storing objects in serialized form is slower access time, since each object has to be deserialized
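
A small sketch of caching in serialized form; the SparkContext is assumed to exist already and the data is made up:

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel

    val sc = SparkContext.getOrCreate()

    // Prefer primitive-friendly structures: a pair of Ints instead of a map of strings.
    val events = sc.parallelize(1 to 1000000).map(id => (id, id % 10))

    // MEMORY_ONLY_SER stores each partition as a serialized byte array:
    // a smaller footprint, at the cost of deserializing objects on access.
    events.persist(StorageLevel.MEMORY_ONLY_SER)
    events.count()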

Data Serialization

Serialization plays an important role in the performance of any distributed application. Formats that are slow to serialize objects into, or consume a large number of bytes, will greatly slow down the computation

Spark provides two serialization libraries:

  • Java serialization: By default, Spark serializes objects using Java's ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable. Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes
  • Kryo serialization: Spark can also use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but it does not support all Serializable types and requires you to register the classes you'll use in the program in advance for best performance
  • You can switch to Kryo by initializing your job with a SparkConf and calling conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  • This configures the serializer used not only for shuffling data between worker nodes but also for serializing RDDs to disk
  • To register your own classes with Kryo, use the registerKryoClasses method, as in the sketch below
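
A sketch of switching to Kryo and registering application classes; MyClass1 and MyClass2 stand in for your own classes:

    import org.apache.spark.{SparkConf, SparkContext}

    case class MyClass1(id: Int, name: String)
    case class MyClass2(values: Array[Double])

    val conf = new SparkConf()
      .setAppName("kryo-sketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

    val sc = new SparkContext(conf)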

Garbage Collection Tuning

  • If a full GC is invoked multiple times before a task completes, it means there isn't enough memory available to execute the task
  • If there are too many minor GCs and not many major GCs, allocating more memory to Eden would help. If the estimated size of Eden is E, we can set the size of the young generation with -Xmn=4/3*E
  • If the OldGen space is close to being full, reduce the amount of memory used for caching by lowering spark.memory.fraction. Also, consider decreasing the size of the young generation
  • If a task reads data from HDFS, the amount of memory it uses can be estimated from the HDFS block size. The size of a decompressed block is often 2 to 3 times the size of the block on disk. So, to have 3 or 4 tasks' worth of working space with an HDFS block size of 128 MB, we can estimate the Eden space to be (3*4*128 MB). GC behaviour can be observed by enabling GC logging, as in the sketch below
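
A sketch of collecting GC statistics on the executors, assuming a pre-JDK 9 HotSpot JVM; the Eden arithmetic mirrors the estimate above:

    import org.apache.spark.SparkConf

    // Turn on GC logging so minor/full GC frequency shows up in the executor logs.
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions",
        "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")

    // Back-of-the-envelope Eden estimate: ~4 tasks per executor,
    // a ~3x decompression factor and 128 MB HDFS blocks.
    val edenEstimateMB = 3 * 4 * 128 // = 1536 MB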

Level of Parallelism

  • The cluster is not fully utilized unless we set the number of partitions high enough
  • We can pass the number of partitions as a second argument to pair-RDD operations, or set the config property spark.default.parallelism (see the sketch below)
  • It is recommended to run 2–3 tasks per CPU core in the cluster
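
A sketch of raising the level of parallelism; the path, the 200-partition target and the data layout are all hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("parallelism-sketch")
      .set("spark.default.parallelism", "200") // used when no partition count is passed

    val sc = new SparkContext(conf)
    val pairs = sc.textFile("hdfs:///data/events")
      .map(line => (line.split(",")(0), 1))

    // Pair-RDD operations such as reduceByKey also accept the number of
    // partitions as a second argument.
    val counts = pairs.reduceByKey(_ + _, 200)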

Cluster Configuration

Scheduling: We can set spark.scheduler.mode to FAIR to allow better sharing of resources across multiple users. We can also cap the number of executor cores our application will use (for example via spark.cores.max, or spark-submit's --total-executor-cores). This ensures that our application doesn't take up all the resources in the cluster.

Dynamic Allocation: Spark provides a mechanism to dynamically adjust the resources your application occupies based on the workload. This means that our application can give resources back to the cluster when they are no longer used, and request them again later when in demand. This is particularly useful when multiple applications share the cluster (see the configuration sketch below).
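
A sketch of enabling dynamic allocation; on most cluster managers this also needs the external shuffle service, and the executor bounds below are examples only:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "20")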

Broadcasting Large Variables

  • Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with every task
  • This is an efficient way to give every node a copy of a large input dataset, as in the sketch below
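
A small sketch of broadcasting a read-only lookup table once per executor instead of shipping it with every task; the lookup data is made up:

    import org.apache.spark.SparkContext

    val sc = SparkContext.getOrCreate()

    val countryNames = Map("IN" -> "India", "US" -> "United States")
    val bcNames = sc.broadcast(countryNames)

    val codes = sc.parallelize(Seq("IN", "US", "IN"))
    val resolved = codes.map(code => bcNames.value.getOrElse(code, "Unknown"))
    resolved.collect()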

Data At Rest

  • Making sure that we are storing the data for effective reads is absolutely essential
  • This involves choosing our storage system and our data format

File-based Long-term Data Storage

  • There are different file formats available, such as CSV and Apache Parquet
  • Although CSV files are well structured, they are very slow to parse
  • Parquet is a more efficient file format to choose
  • Parquet stores data in binary files with column-oriented storage
  • This makes it possible to skip the data that is not required by the query (see the sketch below)
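
A sketch of converting a CSV dataset to Parquet and then reading back only the columns a query needs; paths and column names are hypothetical:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("parquet-sketch").getOrCreate()

    val sales = spark.read.option("header", "true").csv("hdfs:///raw/sales.csv")
    sales.write.mode("overwrite").parquet("hdfs:///curated/sales")

    // With column-oriented storage, only the selected columns are actually read.
    spark.read.parquet("hdfs:///curated/sales")
      .select("order_id", "amount")
      .show()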

Splittable File Types and Compression

  • The file format should be splittable, so that different tasks can read different parts of a file in parallel
  • Splittability also depends on the compression format used
  • Archive formats like ZIP and TAR are not splittable
  • Compression codecs like BZIP2 and LZ4 are splittable, whereas a single GZIP file is not

Table Partitioning

  • Table partitioning refers to storing files in separate directories based on a key
  • Partitioning our data allows Spark to skip irrelevant files based on that key, as in the sketch below
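
A sketch of writing the data partitioned by a key column so Spark can prune whole directories at read time; the paths and the country column are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("partitioning-sketch").getOrCreate()
    val sales = spark.read.parquet("hdfs:///curated/sales")

    sales.write.mode("overwrite")
      .partitionBy("country") // produces country=IN/, country=US/, ... directories
      .parquet("hdfs:///curated/sales_by_country")

    // A filter on the partition key touches only the matching directories.
    spark.read.parquet("hdfs:///curated/sales_by_country")
      .filter("country = 'IN'")
      .count()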

Bucketing

  • We can use bucketing when the data is skewed in some partitions
  • If joins are frequently performed on a column after the data is read, bucketing the data by that column can improve performance and stability (see the sketch below)
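
A sketch of bucketing a table by its join key; bucketBy requires writing to a table with saveAsTable, and the table name, column and bucket count are illustrative:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("bucketing-sketch")
      .enableHiveSupport()
      .getOrCreate()

    spark.read.parquet("hdfs:///curated/orders")
      .write.mode("overwrite")
      .bucketBy(32, "customer_id") // pre-hash rows into 32 buckets by the join key
      .sortBy("customer_id")
      .saveAsTable("orders_bucketed")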

The Number of Files

  • Avoid lots of small files
  • When reading data from HDFS with a block size of 128 MB, 30 files of 5 MB each would request 30 blocks, even though the same data (150 MB in total) could fit in 2 blocks
  • Lots of small files increase the network and scheduling overhead; a simple fix is to compact them, as in the sketch below
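
A sketch of compacting many small files into a few larger ones; a target of 2 files matches the 150 MB / 128 MB-block example above, and the paths are hypothetical:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("compaction-sketch").getOrCreate()

    spark.read.parquet("hdfs:///raw/events") // e.g. 30 files of ~5 MB each
      .coalesce(2)                           // merge partitions without a full shuffle
      .write.mode("overwrite")
      .parquet("hdfs:///curated/events_compacted")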

Data Locality

If the data and the code that operates on it are together, computation tends to be faster. It is usually faster to ship serialized code from place to place than a chunk of data, because the code is much smaller.

There are several levels of data locality:

  • PROCESS_LOCAL data is in the same JVM as the running code
  • NODE_LOCAL data is in the same node
  • RACK_LOCAL data is on the same server rack
  • ANY data is elsewhere on the network and not on the same rack
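
The scheduler waits briefly for a free slot at the best locality level before falling back to the next one. A sketch of tuning those waits; the default is 3s, and the values below are examples rather than recommendations:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.locality.wait", "1s")      // applies to every level unless overridden
      .set("spark.locality.wait.node", "2s") // per-level override for NODE_LOCAL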
