Performance Optimization of Apache Spark Jobs

Riccardo Bove
xgeeks
Published in
10 min readAug 12, 2021

This article was co-written by Riccardo Bove, Yoann Dupe, Maher Deeb, and Swapnil Udgirkar, from the KI performance team, part of KI group. Our guest writers for this article!

Photo by Jakub Skafiriak on Unsplash

Business and data customers (users) often have to change their requirements based on fluctuating business needs. Therefore, with regards to the ETL pipeline, engineers may add or remove features and modify or develop new ETL jobs accordingly. Keeping that in mind, optimizing ETL jobs to handle those changes on the big data level is a discipline that every data engineer should learn. The optimization process includes data-level adjustments, e.g., file format, partitioning size, etc., cluster-level adjustments, e.g., memory configuration, number of nodes, etc., and code-level adjustments, e.g., avoiding expensive operations. The trade-off between the cost, the queued jobs, and execution time provides a simple but efficient baseline to tune those ETL jobs. This article will go through some Spark configurations that can make a difference and help you avoid performance issues.

File format and data schema:

Choosing the file format to store data can have a long-term effect on the data pipeline performance. The challenge appears when the engineers have to deal with dramatic changes on the data level, such as adding or removing fields from the data model or changing a field type. Such changes compel engineers to adjust their code. The file format plays a vital role in making the problem complicated or straightforward when backward and forward compatibility is required to ensure that the system works for the data at any point in time. Spark supports reading data from different file formats. Here we walk through the properties of some of those file formats and their respective pros and cons.

JSON:

The JavaScript Object Notation (JSON) file format is widely used and widely supported, especially when it comes to web services. It is a textual format and human-readable. In addition, it has the advantage of distinguishing between numbers and strings and it supports the Unicode character strings. The JSON file format does have some disadvantages. For example, the precision of floating-point can’t be specified. That makes dealing with such numbers a challenge when it comes to maintaining accuracy. The same problem is encountered when dealing with extremely large numeric values. Moreover, JSON also doesn’t support binary strings by default. Getting around that limitation may increase the data size by 33% [reference].

The JSON format has two representational formats: single-line mode and multiline mode. Both modes are supported in Spark. Each line denotes a single JSON object in single-line mode, whereas in multiline mode, the entire multiline object constitutes a single JSON object.

CSV:

The Comma-Separated Values file format is popular language-independent and has a textual form as well. The CSV format is good enough for many purposes. A comma is the default separator or delimiter in CSV files, but you may use other delimiters to separate fields in cases where commas are part of the data. Popular spreadsheets can generate CSV files, so it’s a popular software among data and business analysts.

CSV has the same disadvantage as the JSON format. Moreover, a number and a string containing digits cannot be distinguished in a CSV unless the external framework that reads data can infer the schema. CSV is also a pretty vague format since having the separator character in the actual data without escaping it can make the data invalid.

Parquet:

Parquet is a columnar storage format that supports a document data model. It’s the default data source in Spark. It is supported and widely used by many big data processing frameworks and platforms. Using Parquet format helps to save space and allow access to data columns. Therefore, It is a good idea to store data in Parquet format when we have a lot of columns. The Parquet files store the schema as a part of the metadata. Therefore, you don’t need a separate job to infer the schema. The compression method that the Parquet file uses to reduce the size of the data makes Parquet files not easily human-readable.

Avro:

Apache Avro is a binary encoding format. It was started in 2009 as a sub-project of Hadoop. Avro also uses a schema to specify the structure of the data being encoded. It has two schema languages: one (Avro IDL) intended for human editing, and one (based on JSON) that is more easily machine-readable. It offers many benefits, including direct mapping to JSON, speed, efficiency, and bindings available for many programming languages. When using the Avro format, it is essential to be careful about the schema. Any mismatch between the reader and writer schema will make the encoded data invalid. However, that doesn’t mean that the schema should be identical, but they should be compatible. It is acceptable to have the columns in a different order, for example. When using the Avro format to store a large amount of data in a single file, writing the schema ones as metadata would be enough. In the case of streaming data, the schema can be attached to every data batch. In that case, we can version the schema and store it in a database. Similar to the Parquet format, the encoding method used in the case of Avro format makes data not easily human-readable.

Data schema management:

Maintaining schema compatibility is essential when it comes to long-term ETL job optimization. When using textual data format such as JSON or CSV, having a kind of database for schema versions that the ETL jobs can use to encode the data is very helpful to ensure the compatibility of the data at any point in time. It is better to use an encoded data format such as Avro to avoid manually maintaining such a database. Encoded data format provides applying validation rules to ensure that the schema is up to date.

Partitioning

Partitioning allows for efficient parallelism. An appropriate partitioning strategy allows for spark executors to process data close to them and minimizes network bandwidth. Spark will schedule a thread per task per core at best, and each task will process a distinct partition. The partition size and the shuffle partitions are two approaches that you can use to partition the data efficiently.

Partition Size
By default, Spark partitions data on sizes of around 64Mb and 128Mb. Decreasing the size may result in the “small file problem,” which is when you have many small partitions that result in too many reads from the disk. You can also specify the number of partitions instead of using a specific partition size. That is useful when you have huge input files and partition them to the number of cores. Unfortunately, there is no perfect formula to partition the data. A lot of it is trial and error, so some experimentation is encouraged. You can set the max amount of partition size in the spark configuration using the following command:

