Published in CodeX

Apache Spark Optimization Techniques and Tuning

Introduction

As the saying goes, data is the new oil. Data volumes are growing exponentially, and the methods used for data analysis and customer prediction keep changing: some technologies have already become obsolete and others soon will. Most organizations are moving towards microservices and big data processing.

Architectures are evolving towards fast and reliable technologies and tools. Before getting into Spark's architecture and optimization techniques, let's understand what big data is and how Apache Spark relates to it.

Big Data

Big Data is a collection of data so large that traditional tools and technologies cannot store or process it.

The first step towards solving this problem was taken by Google, which published papers describing the Google File System and the MapReduce programming model. Hadoop, an open-source framework built on those ideas, was started under the Apache Software Foundation in 2006. It uses the MapReduce paradigm to process large amounts of data stored in HDFS, with tools such as Hive and Pig built on top. However, MapReduce writes intermediate results to disk between processing steps, which increases read-write operations and therefore processing time. Apache Spark later addressed this.

Apache Spark

Apache Spark is a unified analytics engine for large-scale data processing. You can think of it as a processing engine that handles your data (small or big) faster than Hadoop MapReduce.

What makes Apache Spark Faster

1. In-memory Computation

Spark keeps most intermediate data in RAM, which reduces disk I/O. The same kind of processing takes longer in technologies that write to disk between steps.

2. Lazy Evaluation

Spark does not run a transformation as soon as it is declared. Instead, it records each transformation in a DAG (Directed Acyclic Graph), and processing starts only when an action is executed. We will see this in the Spark architecture.

Let's take an example where lazy evaluation affects execution time.

Suppose we have a table of exam scores and need to select the students whose marks are above 90% and below 91% (the student base is approximately 10 million).

Then we write code like this.

from pyspark.sql.functions import col

df = spark.read.parquet("some location")
df = df.filter((col("marks") > 90) & (col("marks") < 91))
df.show()

A traditional tool would first read all the data, store it somewhere, and only then filter it and display the result. Spark instead waits until show() is called, plans the read and the filter together, and can apply the condition while reading, so later operations work only on the records that satisfy it.

Apache Spark Architecture (In simple language)

Spark follows a master-worker (master-slave) architecture. A driver program runs on the master node and connects to the cluster manager, which allocates resources on the worker nodes. Tasks are then distributed to the workers, and once a task finishes, the worker sends its result back to the driver.

Benefits and Limitations

Though there are many benefits, there are still a few limitations of Spark. Let's see them one by one.

Benefits

  1. In-Memory Computation- less I/O means less execution and processing time.
  2. Fault Tolerance- The basic data structure is the RDD (Resilient Distributed Dataset). Once a program is submitted to the driver, the DAG scheduler schedules the jobs and tasks on the workers. If an RDD transformation/operation fails at some point, the scheduler consults the DAG and recomputes the failed RDD.
  3. Real-Time Stream Processing- Using Spark Streaming, we can process streaming data with ease. Since DataFrame support was added to streaming, it has become much easier to use.
  4. Multi-Language Support- We can write Spark code in Python, Java, Scala, and R.
  5. Compatible with Hadoop and various object stores.

Limitations

  1. No File Management System- Spark does not provide its own file system, so we need to store data in HDFS, S3 or the local file system.
  2. Real-Time Data Processing- Although Spark provides a streaming feature, internally it divides the incoming data into micro-batches and processes one micro-batch at a time. So the processing is near real-time rather than truly real-time.
  3. Expensive- Primary storage (RAM) is always costlier than secondary storage (hard disk), and Spark does most of its work in primary memory, so it is an expensive tool to run.
  4. Out of Memory Issues- Sometimes the data needs more memory than the RAM available, in which case an out-of-memory error is raised.

Despite these few issues, there are techniques that reduce processing time and memory problems. In this blog I will list the ones I know and have implemented.

Optimization Techniques

  1. Data filtering as early as possible: This is the simplest and most effective technique we can apply to data processing. There are two types of filtering we can apply.
    a. Column-level filtering- selecting only the column(s) needed for further processing and execution.
    Example: df.select(col("col1"), col("col2"), col("col3"), col("col4"))
    b. Row-level filtering- filtering out rows that are not needed for further processing.
    Example: df.filter(some condition)
    When you filter data as early as possible, every later step handles less data, which ultimately reduces processing time.
  2. File format selection: File format plays an important role in processing time. Since reading and writing time also count towards processing time, we need to keep the following points in mind when selecting a file format.
    I. Parquet- Parquet is a columnar format. Benefits: it consumes less space, because columnar storage with type-specific encoding compresses well, and it supports compression codecs such as snappy, gzip and lzo. Limitations: reading a complete record from Parquet is an expensive operation, since the values of a record are spread across columns. Writing a Parquet file also takes a little more time than writing other binary formats such as Avro.
    II. Avro- Apache Avro is a language-neutral data serialization system with a schema-based design. It has two components: a) Binary data: the data is stored in binary format, serialized according to the Avro schema. b) Avro schema: a JSON-formatted string/file used to serialize and deserialize the data.
    III. CSV/TSV/Delimited file- The data is stored as clear text in tabular form. Advantages: the format is human-readable and may carry column names in a header, and parsing is easy because a CSV can be treated as a 2D array. Limitations: it needs the most storage of the formats listed here. Struct and array data types are cumbersome to represent, support for special characters is poor, and there are no column types, so text and numeric columns cannot be told apart.
    IV. JSON/XML- These are semi-structured formats that represent data as key-value pairs. JSON is more widely used nowadays since it is more compact than XML. Advantages: JSON supports complex data structures, and handling it is simple because JSON parsers are available in all major programming languages.
  3. API Selection- Spark provides three APIs for working with data: RDD (Resilient Distributed Dataset), DataFrame and Dataset. For the best performance, choose among them according to the use case.
    1. RDD- The RDD is the basic data structure on which Spark works. It is useful when the use case demands low-level computations and operations such as text extraction. Spark applies no optimization to RDD code by default, so we have to write the logic precisely and optimize it ourselves. Filtering data at an early stage is especially important here.
    2. DataFrame- You can think of a DataFrame as a SQL table or a two-dimensional array capable of storing complex data types such as structs and arrays. It is the best choice for use cases that deal with tabular data. DataFrames use the Catalyst optimizer, which builds a query plan in these phases: Analysis -> Logical optimization -> Physical planning -> Code generation (as shown in the diagram below).
    3. Dataset- Datasets are type-safe and use encoders for serialization. They also use Tungsten to serialize data in a binary format. Since a Dataset is type-safe, you need to define a schema before using the data. The main advantages are compile-time error checking and fixed data types, so you do not run into data type issues while reading or transforming data. Compared to a DataFrame, it requires less memory. So for use cases where we have to stick to a schema and do not want to infer it at read time, we should go for Datasets.
Catalyst Optimizer

5. Use of shared variables- Spark provides two kinds of shared (advanced) variables: accumulators and broadcast variables.
Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (like counters in Hadoop).
A broadcast variable is stored on every node in the cluster for the duration of the job/program. It saves a lot of time when large and small datasets are processed together: the small dataset is sent to all the nodes once, so the jobs take less time to execute.

6. Parallelism using Coalesce/Repartition- Parallelism is the core of data processing in big data. When you write code in the console or submit it to Spark, Spark creates an operator graph. When an action is called, this operator graph is submitted to the DAG scheduler, which divides it into stages. Each stage contains tasks, one per data partition. The stages are passed on to the task scheduler and, through the cluster manager, the tasks are executed on the worker nodes (in executors). Changing the number of partitions is a double-edged sword: increasing it raises parallelism at the executor level, but every task also has to be scheduled and its result passed back to the driver node. The driver then needs to combine all the results, which takes time and can affect the overall execution time. So tune parallelism carefully.

7. Data Serialization- Serialization converts objects into streams of bytes and back. During a computation, data is converted into bytes and transferred over the network. The less data is transferred across the network, the less time the job takes. Spark provides two types of serialization.

  1. Java Serialization- Objects are serialized in Spark using ObjectOutputStream. Serialization performance can be controlled by implementing java.io.Externalizable. This approach is flexible but very slow. Instead, we can use Kryo serialization.
  2. Kryo Serialization- Kryo is a fast and efficient binary object graph serialization framework for Java. It uses direct bytecode-level access to fields. To use Kryo, we first register our classes. Otherwise, it selects a serializer from its 50+ default classes, and if none of them matches, it falls back to FieldSerializer. Kryo serialization is faster than Java serialization.
    To register classes (Scala, on the SparkConf)-
    .registerKryoClasses( Array(classOf[employee], classOf[class]) )
    While defining the Spark session, we need to set the serializer to Kryo:
    spark_session = SparkSession\
    .builder\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
    .config("spark.kryo.registrationRequired", "false")\
    .appName("appname")\
    .master("yarn")\
    .getOrCreate()

8. Caching and Persistence- As we know, Spark evaluates lazily, i.e. it does not start processing data until an action is called. Every time an action is called, the whole DAG behind it is executed again, so the data is processed from the start each time. Let's take an example.

We have two DataFrames, DF1 and DF2. Joining them gives DF12, which we then join with DF3 to get the result DataFrame DF123.
So if we run the code below:

df12 = df1.join(df2, on=["somekey"], how="inner")
df12.write.parquet("some_location_1")
df123 = df12.join(df3, on=["somekey"], how="inner")
df123.write.parquet("some_location_2")

In this case, the first action is the write of df12 on line 2, which saves df12 at some_location_1. When the df123 action on line 4 is called, Spark reads df1 and df2 again and recomputes the join that was already calculated. To save time, we can keep df12 in memory or on disk using cache/persist. The program can then be written like this:

df12 = df1.join(df2, on=["somekey"], how="inner")
df12.cache()
df12.write.parquet("some_location_1")
df123 = df12.join(df3, on=["somekey"], how="inner")
df123.write.parquet("some_location_2")

When NOT to use Cache/Persist- When the data is large, or when many DataFrames are competing for the cache, since the cached data may not fit in memory.

9. Reduce Shuffle Operations- Shuffle is the costliest operation in Spark because it moves data across the network as well as to disk. So the less shuffling, the less time your job takes.
Since we mostly use DataFrames nowadays, I will not suggest using reduceByKey or groupByKey. We shall learn how to handle shuffles with DataFrames in a while.
By setting sparkSession.conf.set("spark.sql.shuffle.partitions", x),
we can set the number of shuffle partitions.
Ideally, each partition (one task per partition) should hold 100–200 MB.

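The formula for the best result is
spark.sql.shuffle.partitions = ceil((shuffle stage input size / target size) / total cores) * total cores
Read this way, it is the partition count implied by the target size, rounded up to a multiple of the total number of cores.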
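
A small sketch of this calculation in plain Python. It assumes the inner quotient is rounded up so that the result is a whole multiple of the core count. The rounding, the function name and the sample sizes (50 GB of shuffle input, a 200 MB target, 64 cores) are assumptions for illustration, not part of any Spark API.

```python
import math

def suggest_shuffle_partitions(shuffle_input_mb, target_partition_mb, total_cores):
    """Round the raw partition count up to the next multiple of total_cores."""
    raw_partitions = shuffle_input_mb / target_partition_mb
    # Number of full "waves" of tasks needed across all cores.
    waves = math.ceil(raw_partitions / total_cores)
    return max(1, waves) * total_cores

# Hypothetical job: 50,000 MB of shuffle input, 200 MB per partition, 64 cores.
partitions = suggest_shuffle_partitions(
    shuffle_input_mb=50_000,
    target_partition_mb=200,
    total_cores=64,
)
print(partitions)  # 250 raw partitions -> 4 waves of 64 -> 256
```

Rounding to a multiple of the core count means every wave of tasks keeps all cores busy, rather than leaving some idle during a final partial wave.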

In Spark 3.x, there is a newly added feature called Adaptive Query Execution. When spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled are both set to true, Spark can adjust the number of shuffle partitions dynamically. For better results, you can tune spark.sql.adaptive.advisoryPartitionSizeInBytes, which is 64 MB by default.

10. Setting the limit for Broadcast Join- When we broadcast a table in Spark, it is copied to all the workers, which reduces execution time. Spark broadcasts a table automatically only if it is smaller than a threshold, which is 10 MB by default. We can raise this limit by setting spark.sql.autoBroadcastJoinThreshold.

11. Compressing data during SQL execution- While this will not drastically decrease execution time, it will: a) help prevent out-of-memory issues and b) compress data at execution time. This can be done by setting a single parameter, spark.sql.inMemoryColumnarStorage.compressed, to true.

12. Memory and Resource Allocation- When running a Spark job, on a cluster or locally, the most important thing is to allocate only the required amount of memory and cores to the application. If this is not done correctly, you will not be able to run jobs in parallel and many jobs will fail. So, back to basics.

Spark has a driver and executors, and our Spark cluster (which may be standalone) has cores and memory for both the workers and the driver. Let's take an example to understand the maths involved, and then set the properties accordingly.
Suppose we have 128 GB of memory and 32 cores for the driver (master node), and 64 GB of memory and 16 cores for each of 4 workers. The total is then 384 GB of memory (4*64 + 1*128) and 96 cores (16*4 + 1*32).

Now suppose we want to run a job on 200 GB of data. Assuming the computation includes some group-by and count operations, the driver will also need some memory.

number of executors per node = 4
memory per executor = 15GB
cores per executor = 3
total number of executors = 15
driver memory = 20GB

Now let's see the calculation.
The number of executors per node is 15/4 ≈ 4, which means 4 executors will be spawned on each of 3 workers and 3 executors on the 4th.
The total memory allocated is total number of executors * memory per executor = 15*15 = 225GB, which is greater than the data size (overhead and intermediate output also need some memory).
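
The same arithmetic as a short plain-Python check, using the numbers from the example above. The variable names are illustrative. The fit checks deliberately ignore per-executor memory overhead, which in practice tightens the memory budget.

```python
# Cluster from the example: 4 workers, each with 64 GB and 16 cores.
workers = 4
worker_memory_gb = 64
worker_cores = 16

# Requested layout from the example.
total_executors = 15
executor_memory_gb = 15
executor_cores = 3

# Spread executors as evenly as possible: the first `extra` workers get one more.
base, extra = divmod(total_executors, workers)
executors_per_worker = [base + 1 if i < extra else base for i in range(workers)]

total_executor_memory_gb = total_executors * executor_memory_gb
max_cores = total_executors * executor_cores

# Check the busiest worker against its own memory and cores (overhead ignored).
busiest = max(executors_per_worker)
fits_in_memory = busiest * executor_memory_gb <= worker_memory_gb
fits_in_cores = busiest * executor_cores <= worker_cores

print(executors_per_worker)           # [4, 4, 4, 3]
print(total_executor_memory_gb)       # 225
print(max_cores)                      # 45
print(fits_in_memory, fits_in_cores)  # True True
```

The 45 cores computed here is the application-wide core limit (15 executors with 3 cores each), while 3 is the per-executor core count.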

Let's see the spark-submit command. I am going to submit a PySpark job. The master is set to local here; on a cluster, pass that cluster's master URL instead.

spark-submit --master local --conf "spark.executor.cores=3" --conf "spark.cores.max=45" --executor-memory 15g --driver-memory 20g --py-files "some_py_files" "python_file_to_be_executed.py"

Conclusion

In this article, we saw how Spark helps with processing data. We also looked at the limitations of Spark and then learnt how to optimize execution time and memory usage. I hope this will be helpful while you work with Spark. Please let me know if I should add more points. Constructive feedback is always appreciated!
