Four things you need to know when you start using Apache Spark

Published in gft-engineering · Apr 17, 2020

Written by Davide Coccia and Elton Stafa


Today, a Big Data project or initiative can easily end up requiring the development of an Apache Spark application.

Apache Spark (https://spark.apache.org/) is a very powerful processing engine, and its widespread adoption benefits from support for JVM-based programming languages (Java, Scala) as well as Python. Furthermore, several libraries and frameworks allow you to plug Spark into your Hadoop cluster and interact with its ecosystem (Hive, HDFS, Impala, Kafka, etc.).

Java developers can be brought onto Spark projects with little effort. But even if you are already familiar with Java/Scala fundamentals and batch processing, it is not obvious how to switch effectively to developing a Spark application.

This article shares four tips and best practices, learned directly in the field on our Big Data Spark projects, that may boost your application's performance.

#1 Java iteration is not Spark iteration

Spark is essentially used to process (huge) collections of data. A common coding approach is to use native Java constructs to manage and iterate over such collections, and to expect Spark to do the magic.

Let’s consider this scenario: there is a file /some/path/customerbase.avro on our Hadoop HDFS filesystem containing the full list of our clients. The set of attributes in this file (i.e. client id, name, address, customer classification, …) needs to be extended with data available in another file: /some/path/accountsbase.avro.

The requirement is to obtain a new file on the HDFS filesystem containing the list of our clients and the total balance amount of their accounts.

A straightforward approach would be to create a DataFrame from each input HDFS file and then use a collect operation to iterate over the list of clients:
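The original gist is not embedded here, so the following is only a minimal sketch of that approach (the column names client_id and balance are hypothetical, and the avro data source is assumed to be available on the classpath):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.sum

    val spark = SparkSession.builder().appName("customer-balance").getOrCreate()
    import spark.implicits._

    val customers = spark.read.format("avro").load("/some/path/customerbase.avro")
    val accounts  = spark.read.format("avro").load("/some/path/accountsbase.avro")

    // collect() pulls every customer row back to the driver...
    val rows = customers.collect().map { row =>
      val clientId = row.getAs[String]("client_id")
      // ...and every lookup fires a separate, tiny Spark job from the driver loop
      val totalBalance = accounts
        .filter(accounts("client_id") === clientId)
        .agg(sum("balance").cast("double").alias("total_balance"))
        .first()
        .getDouble(0)
      (clientId, totalBalance)
    }

    // Rebuild a DataFrame from the locally collected result
    val result = rows.toSeq.toDF("client_id", "total_balance")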

The resulting DataFrame contains the expected data but, from the perspective of the internal Spark engine, no parallel processing occurred. The collect API forces the data to be sent from each executor back to the driver (memory), and the driver then iterates over each element to perform a lookup against the other DataFrame. This is not a good approach, especially from a scalability and performance point of view.

The alternative, just as in SQL databases, is to use native Spark APIs such as JOIN (Spark SQL). In this case the code would turn into:
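Again a hedged sketch rather than the original code, reusing the customers and accounts DataFrames from above (column names and the output path are illustrative):

    import org.apache.spark.sql.functions.sum

    // A single distributed join plus aggregation: no data is pulled back to the driver
    val result = customers
      .join(accounts, Seq("client_id"), "left")
      .groupBy("client_id")
      .agg(sum("balance").alias("total_balance"))

    // Write the enriched client list back to HDFS
    result.write.format("avro").save("/some/path/customerbalance.avro")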

At runtime this code will run only in the executors, exploiting the parallelism available in the system.

#2 Boost performance with Spark parameters

Spark applications may face limitations due to the amount of cluster resources available, so it is important to know a few settings that can help you improve performance. Directly in the spark-submit command, or in the definition of the SparkContext itself, you can specify configuration properties that boost your application.

spark.sql.shuffle.partitions

As we know, Spark distributes data across the cluster in order to balance the workload on each node. To do that, Spark splits the data into smaller pieces (partitions) and sends those parts to the executors. Each executor creates a thread pool to execute its tasks (e.g. an SQL join or groupBy), and each thread processes one partition of data. The spark.sql.shuffle.partitions property controls how many partitions are produced when data is shuffled for such operations.

For this reason, the number of partitions passed to an executor is proportional to the time it takes to complete the task.

The default value for this property is 200, which gives great advantages in terms of parallelism but may be overkill when the data size is small. The right value depends on the size of the data stored in HDFS: for just a few MB of data, the partition number should be correspondingly small (e.g. 2–3).
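For example, the property can be lowered either on the spark-submit command line or when building the session; the value 4 below is purely illustrative:

    import org.apache.spark.sql.SparkSession

    // Equivalent to: spark-submit --conf spark.sql.shuffle.partitions=4 ...
    val spark = SparkSession.builder()
      .appName("small-dataset-job")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()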

It is also possible to control the number of partitions via the Spark API, using the functions repartition and coalesce. The first allows you to increase or decrease the partition number, whilst the latter only allows decreasing it.
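For instance, reusing the result DataFrame from the join example above (the partition counts are illustrative):

    // repartition performs a full shuffle and can scale the partition count up or down
    val widened  = result.repartition(200)

    // coalesce avoids a full shuffle but can only reduce the partition count
    val narrowed = result.coalesce(4)

    println(narrowed.rdd.getNumPartitions)  // prints 4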

The important thing to remember is that too many partitions can (dramatically) slow down performance, so it is important to keep a good balance between the size of the partitions and their number.

spark.dynamicAllocation.enabled

By default, in Spark, this property is set to false, but it is advisable to check the current environment configuration for possible overrides.

By setting this property to true, Spark can dynamically request and release the resources needed by running applications. This is helpful when you have no elements to size the resources needed by your job and you let Spark choose them. In production environments, however, we experienced a few drawbacks with this approach, since resources were often over-allocated, causing shortages and errors.

Therefore, for production environments it is advisable to keep this property disabled and to tune the resource allocation (e.g. executor memory, executor cores and driver memory) for each job.
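A sketch of what that static sizing could look like (the figures are purely illustrative; driver memory in particular must be set before the driver JVM starts, so in practice these options are usually passed to spark-submit):

    // e.g. spark-submit --conf spark.dynamicAllocation.enabled=false \
    //                   --num-executors 10 --executor-memory 4g \
    //                   --executor-cores 4 --driver-memory 2g ...
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("statically-sized-job")
      .config("spark.dynamicAllocation.enabled", "false")
      .config("spark.executor.instances", "10")
      .config("spark.executor.memory", "4g")
      .config("spark.executor.cores", "4")
      .getOrCreate()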

#3 Spark API vs Spark SQL

This is a very delicate comparison. It is like wondering whether pizza is better than chocolate. There are two schools of thought: the first holds that a query written in pure SQL is more readable, and the other holds the opposite.

Let us have a look at an example:
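The original snippet is not reproduced here; as a hedged illustration, here is the same filter-plus-join logic expressed both ways, reusing the customers and accounts DataFrames (column names are hypothetical):

    import org.apache.spark.sql.functions.col

    // "Spark API" version: filter, join and projection chained fluently
    val apiResult = customers
      .filter(col("classification") === "PREMIUM")
      .join(accounts, Seq("client_id"))
      .select("client_id", "name", "balance")

    // "Spark SQL" version: the same logic as a plain SQL string
    customers.createOrReplaceTempView("customers")
    accounts.createOrReplaceTempView("accounts")

    val sqlResult = spark.sql(
      """SELECT c.client_id, c.name, a.balance
        |FROM customers c
        |JOIN accounts a ON c.client_id = a.client_id
        |WHERE c.classification = 'PREMIUM'""".stripMargin)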

What stands out from the example is that the “Spark API” version is more compact than the “Spark SQL” one, and this happens every time you have to express a simple query (such as filters, selects and joins).

Obviously, using SQL you will be able to handle more complicated situations, writing queries of even 100 lines, because you are not limited by the Spark framework's functions. Under the hood, Spark uses the Catalyst optimizer to optimize the execution of queries: Catalyst generates multiple logical plans and chooses the best among them in terms of computational cost.

The optimization begins with analyzing the correctness of the query (e.g. “select col from table” needs a validation of the column name “col”).

This optimization takes time (a few milliseconds) and is applied at runtime, when the SQL queries have to be parsed. On the other hand, using the Spark API, this correctness check is done at compile time and, thanks to the type safety provided by Spark Datasets, it forces you to define the schema before creating such objects.
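As a small illustration of that type safety (the case class and column names are hypothetical), a DataFrame can be turned into a typed Dataset:

    import org.apache.spark.sql.Dataset

    // The schema is made explicit through a case class
    case class Customer(clientId: String, name: String, classification: String)

    import spark.implicits._

    // The mapping is checked as soon as .as[Customer] is applied
    val typedCustomers: Dataset[Customer] = customers
      .select(
        $"client_id".as("clientId"),
        $"name",
        $"classification")
      .as[Customer]

    // Field access is now checked at compile time
    val premium = typedCustomers.filter(_.classification == "PREMIUM")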

In conclusion, choosing between these two approaches might be difficult. In terms of performance they are similar (many studies confirm this). What you should take into account when deciding is the impact on the code's readability and the possibility of using typed Datasets to reduce the number of potential bugs. In some cases, unit tests for example, Datasets are more useful than a simple, generic DataFrame. On the other side, SQL is surely more human-friendly and gives much more flexibility.

#4 Graph processing using GraphX

GraphX is a graph-processing framework built on top of Apache Spark that implements operators like join, map, and group-by. GraphX is generally used to manipulate data coming from SQL-like tables as a graph, taking advantage of Spark's parallel computation.

In our case, we had to use such tables of raw data and links in order to compute a hierarchical tree of relationships. To achieve this goal, we used Pregel, a GraphX API that allowed us to “surf” the vertices of the graph.

Pregel is an iterative algorithm based on message passing and on the properties of each vertex. As we said, the core of the algorithm is the messages sent from the root of the tree towards the leaves. At each step of the Pregel algorithm, the message associated with a vertex is updated with new values (in our scenario, the share percentages of companies).

Let’s have a look at some code:
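The original gist is not embedded here; below is a minimal sketch (not the authors' exact code) of a Pregel call that propagates the smallest vertex ID through each connected component. It assumes graph is an existing Graph whose edges carry Double share percentages and whose vertex values can be replaced by their own IDs:

    import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}

    // Start from a graph whose vertex value is the vertex's own ID
    val initialGraph: Graph[Long, Double] = graph.mapVertices((id, _) => id)

    val components: Graph[Long, Double] = initialGraph.pregel(
      // first parameter list
      Long.MaxValue,        // the initial message sent to every vertex
      Int.MaxValue,         // the maximum number of iterations
      EdgeDirection.Either  // the direction of the edges to follow
    )(
      // second parameter list
      // vprog: each vertex keeps the smallest ID it has seen so far
      (id: VertexId, attr: Long, msg: Long) => math.min(attr, msg),
      // sendMsg: propagate an ID only when it improves the neighbour's value
      (triplet: EdgeTriplet[Long, Double]) =>
        if (triplet.srcAttr < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
        else if (triplet.dstAttr < triplet.srcAttr) Iterator((triplet.srcId, triplet.dstAttr))
        else Iterator.empty,
      // mergeMsg: two incoming messages are merged by keeping the smallest ID
      (a: Long, b: Long) => math.min(a, b)
    )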

In the code, “graph” is an object of type Graph, an abstraction provided by the GraphX API and based on Spark RDDs. We call the Pregel function and pass it two lists of parameters, which contain respectively:

  • The initial message sent to each vertex of the graph
  • The maximum number of iterations for the algorithm
  • The direction of the edges
  • The function that receives a message on each vertex and computes the new vertex value (vprog)
  • The function that sends messages along the edges of the vertices that received a message in the previous iteration (sendMsg)
  • The function that takes two incoming messages and merges them into one (mergeMsg)

Now, let’s consider this use case.

In a credit risk scenario for financial institutions, we have a set of thousands of companies and the relationships between them, as in the following two tables:

COMPANIES

RELATIONSHIPS

The challenge is to provide a way to retrieve a specific group of companies connected through relationships, according to specific rules (e.g. only share percentages greater than 50%, relationship type in (‘G’, ‘E’)).

We solved this problem using the Spark GraphX APIs, which can manipulate large amounts of data represented as a graph of vertices and links (in this case the companies are the vertices and the relationships are the links). In particular, we used the Pregel function, which makes it possible to assign a unique identifier to each connected group of companies and, at the same time, to apply the rules mentioned above to the links, in order to shrink the whole graph down to only the companies that satisfy the rules.
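The exact schema of those tables is not shown above, so the following is only a sketch of how the graph could be assembled and pruned before running Pregel; the DataFrames companies and relationships, the column names and the thresholds are illustrative, matching the rules of the use case:

    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.sql.functions.col

    // Vertices: one entry per company, keyed by a numeric company ID
    val vertices = companies
      .select(col("company_id").cast("long"), col("company_name"))
      .rdd
      .map(row => (row.getLong(0), row.getString(1)))

    // Edges: keep only the relationships that satisfy the business rules
    val edges = relationships
      .filter(col("share_percentage") > 50 && col("relationship_type").isin("G", "E"))
      .select(
        col("source_company_id").cast("long"),
        col("target_company_id").cast("long"),
        col("share_percentage").cast("double"))
      .rdd
      .map(row => Edge(row.getLong(0), row.getLong(1), row.getDouble(2)))

    // Companies are the vertices, relationships are the links
    val companyGraph: Graph[String, Double] = Graph(vertices, edges)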

In graph theory, those subgraphs are called Connected Components: subgraphs in which any pair of vertices is connected by a path.

After executing the Pregel function, we added a column to our original companies table:

“Company C” was removed because of its low share percentage, and we assigned each group its own ID, which can be used for fast and simple retrieval. In detail, these are the steps of the algorithm:

  • First, each vertex of the graph is mapped to a unique identifier (of Scala Long type) and its associated message contains this ID.
  • Next, each vertex starts sending its message to its neighbours using the sendMsg function.
  • Then the vprog function comes in and compares the new message with the old one: if the received ID is smaller, the ID associated with the current vertex is set to the new one.

For example, node “7” sends its ID to node “8”, which then updates its message with the ID “7”.

The algorithm ends when there are no more messages to send. Each vertex is now mapped to a new ID representing the smallest ID of its connected component, which distinguishes it from the other components.

