How DBS’ finance platform team optimises Spark jobs for better performance

Published in DBS Tech Blog · Jul 1, 2022

In the first part of a two-part series, we look beyond code optimisations to solve big data analytics performance problems with five short-term solutions

By Kapil Shukla

Working in the analytics and big data domains means one thing: it will not be long before you need to optimise the performance of a Spark job while crunching data at scale. Apache Spark is a well-known distributed processing framework that processes large data sets across a cluster of nodes.

Common performance issues include Spark jobs failing due to memory errors, jobs taking far longer to complete than they did a month ago, or jobs incurring heavy I/O as data moves from memory to disk and back.

There are numerous underlying drivers for these problems. Some of the more common drivers include:

1) Poorly written code: Developers who are not trained in distributed systems have a higher chance of churning out non-optimised code;

2) Loose engineering: Not taking an end-to-end perspective, not prioritising non-functional requirements such as automated data quality checks and dynamic executor assignment, and lacking performance regression testing;

3) Lack of observability: It is not possible to improve what is not observed. Teams do present facts such as job run times, infrastructure usage and executor assignments, but they often forget to observe and analyse the underlying indicators that dictate processing performance; and

4) Team structure: This is one of the most underrated reasons. While one usually looks at the three drivers stated above, it's crucial to understand whether those drivers arose because of the way the team has been structured to function.

Performance issues can often be solved tactically, through what enterprises typically call quick wins or low-hanging fruit. This article focuses on five short-term solutions that work in multiple scenarios and contribute directly to the technical solutioning for optimising performance. A second article will focus on team structuring and observability. While neither contributes directly to the technical solutioning, they are nevertheless important practices that teams should follow to minimise performance issues.

1) Improve Directed Acyclic Graph (DAG) Processing

Spark works on a lazy evaluation basis, which means that until an action such as count() or show() is called on a dataset, Spark keeps track of all transformations applied to the dataset but does not evaluate them. Once an action is called, Spark figures out the best plan, internally referred to as the DAG, to process the computations in a distributed manner.
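To make the laziness concrete, here is a minimal PySpark sketch (the dataset and column names are illustrative): the transformations only record lineage, and nothing executes until the action at the end.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-evaluation-demo").getOrCreate()

df = spark.range(1_000_000)  # illustrative dataset of 1 million rows

# Transformations: these only add nodes to the DAG; no data is processed yet
doubled = df.selectExpr("id * 2 AS doubled")
filtered = doubled.filter("doubled % 3 = 0")

# Action: only now does Spark turn the DAG into a physical plan and execute it
print(filtered.count())

spark.stop()
```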

Whenever an action is called on the dataset, Spark starts processing the DAG. If many transformations have been done in the code, creating this DAG takes time. It may seem that nothing is happening or changing on Spark's UI. It may seem… stuck. In reality, Spark is creating the DAG.

Secondly, Spark does not clear the DAG history once an action has completed. It maintains this history to provide resilience against failures of any kind, including crashing executors or lost data partitions; the DAG gives Spark the ability to recompute data in these scenarios. Because Spark does not clear the DAG built up to the point an action is called, the overall DAG keeps growing, and Spark must maintain it until the job terminates. This DAG, now larger than it needs to be, extends the processing time.

In cases where the business logic is long and complex, many developers persist the data in an intermediate table at some point in the code. They then read the data back from that intermediate table and continue with further transformations. Unfortunately, this kind of data persistence does not let Spark clear the DAG history. It slows Spark down, sometimes by as much as 20 minutes. Instead of writing the data to an intermediate table, I strongly suggest persisting the data and the schema via checkpointing in Spark. Once checkpointing is complete, the DAG is truncated up to the checkpoint, and Spark maintains less metadata going forward, as in the sketch below.
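Below is a minimal PySpark sketch of the checkpointing approach. The checkpoint directory, input path, and column names are illustrative assumptions; the point is that checkpoint() materialises the intermediate result and truncates the lineage up to that point.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("checkpoint-demo").getOrCreate()

# Checkpoint data is written to reliable storage; this HDFS path is illustrative
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

df = spark.read.parquet("hdfs:///data/transactions")  # illustrative input

# A long chain of transformations keeps growing the DAG
aggregated = (
    df.filter(F.col("amount") > 0)
    .groupBy("account_id")
    .agg(F.sum("amount").alias("total_amount"))
)

# checkpoint(eager=True) computes the result now, persists it along with its
# schema, and truncates the lineage up to this point
aggregated = aggregated.checkpoint(eager=True)

# Subsequent transformations start from the much shorter, checkpointed lineage
high_value = aggregated.filter(F.col("total_amount") > 10000)
high_value.write.mode("overwrite").parquet("hdfs:///data/high_value_accounts")
```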

2) Dynamically Assign Job Parameters

Three important parameters need to be assigned during the Spark job submission process: the number of executors, cores per executor, and shuffle partitions. When a new project starts, the value for each parameter is established based on the dataset used during testing. Good software engineering teams go the extra mile to build observability features that measure the performance of these Spark jobs in production and take corrective action when they find inconsistencies, instead of focusing only on delivering the next business feature.

The challenge comes when the job runs with the same static parameters in production month after month, even though the underlying data being processed has grown significantly or more logic (transformations and actions) has been added to the code. Production support teams that proactively build mechanisms to track job performance will catch this deviation in processing time much earlier than those that don't. But many teams do not observe these key metrics: they either lack the capability to build observability features or are too busy rolling out business requirements.

To solve this issue, it's important to understand how the underlying data grows and the impact that growth has on the job. Does it make sense to increase the number of executors or cores per executor when the underlying data grows? I've observed that when teams don't spend enough time writing logic to assign these parameters dynamically, it becomes a big issue later, particularly when it comes to solving performance problems.

As such, if the data being processed is dynamic in nature, it is advisable to write an algorithm to assign these parameters before the jobs are executed. Many teams may latch onto Spark’s dynamic resource allocation feature to solve this problem, but in my experience, it may lead to other problems, which I will cover in my next point.

Lastly, assigning a value to shuffle partitions can be tricky. It is advisable to settle on a range within which the size of each shuffle partition should stay as data grows, and derive the partition count from that, as in the sketch below.
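As a sketch of what such a sizing algorithm might look like, the snippet below derives the executor count and shuffle-partition count from the estimated input size before spark-submit. The target partition size, the cores-per-executor value, and the estimate_input_mb helper are illustrative assumptions, not a prescription.

```python
import math
import subprocess

# Illustrative tuning constants: keep each shuffle partition around 150MB and
# give each core roughly four partitions' worth of data per wave
TARGET_PARTITION_MB = 150
CORES_PER_EXECUTOR = 4
MB_PER_CORE = 4 * TARGET_PARTITION_MB


def estimate_input_mb(hdfs_path: str) -> int:
    """Estimate input size in MB using 'hdfs dfs -du -s' (first field is bytes)."""
    out = subprocess.check_output(["hdfs", "dfs", "-du", "-s", hdfs_path])
    return int(out.split()[0]) // (1024 * 1024)


def job_parameters(hdfs_path: str) -> dict:
    size_mb = estimate_input_mb(hdfs_path)
    return {
        # Stay at or above Spark's default of 200 shuffle partitions
        "spark.sql.shuffle.partitions": max(200, math.ceil(size_mb / TARGET_PARTITION_MB)),
        "num_executors": max(2, math.ceil(size_mb / (MB_PER_CORE * CORES_PER_EXECUTOR))),
        "cores_per_executor": CORES_PER_EXECUTOR,
    }


# The returned values would then be passed to spark-submit, e.g. via
# --num-executors, --executor-cores and --conf spark.sql.shuffle.partitions=...
```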

3) Allocate Executors: Static Vs Dynamic

Spark offers two ways to assign executors to a job: static and dynamic. The former means that the number of executors remains constant throughout the job, while the latter acquires more executors or frees them up as the job progresses. All one needs to do is define the minimum and maximum number of executors the Spark job can use during the spark-submit invocation.
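The standard configuration keys for both modes are shown below in a PySpark sketch; the values are illustrative and would normally be passed as --conf options to spark-submit rather than hard-coded.

```python
from pyspark.sql import SparkSession

# Dynamic allocation: Spark scales executors between the min and max bounds
spark = (
    SparkSession.builder.appName("dynamic-allocation-demo")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "5")
    .config("spark.dynamicAllocation.maxExecutors", "40")
    # An external shuffle service is typically required (e.g. on YARN) so that
    # executors can be released without losing their shuffle files
    .config("spark.shuffle.service.enabled", "true")
    .getOrCreate()
)

# Static allocation instead: disable dynamic allocation and pin the executor
# count, e.g. on spark-submit:
#   --conf spark.dynamicAllocation.enabled=false \
#   --num-executors 20 --executor-cores 4 --executor-memory 8g
```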

While dynamic allocation feels more palatable and is rightly recommended by practitioners, it may impact the number of batch jobs that can run simultaneously in the cluster. Typically, at the start of a project, a team comes up with an initial cluster sizing and set of jobs to be submitted. Dynamic allocation works fine here. But at least two elements will grow as time progresses:

1) Underlying data to process; and

2) Number of jobs running concurrently

In both the above cases, while the demand for the executors (or infra) grows, the underlying infrastructure for the Spark cluster will not grow as easily.

Also, you cannot really be certain when your job will complete, especially if many jobs have been submitted using dynamic resource allocation. The diagram below plots the number of executors required (Y-axis) against time (X-axis).

Figure 1: Executor utilisation by a Spark Job

At t0, when the Spark job does not require executors, it may release them. At t2, when it needs more executors, there are two potential scenarios: it either gets the executors and finishes at t3, or it fails to get the executors it requires because other jobs running concurrently with the same dynamic allocation setting keep the cluster busy, and it finishes at t4 instead. For time-sensitive jobs, this can become stressful to manage.

I believe that a dynamic resource allocation strategy scales well when the underlying infra is elastic. Otherwise, you will run into scaling issues over time. Also, if you have jobs that are time sensitive, it is essential to provide a static number of executors that guarantees completion within a reasonable time. As such, I advise that it is better to look at data growth and assign executors statically.

4) Optimise Cores Per Executor

One of the important Spark job parameters is cores_per_executor. The higher the number of cores per executor, the more tasks can run in tandem. For example, if you set the parameter value to 5, then at most 5 tasks per executor run at a time. When a huge amount of data is being processed and developers lack distributed programming skills, you may run into the data spill-to-disk problem, which means there is not enough memory for a task's transformation to be carried out, and data needs to be persisted on disk. The I/O operations incurred here increase the processing time.

For some jobs, this data spilling problem can be decreased by reducing the number of cores per executor. Here’s an example. Suppose we run a job with the following settings:

executor_memory = 18GB

number_of_executors = 10

cores_per_executor = 5

The cores_per_executor setting controls how many tasks can run in parallel in an executor, which, in this case, is 5.

Now, if you observe a data spill problem in your system, one option is to increase executor_memory to a higher value. If you are limited by controls implemented by the infrastructure team, you can try reducing cores_per_executor instead. If we reduce this setting to 3, then every task gets roughly 6GB (18GB / 3 cores per executor) compared with roughly 3.6GB (18GB / 5 cores per executor).

For the sake of the above example, I've omitted the exact calculations, as executor memory is also divided among various internal overheads. The reduction in cores then needs to be compensated by increasing the number of executors from the existing 10 to 16, because you want to stay close to the original total of 50 cores (10 executors x 5 cores per executor). The incremental 6 executors also give you an additional 108GB (18GB x 6 executors) of memory, which may be beneficial.
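The approximate arithmetic behind this trade-off is summarised below. The figures ignore Spark's internal memory regions and overheads, so treat them as ballpark values rather than exact allocations.

```python
executor_memory_gb = 18

# Memory available per concurrently running task (ignoring internal overheads)
per_task_before = executor_memory_gb / 5   # 5 cores per executor -> ~3.6GB per task
per_task_after = executor_memory_gb / 3    # 3 cores per executor -> ~6.0GB per task

# Keep total cores close to the original 10 executors x 5 cores = 50:
# 16 executors x 3 cores = 48 cores, and the 6 extra executors add
# 6 x 18GB = 108GB of memory to the job overall.
total_cores_before = 10 * 5
total_cores_after = 16 * 3
extra_memory_gb = 6 * executor_memory_gb

# Equivalent spark-submit flags for the revised settings:
#   --num-executors 16 --executor-cores 3 --executor-memory 18g
```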

While this may not be true for all scenarios, I recommend testing this carefully because every job is coded differently. Simply increasing the number of executors can potentially create more problems in the system. There are other well-known mechanisms that also fix the spill problem, such as increasing the number of partitions, or salting.

5) Repartition Data

Data skewness is a well-known problem and can introduce lengthy delays when processing actions. Coalesce is often recommended as a strategy to reduce the number of data partitions. However, coalesce regroups partitions within the same executor, which may not solve the data skewness issue. Repartitioning can be used here instead, as it balances the data distribution across executors, but at the cost of moving data across nodes.

For use cases such as persisting intermediate data, checkpointing, or writing data to a Hive table, it becomes necessary to ask whether repartitioning is needed. If the data is not evenly partitioned, the write may take a long time because a handful of executors hold most of the partitioned data, or it may create a small-files problem.

In such use cases, where a huge amount of data needs to be persisted, repartitioning (rather than coalesce) is recommended so that data is distributed equally across all executors, as in the sketch below. Developers must be deliberate about using this functionality as they write the code and know which operations will cause data to accumulate in a few executors.
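A minimal PySpark sketch of the difference (table names and partition counts are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("repartition-demo").getOrCreate()

df = spark.table("sales_raw")  # illustrative source table

# coalesce() merges existing partitions without a full shuffle, so skewed
# partitions stay skewed and a few executors can end up writing most of the data
narrowed = df.coalesce(50)

# repartition() performs a full shuffle and distributes rows evenly across the
# requested number of partitions, so the write is spread over all executors
# and the small-files problem is reduced
balanced = df.repartition(200)

balanced.write.mode("overwrite").saveAsTable("sales_curated")
```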

Conclusion

Looking beyond code optimisations is often necessary to solve performance issues. The challenges and solutions described above have helped our team further improve the performance of its Spark jobs. Of course, there are many other methods to optimise performance beyond the examples given in this article, and I recommend exploring beyond these suggestions. In my next article, I'll share important best practices and an ideal team structure to adopt so that performance issues do not keep recurring in an organisation.

Kapil is a technologist with over a decade of experience working as a software developer, product manager and solution architect building large-scale enterprise products. In his current role as Head of Engineering and Architecture at DBS Bank Singapore, Kapil is responsible for defining the technology strategy and adoption, and hiring and coaching talent to build superior products for the finance platform.