spark.sql.files.maxPartitionBytes

And the number of partitions in a data frame is as follows:

df.repartition(num) - or in the case that you are reducing the number of partitions: df.coalesce(num) where the df refers to the Spark dataframe and num is the number of the partitions. You can also define the default number of partitions by setting the following spark configuration:

spark.sparkContext.defaultParallelism

Shuffle partitions

Shuffle partitions are created during the shuffle stage (called by join() or groupBy() which have wide dependencies — check the next section). These are set by default to 200, which is too high for smaller operations, in these scenarios, it’s recommended to reduce them to the number of cores in the machine. You can set the number of shuffle partitions in the spark configuration using the following command:

spark.sql.shuffle.partitions

Matching partitioning strategy in database

In the case of interacting with a database, another approach is to partition the Spark data frame with the same strategy as the database, this will result in each executor communicating with its corresponding shard in the database. For example, if the database is partitioned by country, It might a good idea to use the repartition function to pass the country column like so:

df.repartition(country_column)

Ultimately, the most optimal configuration for the ETL will be dependent on multiple factors like the size of the data, the spark cluster configuration, the transformations that need to be done in the ETL, etc. There is not one single formula for all ETLs, but with these tools, you can experiment with the ETL and determine what is the best configuration for the use case.

Optimizing Spark operations

On the code level, besides optimizing the data partitioning, it is important to utilize the spark operations wisely. The wrong arrangement of those operations can cause fatal problems, such as the out-of-memory (OOM) exception when dealing with huge data sizes.

Narrow and wide transformations

Transformations are operations that Spark evaluates lazily. That is, their results are not computed immediately, but they are recorded or remembered as a lineage. A recorded lineage allows Spark, later in its execution plan, to rearrange certain transformations, coalesce them, or optimize transformations into stages for more efficient execution. Transformations can be classified as having either narrow dependencies or wide dependencies. Any transformation where a single output partition can be computed from a single input partition is a narrow transformation. filter() and contains() represent narrow transformations because they can operate on a single partition and produce the resulting output partition without any exchange of data. However, groupBy() or orderBy() instruct Spark to perform wide transformations, where data from other partitions are read in, combined, and written to disk. These are often referred to as a shuffle where Spark will exchange partitions across the cluster. Shuffle optimization remains an important topic of discussion within the Spark community.

Optimizing Wide transformations

Optimization can be done by either joining or pipelining some operations and assigning them to a stage, or breaking them into stages by determining which operations require a shuffle or exchange of data across clusters. Partitioning is a process where the data is split into multiple chunks based on a particular field or we can just specify the number of partitions. So, if groupBy is being performed multiple times on the same field, the data can be partitioned by that field and have subsequent groupBy transformations use that data.

Jobs monitoring using Spark UI

Since the ETL jobs optimization is a process, monitoring that process is essential to identify problems and the need for optimization early enough to avoid the time and money bleed. For this purpose, Spark UI is a good starting point. Within the Spark UI, on the job tab, the event timeline is the first thing to look at. It displays in chronological order of the events related to the executors (added, removed) and the jobs. Having a look at the duration will give you an indication of the job performance. If a job takes too much time to finish than others, it is worth optimizing.

On the stage tab, you can look at the detail of each job’s stage per status within an overview table. Note that the number of running tasks is the direct count of cores in the cluster. Therefore, in an ideal spark job, you should have 1 stage inside the job and every task in the job should be running in parallel. Of course, in practice, you’ll find more than one task per stage.

When looking at the detail for a specific stage you can see if some tasks are taking much longer than others especially looking at the executor computing time. If the executor’s computing time has a very high ratio for a task it might suggest that we have some artifacts of a small dataset on a local mode Spark, but it also suggests some skew in one of the partitions or tasks. The tiny tasks suggest near-empty partitions or near-no-op tasks. On the other hand, we might have too many partitions, leading to too many tasks with small computing time in green (less than 60%-70%).

The summary metric table is a good place to see how the data is distributed among the partitions. Looking at the input size row will give you an idea of how you’re utilizing the different core for the job. All percentiles (0, 25, 50, 75, and max columns) should show similar sizes. If one size is too low compare to the other, you’re under-utilizing the core. And if one is too high, you’re therefore over-utilizing the cores. For example, if the Max metric figure is much higher than the 75% figure, it suggests that too many slow tasks or operating over partitions with larger skewed amounts of data.

Summary:

The optimization of Spark ETL jobs is a process that combines decisions and actions on several levels — the data level, the code level, and the cluster level. In the long term, choosing the appropriate file format and the schema management system for storing the data can make the optimization process efficient and straightforward. Optimizing the data partitioning and Spark operations is extremely important to avoid bad performance and fatal problems at the code level. Monitoring and understanding the performance of the Spark jobs using Spark UI is beneficial when configuring the Spark cluster and validating all optimization decisions taken on the data and code level. Nevertheless, the optimization process is much complex than just adjusting some numbers. Therefore, systematic trial and error and well-designed logging are very useful for optimization efficiency.

References

https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/

https://medium.com/@dvcanton/wide-and-narrow-dependencies-in-apache-spark-21acf2faf031

https://www.oreilly.com/library/view/designing-data-intensive-applications/9781491903063/

--

--