Apache spark: optimization techniques
Apache spark is one of the widely used engine for big data processing. The biggest hurdle encountered while working with big data is not accomplishing a task, but accomplishing a task with minimum amount of resources. This is where optimization comes into play, optimization aims at solving big data problems with minimum amount of resources.
Here we will be discussing the following optimization techniques.
- take is better than collect
- Persist whenever possible
- Avoid groupByKey and use reduceBykey
- Coalace is better than re-partition of data
- Use broadcast wisely
Now let us look at each of these methods with an example.
Take is better than collect
The following example df.collect() action applied on a dataset, and the execution details for the same job.
var df = spark.read.option("header",true).csv("/user/test/olympic-summary/Summer-Olympic-medals-1976-to-2008.csv")df.collect()
The duration of execution is .1 seconds. if we replace the collect action with a take, instead of converting the whole dataset into an array it will only take from the first partition, that is it simple and easy.
df.take(15433)
Persistence is the key
All spark transformation like Group By,filter are lazy by nature. This means that spark will not apply these transformation immediately but it will keep on waiting for an action like count,save etc. This lazy execution strategy can sometimes be a problem also let us look at the below example.
var l1=Seq(1,2,3,4)
val rdd1 = sc.parallelize(l1)
val rdd2 = rdd1.map(x=> x*x)
val rdd3 = rdd2.map(x=> x+2)
rdd3.collect()
rdd3.count()
All the transformation applied for collect action are again applied for the count action this can be a huge problem in large operation.
import org.apache.spark.storage.StorageLevel._
val dfPersist = rdd2.persist(MEMORY_ONLY)
val rdd4 = rdd3.persist(MEMORY_ONLY)
rdd4.count()
rdd4.collect()
The solution is storing transformation in memory or disk storage can be memory,disk_only and memory_and_disk.
reduceByKey is better than groupBykey
The example given below splits the sentence by space and count the number of occurrence of words.
val text_list = Seq("this is a sample sentence", "this is another sample sentence", "sample for a sample test")
val rdd = sc.parallelize(text_list)
val rdd_word = rdd.flatMap(x=> x.split(" "))
val rdd_pair = rdd_word.map(x=> (x, 1)
val wordCount = rdd_pair.groupBy(w => w).mapValues(_.size)
wordCount.collect()
problem with this approach is that in group by number of shuffles will be higher. So instead of group by if we use reduce by key it will initially reduce within the partition, so effectively number of shuffles will be lesser.
val reducedata = rdd_pair.reduceByKey(_+_)
Coalace is better than re-partition
Re-partition is used to increase the number of partition, but Re-partition will cause a large data movement across the cluster. Coalace is a better approach. Coalace algorithm internally works in a way to reduce the number of data transfer.
Broadcast variables
Checkout my article on spark joining strategies to see how broadcast joins can be used effectively.
Similarly we can also broadcast variables into different nodes, so these variables will be cached inside each nodes. Look up tables used frequently can be broadcasted similarly to improve efficiency.
val country = Seq(("Ind"->"India"),("US"->"USA"),("UK"->"United Kingdom"))
val broadcaster = sc.broadcast(country)
In the above example country look-up table which transforms country code into country names are broadcasted into each nodes.