Improving performance in Spark jobs

Light painting photos: https://www.instagram.com/ccarreno_photos/
Photo by: Carlos Carreño

Giving online shoppers the appealing sense that a retailer’s search service understands them the way a human would is a Holy Grail of e-commerce.

But to achieve that pinnacle of intuitive search and discovery, one must collect and organize data. Lots of it. Large volumes of shopper data and product information must be distributed among different machines within a cluster, and we must achieve the highest processing efficiency in order to keep the shopper on-site.

Apache Spark is often the tool of choice for this challenge, so knowing how to configure and maintain it optimally is key to success.

Then:

  • c1: Do you have a Spark stack that works with tons of data in the cloud?
  • c2: Do you want to save some money on your monthly cloud bill?
  • c3: Do you want to boost the performance of your Spark job executions?
if (c1 || c2 || c3) {
    stay_here_and_enjoy();
    next_paragraph();
} else {
    // always a good option
    go_to_watch_cat_videos();
}

The aim of the first part is to study the theoretical aspects that will be applied in future posts about this topic. We’ll study measurements of the improvements to confirm the approach.

And we’ll provide many links during the course of this post, as well as a complete glossary at the end, since the work in this article has been more research than content creation.

The context

To begin with, it is important to identify the current situation of the projects on which this study will be carried out. In this first phase, we will analyze the theoretical aspects to apply to a Spark batch job in order to improve the performance and cost-effectiveness of your cluster.

This article assumes a settled stack, one that has grown at the high pace of development that the market pushes us to follow.

The analysis shown here might also be applied when starting a new development on the Spark stack, but it is mainly aimed at improving a Spark job that is already established and performing correctly in production.

00. What’s going on?

It’s fair to start with this section, since one of the most important requirements for analysing and improving software is a clear view of what happens deep within the system.

Me looking at monitoring and metrics data

There are many options to monitor (and understand) what is happening during the execution of a Spark job, and they have different objectives.

Monitoring in your Spark cluster

You can monitor and see what happens inside your Spark cluster at runtime with some frameworks and utilities. Here are the basic tools to see what happens in your cluster:

  • YARN ResourceManager UI: this utility provides a detailed view of cluster resources, such as the memory and CPU resources for each executor.
  • Spark Web UI: provides a deep dive into the tasks and the Spark configuration in use. One of the best features of the Spark UI is the option to inspect the execution plan and the Directed Acyclic Graph (DAG), which can also be used to visualize how tasks are distributed in each stage.

There are other, more complex solutions to monitor and visualize your Spark cluster:

  • Ganglia: provides a series of metrics and graphs to view cluster resource usage, such as memory and CPU.
  • If you like to use Jupyter notebooks, this project implements an extension called SparkMonitor that enables the monitoring of jobs sent from a notebook application.
  • In AWS EMR you can set up many built-in tools to “open” the cluster and see what’s happening inside it.

Monitoring inside your application

Visualising what happens within the cluster is important to detect failures and performance problems, but in the end those problems are usually caused by failures or poor efficiency within your application.

For this reason, it is also vital to add metrics inside your application to effectively represent the work states, adjusting them to the specific architecture and code.

Basic log with log4j

With this basic Java logging library you can log whatever you want inside the application, such as captured exceptions or warnings. This information can be sent to a log collector or to a static file system in the cloud.

This is a quick way to check the basic execution highlights in your apps and rule out gross errors in the execution.

Adding custom metrics with Prometheus, Pushgateway and Grafana

Prometheus is an open-source systems monitoring and alerting tool that uses a multi-dimensional data model with time series data identified by metric name and key/value pairs. Because Prometheus collects metrics by scraping, batch jobs and on-demand services, which may not live long enough to be scraped, are usually paired with an intermediate Pushgateway to which they push their time series.

Use this tool together with Grafana to plot your metrics, with an easy query language to customize your dashboard. You will be able to add time series metrics inside your application, adapt the output to your architecture and your job approach, and make analysis and monitoring easy.
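As a rough illustration of what “pushing the time series” means, here is a minimal sketch in plain Python that builds a Pushgateway request in the Prometheus text format without sending it. The gateway address, job name, metric names and the build_push_request helper are all hypothetical; a real job would more likely use a Prometheus client library.

```python
import urllib.request


def build_push_request(gateway_url, job, metrics, labels=None):
    """Build (but do not send) a request that pushes gauges to a Pushgateway.

    `metrics` maps metric names to numeric values and `labels` is attached
    to every sample. Every name used in this example is hypothetical.
    """
    label_text = ""
    if labels:
        pairs = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
        label_text = "{" + pairs + "}"
    lines = []
    for name, value in sorted(metrics.items()):
        lines.append(f"# TYPE {name} gauge")
        lines.append(f"{name}{label_text} {value}")
    body = "\n".join(lines) + "\n"  # the text format must end with a newline
    return urllib.request.Request(
        url=f"{gateway_url.rstrip('/')}/metrics/job/{job}",
        data=body.encode("utf-8"),
        method="PUT",  # PUT replaces everything previously pushed for this job
        headers={"Content-Type": "text/plain; version=0.0.4"},
    )


request = build_push_request(
    "http://pushgateway.example:9091",  # hypothetical address
    job="nightly_catalog_batch",        # hypothetical job name
    metrics={"batch_rows_processed": 120000, "batch_duration_seconds": 342.5},
    labels={"stage": "enrich"},
)
print(request.full_url)
print(request.data.decode("utf-8"))
# urllib.request.urlopen(request) would perform the push against a real gateway.
```

Pushing once at the end of each batch run keeps the last values available for Prometheus to scrape after the job itself has finished.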

Go deeper with Graphite and Grafana

There is another, complementary solution to add valuable metrics to your application, using a system that Spark already includes. Spark’s metrics system is based on the Dropwizard Metrics library, and it can be set up via the Spark configuration.

Also, the provided Graphite sink can be used to work with this tool.

Extra: Uber JVM Profiler, InfluxDB and Grafana

There is yet another, more complex solution to boost the metrics inside your application, using the Uber JVM Profiler with InfluxDB for storage and Grafana to visualize it all. Find more info in this article.

After making the changes needed in your code and infrastructure, you should be able to use the resulting graphs and metrics to identify the key points that you must solve to increase performance in your stack.

01. Improve your code

With a strong monitoring base that lets us observe what happens in our program when it is executed in the cluster, we can analyze our code and improve the aspects that affect performance.

To highlight the way in which Spark works to distribute the data workload and operations, we can take a look at this simple diagram:

Basic Spark System overview

As a highlight, just remember that a shuffle, in Spark and more generally in distributed computing, is the operation that redistributes data between partitions, which usually means sending it over the network. This is typically the most expensive operation in a distributed system.
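To make the idea concrete, here is a small plain-Python model of a shuffle. It is not Spark code: each inner list stands for one partition on its own executor, crc32 is only a stand-in for the hash function, and the function names are invented for this sketch.

```python
import zlib


def target_partition(key, num_partitions):
    """Pick the partition for a key by hashing it (crc32 is a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle(partitions):
    """Redistribute (key, value) records so that equal keys share a partition.

    Returns the new partitions and how many records changed partition,
    which models the records that would have to cross the network.
    """
    count = len(partitions)
    shuffled = [[] for _ in range(count)]
    moved = 0
    for source, records in enumerate(partitions):
        for key, value in records:
            target = target_partition(key, count)
            shuffled[target].append((key, value))
            if target != source:
                moved += 1
    return shuffled, moved


# Made-up sample data: three partitions of (product, quantity) records.
partitions = [
    [("shoes", 1), ("hats", 1), ("shoes", 1)],
    [("bags", 1), ("shoes", 1)],
    [("hats", 1), ("bags", 1), ("hats", 1)],
]
shuffled, moved = shuffle(partitions)
print(f"{moved} of {sum(len(p) for p in partitions)} records changed partition")
```

Every record counted in moved is serialized, sent and deserialized in a real cluster, which is why the rest of this section focuses on moving fewer of them.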

There are also many articles where you can find deeper explanations about how Spark works and the distribution of the data and operations along the cluster and its executors.

In this section we’re going to look over some key items related to the Spark code.

Thinking about broadcasting objects

We usually work with joins in a Spark job to cross data between datasets from different sources or data separations. How the datasets involved in these joins are managed is one of the key points to improve, and broadcasting is a commonly overlooked option.

This instruction from the Spark API allows us to mark a dataset as a read-only variable that is cached once on each node, rather than sending a copy with every task; in a join, this avoids shuffling the larger dataset. It is a quick and easy change for small datasets that can help to increase the speed of your joins.

Using the cache method to keep a full dataset in memory may also prevent unnecessary shuffling, because a dataset that is reused does not have to be recomputed, shuffles included.
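The trade-off behind broadcasting can be sketched with a plain-Python model (again, not Spark code). It assumes one partition per node and counts records rather than bytes; broadcast_join and shuffle_join_cost are names made up for this illustration. In Spark itself the hint is the broadcast function from the SQL functions module.

```python
def broadcast_join(large_partitions, small_table):
    """Join every partition of the large side against a full copy of the small side.

    Returns the joined rows and the number of records copied between nodes.
    """
    lookup = dict(small_table)  # read-only copy shipped once to every node
    copied = len(small_table) * len(large_partitions)
    joined = []
    for partition in large_partitions:  # large rows never leave their partition
        for key, value in partition:
            if key in lookup:
                joined.append((key, value, lookup[key]))
    return joined, copied


def shuffle_join_cost(large_partitions, small_table):
    """Upper bound on records moved when both sides are repartitioned by key."""
    return sum(len(p) for p in large_partitions) + len(small_table)


# Made-up sample data: 4 partitions of 1000 orders and a 3-row category table.
large_partitions = [
    [(i % 3, f"order-{p}-{i}") for i in range(1000)] for p in range(4)
]
small_table = [(0, "shoes"), (1, "hats"), (2, "bags")]

joined, copied = broadcast_join(large_partitions, small_table)
print("records copied by the broadcast join:", copied)
print("records a shuffle join may move:", shuffle_join_cost(large_partitions, small_table))
```

In this model the broadcast copies 12 records, while repartitioning both sides could move up to 4003. The advantage disappears as the “small” side grows, which is why broadcasting only suits datasets that fit comfortably in each executor’s memory.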

Thinking about shuffles

The previous section related to broadcast has a concrete example about how to avoid unnecessary shuffles in your plan.

Another important topic arises when choosing an arrangement of operators. Some operations trigger tons of shuffles depending on the data you’re working with, and as we said, the objective is to reduce the number of shuffles and the amount of data shuffled.

Some operators, such as join, group and the operators in the same family like the *By or *ByKey transformations, can result in many shuffles.

To summarise, there are different ways to replace these operators; the quickest ones follow:

  • Accumulators are variables that are only “added” to through an associative operation and can therefore be updated in parallel. They can be used to implement counters or sums. Accumulators are created with a start value by calling SparkContext.accumulator. Tasks running on the cluster can then modify the accumulator, for example by calling the add operation. However, they cannot read its value; only the driver can read the value from the accumulator, using the value method.
  • Reduction by key (reduceByKey) when the input and output values do not have the same type, for example when collecting the distinct values of each key into a set. Mapping every record to a one-element set and then merging the sets with reduceByKey creates thousands of objects, because a new set must be allocated for each record. The alternative is to use aggregateByKey, which adds each value to a per-key accumulator during the map-side aggregation and is far more efficient. In general, prefer reduceByKey, aggregateByKey, foldByKey and combineByKey over groupByKey whenever possible.
  • FlatMap-join-groupBy pattern: when you have two datasets already grouped by key and the intention is to join them and keep the grouping, the alternative is the cogroup operation, which avoids the overhead of unpacking and repacking the groups.
  • Associative reduction (groupByKey): when performing an associative reductive operation, you can replace dataset.groupByKey(_key).mapValues(whatever_op) with an analogous operation such as dataset.reduceByKey(whatever_op). The first one transfers the entire dataset across the network, while the second one computes local results for each key in each partition and combines those local results after shuffling.
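The difference between grouping first and combining first can be modelled in a few lines of plain Python. This is only a sketch of the idea, not Spark’s implementation: the two functions below are invented for the example, and unlike Spark’s aggregateByKey, which takes a zero value, this model takes a zero factory to avoid sharing a mutable object.

```python
from functools import reduce
from operator import add


def group_then_reduce(partitions, reduce_op):
    """Model of groupByKey + mapValues: every record is shuffled, then reduced."""
    shuffled_records = sum(len(p) for p in partitions)
    grouped = {}
    for partition in partitions:
        for key, value in partition:
            grouped.setdefault(key, []).append(value)
    result = {key: reduce(reduce_op, values) for key, values in grouped.items()}
    return result, shuffled_records


def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Model of aggregateByKey: combine inside each partition, shuffle the partials."""
    partials = []
    for partition in partitions:
        local = {}
        for key, value in partition:
            local[key] = seq_op(local.get(key, zero()), value)
        partials.append(local)
    shuffled_records = sum(len(local) for local in partials)
    result = {}
    for local in partials:
        for key, partial in local.items():
            result[key] = comb_op(result[key], partial) if key in result else partial
    return result, shuffled_records


# Made-up sample data: (product, quantity) records spread over three partitions.
partitions = [
    [("shoes", 1)] * 4 + [("hats", 1)] * 2,
    [("shoes", 1)] * 3,
    [("hats", 1)] * 5 + [("bags", 1)],
]
grouped_totals, grouped_shuffled = group_then_reduce(partitions, add)
combined_totals, combined_shuffled = aggregate_by_key(partitions, int, add, add)
print(grouped_totals, grouped_shuffled)
print(combined_totals, combined_shuffled)
```

Both approaches return the same totals, but the first shuffles all 15 records while the second shuffles only the 5 per-partition partial results. The gap grows with the number of records per key.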

You can also tune your partition sizes if you work with RDDs (which is not our case); working with higher-level abstractions like Datasets helps us to improve shuffle performance automatically. It is important to remember that Datasets execute all their internal operations on RDDs, with the only difference being that users do not have to create these RDDs themselves; rather, they are created in the last step of the execution plan, after all optimizations have been decided and applied.

All these optimizations are possible because the data is structured and Spark knows its schema, so it can use all its features (Custom Memory Management and Catalyst optimiser) to achieve performance that would not be possible if users worked directly with RDDs.

Replace UDFs with SparkSQL functions

UDFs transform the values of a single row within a table to produce a single corresponding output value per row. You can find more information about how the UDFs work in the background here.

The use of UDFs can affect the execution plan in a negative way: the optimizer cannot see inside them, and the received objects must be deserialized before the data can be processed. In cases where a UDF can be translated without affecting how the job works, some of the many Spark SQL functions that the API provides can be used to replace it.

Replacing UDFs must be done with care, as it can change the behaviour of the job and may not always lead to performance improvements. But it is a good exercise that can help us avoid future performance problems. Using Spark SQL functions also reduces the testing effort, since these operations are performed internally by Spark.

Data serialization

Serialization is defined as the process of converting an object in memory to another format that can be used to store in a file or send over the network. In Spark, deserialized representation is used for records in memory and serialized representation is used for records stored on disk or transferred over the network.

This operation is basic and very important, since it directly affects the performance of any distributed application. Computation becomes slower with formats that take a long time to serialize or that consume a large amount of resources. There are two main serializers in Spark: Java and Kryo.

The Kryo serializer is usually the better choice, although it is not the default, due to some instabilities in Kryo during previous versions of Spark and a desire not to break compatibility.

Bloated deserialized objects will cause Spark to spill data to disk more frequently and will reduce the number of deserialized records that Spark can cache (for example, at an in-memory storage level such as MEMORY_ONLY). Spark’s tuning guide has a great section on slimming these down.

Bloated serialized objects will result in more network and disk I/O, as well as greatly reducing the number of serialized records that Spark can cache.
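Why the choice of format matters can be shown with an analogy in plain Python. This is not a Java versus Kryo benchmark: pickle merely stands in for a generic, self-describing serializer and a fixed struct layout stands in for a schema-aware one, and the records are made-up sample data. In Spark itself the serializer is selected with the spark.serializer setting.

```python
import pickle
import struct

# Made-up sample data: 1000 (product_id, price) records.
records = [{"product_id": i, "price": i * 0.5} for i in range(1000)]

# Generic encoding: field names and type tags travel along with the data.
generic = pickle.dumps(records)

# Schema-aware encoding: both sides agree up front on "8-byte int, 8-byte float".
layout = struct.Struct("<qd")
compact = b"".join(layout.pack(r["product_id"], r["price"]) for r in records)

# Decoding the compact form needs the agreed layout and field names.
decoded = [
    dict(zip(("product_id", "price"), layout.unpack_from(compact, i * layout.size)))
    for i in range(len(records))
]

print("generic bytes:", len(generic))
print("compact bytes:", len(compact))
```

The compact form takes exactly 16 bytes per record, and the generic one is larger because it repeats structural information for every record. Smaller serialized records mean less data to shuffle and more records that fit in a serialized cache.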

02. Analyze your metrics and iterate

At this point:

  • We can see what happens in our job from various points of view and depths.
  • We know the key points for improvement in our code, and we can analyze the metrics with them in mind.

Now it’s time to shoot!

Now we must decide what is important and what is not, and what we must change in our code to improve our job. For this, here is a list of tips to look at our metrics and help us determine the action points:

Key points to check in the monitoring

  • Low load percentage of CPU
  • Large number of unused CPUs, or memory spikes in some of them
  • Number of executors/allocated memory/CPUs lower than expected
  • Tasks stay in the same state for a long time, blocking the job, or the job fails due to an exception in a stuck task

The Spark execution plan allows us to enter the lowest level of the process and is an easy way to detect possible problems and their improvements. Reducing the number of stages is an obvious way to optimize a job. In the execution plan we can see the different steps and stages. Here we must be aware of:

  • Number of stages: basic operations like a join between two datasets will normally require fewer than 5 stages.
  • Number of shuffles: here we can see whether broadcasting and dataset caching can be improved. Choosing what to cache or broadcast is a difficult decision. Caching an intermediate dataset can greatly improve performance; however, we must be careful, because cached data takes up memory that Spark also needs for processing.
  • Imbalanced task execution times: the tasks of a stage should take a similar amount of time to finish, so a symptom that something is wrong is a task that takes far longer or far less time than the others.
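The last check in the list above can be automated once task durations have been exported from the monitoring. Here is a minimal sketch in plain Python; the find_skewed_tasks helper, the sample durations and the threshold of three times the median are all assumptions for illustration, not recommended values.

```python
from statistics import median


def find_skewed_tasks(durations, factor=3.0):
    """Return (task_id, seconds) pairs that run `factor` times longer than the median."""
    typical = median(durations.values())
    return sorted(
        (task_id, seconds)
        for task_id, seconds in durations.items()
        if seconds > factor * typical
    )


# Made-up task durations, in seconds, for a single stage.
stage_durations = {0: 41.0, 1: 39.5, 2: 44.2, 3: 40.1, 4: 312.7, 5: 38.9}
print(find_skewed_tasks(stage_durations))  # prints [(4, 312.7)]
```

A task flagged like this usually points to a key that holds far more records than the others, so the next step is to look at how the data is distributed by key.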

In the coming months we will continue this series of articles, putting our hands on the keyboard, analyzing specific cases of code and carrying out an intensive task of measuring performance and other parameters. We anticipate that this series will contain:

  • Part 2: Improve the monitoring, iterate over observation and improve the code.
  • Part 3: Analyze the results of the job on the AWS and GCP platforms.
