How to optimize your Spark application

Poatek
7 min read · Dec 9, 2022

A past series of posts [1] discussed Spark’s dominance in the Big Data world and how fast it can be. However, it is not unusual to end up with slow applications. Worse, the process of optimizing an application sometimes doesn’t seem to make sense and appears to be a matter of luck. It is not. Here we talk about a few things to keep in mind that can make your Spark application much faster in just a few steps, and explain why they work.

In this article, we present several tips you can apply in your code to make your application faster; in the next one, we will explore resource calibration, such as the ideal number of executors, cores, and partitions for your application. Each tip is followed by an explanation of how to use it and why it works. Every tip assumes Spark 3.

Avoid UDFs/Lambdas (or “let Spark do its job”)

As you probably know by now, Spark creates an execution plan before running your code. Simply put, Spark analyzes your code and builds a plan to run it as fast as possible [2]. A user-defined function (UDF) [3] is a function written in the language you are using (Scala or Python, for example), so Spark cannot look inside it and optimize it. Every time you use a UDF, you prevent Spark from doing its job, i.e., optimizing that part of the application.

Whenever possible, prefer Spark built-in functions [4] over UDFs to make your code perform better. Of course, sometimes a UDF cannot be avoided. Just make sure that every UDF in your code is really necessary, so you can have a faster job.

Avoid saving unnecessary data

Saving DataFrames throughout the code may seem harmless to overall performance, but it is not. Save operations are expensive because of the high latency of disk operations and the cost of gathering all the data from the executors to be stored. Especially with extremely large datasets, save operations can have a large impact on application performance and should be avoided.

Note that, counterintuitively, it can sometimes be better for performance to recompute a DataFrame than to save it. Watch your data closely: assess the size of your DataFrames and the available memory, and make the decision that best suits your data. Fortunately, there is often a better option: persisting data. Persisting data instead of writing it directly to the file system can improve overall performance, because it keeps the data ready to be consumed. Spark provides two API calls for persisting data: cache() and persist().

Cache()

Caching is a well-known technique in Computer Science for situations that need faster data access. It uses faster storage (usually volatile, since the data lives there only briefly) so that the data is ready to be consumed by the processing unit with much smaller I/O overhead. With the cache() call, Spark tries to keep as many partitions in executor memory as will fit, so they can be accessed faster. What happens to a partition that does not fit depends on the storage level: for DataFrames, cache() defaults to MEMORY_AND_DISK, so such partitions are spilled to disk and read back when needed; with a memory-only level (the default for RDDs), Spark has to recompute the partition, often slowing down your job.

Persist()

Persist() is a more versatile way to persist data than cache(), as it offers finer control over how the data is stored. It lets the developer choose the best storage strategy for the application by passing a StorageLevel (for example, StorageLevel.MEMORY_AND_DISK) as an argument. The table below summarizes the storage levels provided by the DataFrame persist() call. Keep in mind that all data stored on disk is serialized. More information on serialization can be found in the Spark documentation [5].

(Table: configuration possibilities for persist() [6])

A few examples of when persisting data can help [7]:

  • DataFrames commonly used during iterative machine learning training.
  • DataFrames accessed frequently for transformations during ETL or when building data pipelines.

Persisting can help improve performance in some cases; however, it can be unsuitable when:

  • DataFrames are too big to fit in memory.
  • The transformation that produces the DataFrame is not expensive.
  • The DataFrame doesn’t require frequent use, regardless of its size.

Avoid joins (shuffles)

Joins are extremely important in the context of Big Data: they enrich data by merging information from different DataFrames using a set of keys common to both data sources. This kind of operation, however, can have a large impact on the whole application’s performance. It may cause data to be sent to different Spark nodes over the network or partitions to be written to disk, both of which are expensive. This redistribution of data is known as a shuffle.

Given that shuffle operations are expensive, they should be avoided. In most cases, however, they are essential to the code and cannot be escaped. In these cases, a small tip is to filter your data as much as possible before the join is performed. This reduces the amount of data sent to different Spark nodes, so the operation can be computed faster. Also, select only the columns that matter for the operation, to avoid processing and moving unnecessary data.

Fortunately, Spark provides five different join strategies (Broadcast Hash Join, Shuffle Hash Join, Shuffle Sort-Merge Join, Cartesian Join, and Broadcast Nested Loop Join), and the best approach is to analyze your DataFrames to choose the least expensive one. Here we will talk about the most common ones: Broadcast Hash Join and Shuffle Sort-Merge Join [8].

Shuffle sort-merge join

This is the standard and most common join strategy in Spark applications. The sort-merge algorithm matches rows over a common, sortable join key: Spark first redistributes both datasets so that all rows with the same key land in the same partition on the same executor, then sorts each side by the key and merges the two sorted sides. It requires sorting and exchanging data between executors, which can slow down the application. This exchange can be reduced or even eliminated by adopting some strategies before joining. Here we explain two ways of doing this and show in which cases they can boost your application’s performance.
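To give an intuition for the sort and merge phases, here is a toy, pure-Python sketch of an inner sort-merge join over lists of (key, value) rows. It is not Spark’s implementation and leaves out the shuffle entirely; the function name and sample data are made up for illustration.

```python
from typing import Iterator, List, Tuple

Row = Tuple[int, str]


def sort_merge_join(left: List[Row], right: List[Row]) -> Iterator[Tuple[int, str, str]]:
    """Inner-join two lists of (key, value) rows by sorting both and merging."""
    left_sorted = sorted(left, key=lambda r: r[0])
    right_sorted = sorted(right, key=lambda r: r[0])
    i = j = 0
    while i < len(left_sorted) and j < len(right_sorted):
        left_key, right_key = left_sorted[i][0], right_sorted[j][0]
        if left_key < right_key:
            i += 1  # no match for this left row, advance left
        elif left_key > right_key:
            j += 1  # no match for this right row, advance right
        else:
            # Find the run of rows on the right side that share this key.
            j_end = j
            while j_end < len(right_sorted) and right_sorted[j_end][0] == left_key:
                j_end += 1
            # Pair every left row with this key against the whole right run.
            while i < len(left_sorted) and left_sorted[i][0] == left_key:
                for k in range(j, j_end):
                    yield (left_key, left_sorted[i][1], right_sorted[k][1])
                i += 1
            j = j_end


users = [(2, "bob"), (1, "alice"), (3, "carol")]
orders = [(3, "book"), (1, "pen"), (1, "ink"), (4, "lamp")]
result = list(sort_merge_join(users, orders))
print(result)
# [(1, 'alice', 'pen'), (1, 'alice', 'ink'), (3, 'carol', 'book')]
```

Because both sides are sorted, each one is scanned only once during the merge; the expensive parts are the sort and, in Spark, the exchange that precedes it.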

Bucketing:

This strategy is very convenient when you know beforehand that a dataset is going to be joined or grouped several times. By applying bucketBy when saving a dataset, you save the data split into “n” buckets by the columns you choose. Then, when you join or group the dataset on the same keys used in bucketBy, Spark can skip the shuffle and the operation becomes much faster. For a join, this works best when both sides are bucketed on the join keys with the same number of buckets.

In general, if you have only one join or group operation, this strategy will not boost performance: the time spent saving the bucketed DataFrame can exceed the time of the join/group operation itself. On the other hand, if you apply several operations that require a shuffle on your DataFrame, the one-time cost of saving it bucketed can pay off, because rows with the same keys are already stored together and can be loaded by the same executor.

PartitionBy:

This strategy is applicable under the same conditions as bucketing and can be very advantageous for a dataset with uniformly distributed keys. The partitioning operation creates one partition (a separate directory on disk) for each distinct value of the column you set as the partitionBy key, so it suits columns with relatively few distinct values. Remember that, as with bucketing, the partitioning operation is expensive because it involves saving the DataFrame. Understand your data and use these techniques wisely; otherwise, they might worsen your application’s performance.

Broadcast Hash Join (also known as map-side-only join)

This join works best when one of the datasets is small enough to fit in driver and executor memory and the second is large enough that you want to avoid moving it. By default, Spark picks this strategy automatically when the smaller side is below 10 MB (the spark.sql.autoBroadcastJoinThreshold setting). The strategy copies the smaller dataset to every executor, so the larger dataset does not need to be shuffled and the join can be done locally on each executor.

We hope this article gives you a better understanding of the optimization process in Spark and helps you make your application even faster. In the next post, we will explore resource calibration of Spark applications, so you can improve performance by tweaking a few Spark settings when running your jobs. Stay tuned!
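The idea can be sketched in plain Python as a toy model (not Spark code): the small table is turned into a hash table once, and each partition of the large table is joined against it where it already lives. The function name and data are invented, and the sketch assumes keys are unique in the small table.

```python
from typing import Dict, List, Tuple


def broadcast_hash_join(
    large_partitions: List[List[Tuple[int, float]]],
    small_table: List[Tuple[int, str]],
) -> List[List[Tuple[int, float, str]]]:
    """Inner-join each partition of the large side against a copy of the small side."""
    # Build the hash table once from the small side; in Spark this is what
    # gets copied ("broadcast") to every executor. Assumes unique keys.
    lookup: Dict[int, str] = dict(small_table)

    joined_partitions = []
    for partition in large_partitions:
        # Each partition is probed locally; no large-side row changes partition.
        joined_partitions.append(
            [(key, amount, lookup[key]) for key, amount in partition if key in lookup]
        )
    return joined_partitions


# Hypothetical data: two partitions of orders and a small customers table.
orders_partitions = [[(1, 25.0), (2, 40.0)], [(1, 15.0), (3, 5.0)]]
customers = [(1, "Ana"), (2, "Bo")]

result = broadcast_hash_join(orders_partitions, customers)
print(result)
# [[(1, 25.0, 'Ana'), (2, 40.0, 'Bo')], [(1, 15.0, 'Ana')]]
```

In PySpark, you can ask for this strategy explicitly by wrapping the small side in the broadcast() function from pyspark.sql.functions, for example large_df.join(broadcast(small_df), "key"), where large_df and small_df stand for your own DataFrames.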

[1] Apache Spark and Big Data #2, available at <https://poatek.com/2021/11/03/apache-spark-and-big-data-2/>, accessed on Dec 12, 2022.

[2] Damji, J.S., Wenig, B., Das, T. and Lee, D., 2020. Learning Spark. O’Reilly Media, Chapter 3, Apache Spark’s Structured APIs.

[3] PySpark UDF, available at <https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/>, accessed on Dec 12, 2022.

[4] Spark SQL Built-in Standard Functions, available at <https://sparkbyexamples.com/spark/spark-sql-functions/>, accessed on Dec 12, 2022.

[5] Data Serialization, available at <https://spark.apache.org/docs/latest/tuning.html#data-serialization>, accessed on Dec 12, 2022.

[6] Spark Persistence Storage Levels, available at <https://sparkbyexamples.com/spark/spark-persistence-storage-levels/>, accessed on Dec 12, 2022.

[7] Damji, J.S., Wenig, B., Das, T. and Lee, D., 2020. Learning Spark. O’Reilly Media, Chapter 7, Optimizing and Tuning Spark Applications.

[8] Spark Join Strategies — How & What?, available at <https://towardsdatascience.com/strategies-of-spark-join-c0e7b4572bcf>, accessed on Dec 12, 2022.
