Apache Spark Optimisation — Part 2
Even when our Spark code is clean, performance can still degrade because of the nature of the source data: its distribution, quality, volume, and throughput. What techniques can tackle these problems? In this blog, we will discuss the next set of optimisation techniques, focused on the incoming data.
Note : Please refer to https://medium.com/@dineshkumarkct/apache-spark-optimisation-part-1-c325be5637e0 for Part 1 of this series.
Fix the Skew!
Skew is the uneven distribution of data, and it is driven entirely by the nature of the business and the logic that generates the data.
Take the example of YouTube in one geo location, where you collect usage information every second, and consider the following cases.
- There are 10 users who watch YouTube round the clock, across more than 10 genres a day.
- There are 1 million users who watch less than 6 hours a day and fewer than 10 genres a day.
Imagine you have to provide live recommendations based on usage for all the users in that location. 99% of your data will come from those 10 users.
Following are the problems you will face.
- Most of the work you do, and the resources you use to enrich data and generate recommendations, will go to those top 10 users.
- If you write an application that reads real-time usage events from Kafka and produces results for all users in near real time, you might partition by user. The tasks for the top 10 users will then take 10x the time to complete, while the remaining tasks finish much sooner and sit waiting for the slower 10.
- The overall job time is, in any case, equal to the time taken by the slowest task to complete.
- Another problem: suppose you provide 100 executors for 100 tasks, one per user. 90 executors might complete their tasks quickly while the remaining 10 do the heavy lifting. You are paying for the 90 idle executors as well, since the stage completes only when all of its tasks complete.
There are a lot of standard techniques to fix skew. Let me cover a few.
- Understand the distribution of the data before applying any partitioning in any part of your data pipeline.
- Salting is a great technique to eliminate the effect of skew. By salting, you add a random value on top of the skewed key, splitting one big chunk into many smaller partitions (a minimal sketch follows this list).
- Spark 3 has Adaptive Query Execution (AQE), which handles skew during joins once you enable it: spark.conf.set("spark.sql.adaptive.enabled", "true"). Skew-join handling itself is governed by spark.sql.adaptive.skewJoin.enabled, which is on by default when AQE is enabled.
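Below is a minimal salting sketch in PySpark. It assumes a skewed join between a large events DataFrame and a smaller users DataFrame on a user_id column; the paths, column names, and salt count are illustrative, not taken from any specific pipeline.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("salting-example").getOrCreate()

# Hypothetical inputs: a huge, skewed events table and a smaller users table.
events = spark.read.parquet("s3://bucket/events/")
users = spark.read.parquet("s3://bucket/users/")

NUM_SALTS = 16  # split each hot key into 16 sub-partitions

# Large, skewed side: append a random salt to every row.
events_salted = events.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))

# Small side: replicate each row once per salt value so the join still matches.
salts = spark.range(NUM_SALTS).withColumnRenamed("id", "salt")
users_salted = users.crossJoin(salts)

# Join on the original key plus the salt; hot keys are now spread across many tasks.
joined = events_salted.join(users_salted, ["user_id", "salt"])
```

The trade-off is that the smaller side is replicated NUM_SALTS times, so keep the salt count just large enough to break up the hot keys.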
Too many small files spoil the performance!
The small-files issue is common when you read data from a data lake, a bucket, or a folder containing many files.
When there are a lot of small files, in the thousands or even lakhs (hundreds of thousands) in some cases, the Spark driver needs a lot of time just to list the files before it can schedule tasks on the executors.
So the job effectively never starts; it stays stuck in the listing phase.
Ways to solve this are:
- If the upstream system is in your scope of development, try consolidating the files into bigger chunks at the source.
- If not, run a program before the main Spark application to combine the small files into bigger ones, a step known as compaction (sketched below). The main application then has fewer, large-enough partitions and runs efficiently.
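Here is a hedged compaction sketch in PySpark: it reads the many small files and rewrites them as a smaller number of larger files before the main application runs. The paths and the target partition count are hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compaction-example").getOrCreate()

# Hypothetical input folder containing thousands of small files.
small_files_df = spark.read.parquet("s3://bucket/raw/small-files/")

# Rewrite as fewer, bigger files; derive the count from total size / desired file size.
TARGET_PARTITIONS = 50
(small_files_df
    .repartition(TARGET_PARTITIONS)
    .write
    .mode("overwrite")
    .parquet("s3://bucket/raw/compacted/"))  # hypothetical output location
```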
Memory allocation : Dynamic or Static?
Spark gives you two options in memory allocation.
- You can define your own configuration parameters (number of executors, number of cores per executor, driver memory, executor memory, etc.) at the start of the application (see the static sizing sketch after this list).
- You can enable dynamic allocation, where the Spark application acquires the necessary resources from the cluster whenever needed and completes as quickly as possible (a dynamic-allocation sketch follows the two lists below).
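As a rough illustration of the static option, here is a sketch that pins the executor count, cores, and memory up front. The numbers are illustrative only; in practice some of these properties (for example spark.driver.memory) must be supplied via spark-submit or spark-defaults before the driver JVM starts, so treat this as a list of the relevant property names rather than a definitive setup.

```python
from pyspark.sql import SparkSession

# Static allocation: fixed sizing, chosen from the data volume and cluster capacity.
spark = (SparkSession.builder
    .appName("static-allocation-example")
    .config("spark.executor.instances", "10")   # number of executors
    .config("spark.executor.cores", "5")        # cores per executor
    .config("spark.executor.memory", "14g")     # memory per executor
    .config("spark.driver.memory", "4g")        # driver memory
    .getOrCreate())
```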
When spark.dynamicAllocation.enabled=true?
- If you don't have a consistent throughput (in the case of a streaming app) or the size of the input varies from time to time (in batch).
- If the SLA is tight, and cost is a lower priority than performance and the time taken for the app to complete.
- If you are not confident about all the configuration parameters in Spark.
When spark.dynamicAllocation.enabled=false?
- If the incoming data is consistent in throughput and volume.
- If you have a limitation on the resources that can be allocated, you are not running a high-priority application, and an occasional delay is okay.
- If you are using a shared cluster among other high priority applications.
- If you are familiar with the configuration parameters in spark-submit.
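For completeness, here is a hedged sketch of enabling dynamic allocation, assuming Spark 3.x, where shuffle tracking lets executors be released without an external shuffle service. The executor bounds are illustrative, and as with the static example, these properties are normally passed through spark-submit.

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("dynamic-allocation-example")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")    # illustrative lower bound
    .config("spark.dynamicAllocation.maxExecutors", "50")   # illustrative upper bound
    # Executors holding shuffle data must remain trackable before they are released:
    # either run an external shuffle service or enable shuffle tracking (Spark 3.0+).
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .getOrCreate())
```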
No random values in configuration
Spark can complete a task remarkably fast if the proper resources, such as memory and CPU, are provided.
Improper settings can make the job slower, and sometimes it even dies midway and never reaches completion, even though you have a big enough cluster to do the work.
A few of the parameters to be careful about are:
- Choose proper values for the number of cores per executor, the number of executors, executor memory, and driver memory. Refer to this doc for an example : https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
- Shuffle Parallelism : The default is 200, but you can increase or decrease it based on your needs. Increase it to a higher value when the data size is in gigabytes, 200 partitions would produce very big partitions, and executors would sit idle. Decrease it to a smaller number, perhaps 10 or 20, when the data is small and fewer tasks make better use of the available cluster resources.
- Default Parallelism : The default value is the total number of cores across the executors in your cluster. An RDD created from a source gets a number of partitions equal to the default parallelism. Adjust this number based on the subsequent stages in the job.
- spark.sql.shuffle.partitions is not spark.default.parallelism!! The former controls shuffle partitions for DataFrame and Spark SQL operations, while the latter applies to RDD operations.
- Coalesce and repartitioning : Coalesce and repartition can be used to combine small files at the target side, but if done in the middle of a job, there will be fewer tasks from that point on, which may slow down subsequent stages because some executors sit idle with no tasks.
- Number of target partitions : At the target, choose the coalesce or repartition number wisely. There is no magic number that fits all cases. Decide the desired size of each partition at the target location, based on what suits the downstream systems, and derive the partition count from that size (see the sketch below).
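To make the last two points concrete, here is a hedged sketch that tunes shuffle parallelism and derives the number of output partitions from a desired file size instead of a magic number. The 128 MB target, the 10 GB size estimate, and the paths are assumptions for illustration.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("target-partitions-example").getOrCreate()

# Shuffle parallelism for DataFrame/SQL operations (default 200); tune it to the data volume.
spark.conf.set("spark.sql.shuffle.partitions", "400")

df = spark.read.parquet("s3://bucket/enriched/")  # hypothetical input

TARGET_FILE_SIZE_BYTES = 128 * 1024 * 1024        # aim for ~128 MB per output file
estimated_output_bytes = 10 * 1024 * 1024 * 1024  # assume ~10 GB of output, illustrative

# Derive the partition count from the desired file size rather than guessing.
num_partitions = max(1, estimated_output_bytes // TARGET_FILE_SIZE_BYTES)

(df.repartition(num_partitions)
   .write
   .mode("overwrite")
   .parquet("s3://bucket/output/"))               # hypothetical output location
```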
Part 1: https://medium.com/@dineshkumarkct/apache-spark-optimisation-part-1-c325be5637e0
See you on another blog!! Leave a clap if you find this interesting!!