Apache Spark: how to choose the correct data abstraction?

Federico Sala · Published in Quantyca · Jun 26, 2019

Apache Spark offers three different APIs to handle sets of data: RDD, DataFrame, and Dataset. Picking the correct data abstraction is fundamental to speed up Spark job execution and to take advantage of Spark’s internal optimizations. Moreover, choosing a good data structure speeds up the development process.


As a data architect and software developer at Quantyca, every day I have to make choices in order to quickly deliver high-quality code together with optimized algorithms. That is why I decided to write this article, which briefly describes each of the data structures offered by Apache Spark, the well-known data processing tool. I will underline their pros and cons and give you some hints on how to choose the correct data abstraction among:

  • RDD: a distributed collection of Java objects;
  • DataFrame: a structured and untyped dataset;
  • Dataset: a structured but typed dataset, the perfect marriage between RDDs and DataFrames.

Throughout the article, I will provide some examples written in Scala.

Introduction

Nowadays, one of the most widespread libraries for big data processing is Apache Spark, a fast and general-purpose cluster computing system. Within the map-reduce paradigm, Spark has become a de facto standard. The first implementation of the map-reduce algorithm was developed by Google; a few years later, an open-source implementation was published in Hadoop. At first, Hadoop released an inefficient version, then an optimized one, which introduced resource negotiation through YARN. Yet, Hadoop exhibited poor I/O performance caused by read/write operations on disk. Finally, Spark entered the scene, bringing impressive optimizations by reading data from memory instead of disk. This innovation dramatically boosts program execution speed.

Figure 1: components of a distributed Spark application.

In short, every Spark application consists of a driver program, the main program of the application, and many executors spread across the cluster nodes, which perform parallel operations and return their results to the driver (see Figure 1). In order to perform parallel and distributed computations, Spark provides different structures for handling bunches of data: RDDs, DataFrames, and Datasets.

The three data abstractions provide the same result to the user, but they differ in the APIs they offer, in performance, and in the way they work internally.

Let’s jump into them!

RDD

The main data abstraction provided by the Spark library since release 1.0 is the RDD, which stands for Resilient Distributed Dataset. An RDD is a fault-tolerant collection of data elements partitioned across the cluster nodes that can be operated on in parallel using Spark’s APIs. In most cases, RDDs are created by loading data from distributed data stores (like HDFS, HBase, Cassandra, or any other data source supported by Hadoop), by parallelizing a Scala collection (such as a List or a Sequence), or by reading files stored in the local file system.
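For instance, here is a minimal sketch of two common ways to create an RDD, assuming a SparkContext named sc and a purely hypothetical HDFS path:

```
// Create an RDD by parallelizing an in-memory Scala collection
val numbers = sc.parallelize(Seq(1, 2, 3, 4, 5))

// Create an RDD by reading a text file (hypothetical path), one element per line
val logLines = sc.textFile("hdfs:///data/app/logs.txt")
```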

From a high-level point of view, an RDD can simply be considered as a set of Java objects which supports two types of operations: transformations and actions. Transformations create a new RDD from an existing one, while actions return a value to the driver after a computation on the initial RDD. The core of Spark’s paradigm is the concept of laziness: transformations are effectively computed only when an action is called, i.e. when a result is required to be returned to the driver program (see Figure 2).

Figure 2: RDD operations.
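To make laziness concrete, here is a tiny sketch using the numbers RDD defined above: the transformation only builds up the lineage, and nothing runs until the action is invoked.

```
val doubled = numbers.map(_ * 2)    // transformation: nothing is executed yet
val total   = doubled.reduce(_ + _) // action: triggers the actual computation
```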

Since RDDs are composed of Java objects, they offer a familiar object-oriented programming style. RDDs’ APIs allow low-level processing on the data structure: map, filter, and reduce are the most common operations used to manipulate RDD objects through the typical object-oriented and functional programming features offered by the Scala language. Let’s see an example.

Suppose you have a text file containing Shakespeare’s Romeo and Juliet, where each line of the file corresponds to a line of the original tragedy. You want to count how many times the word “Juliet” appears in the whole text. For simplicity, assume the text contains no punctuation.
This parsing task fits RDDs well. Basically, you have to follow these steps (a sketch in Scala follows the list):

  • load the text file into an RDD, where each RDD element is a line of text;
  • map all the lines to lower case letters;
  • flat the RDD into an RDD of words (of String type) by splitting all the lines on the whitespace character;
  • filter (retain) all the words equal to “juliet”;
  • map all the elements to the number 1;
  • apply a reduce with a sum function to get the total result.
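Under those assumptions, a minimal sketch could look like this, again assuming a SparkContext named sc and a hypothetical local file name:

```
// Load the tragedy: one RDD element per line of text (hypothetical file name)
val lines = sc.textFile("romeo_and_juliet.txt")

val julietCount = lines
  .map(line => line.toLowerCase)      // lower-case every line
  .flatMap(line => line.split(" "))   // flatten into an RDD of words
  .filter(word => word == "juliet")   // keep only the word we are counting
  .map(word => 1)                     // each occurrence counts as 1
  .reduce(_ + _)                      // sum everything up (action)

println(s"'Juliet' appears $julietCount times")
```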

A great advantage of RDDs is that they are compile-time type safe. Since Scala is a strongly typed language, the Scala compiler is able to detect any type incompatibility already at compile time, giving you the opportunity to fix any error without having to run the entire application.
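As a quick illustration of what the compiler catches, the following line would not even compile against the lines RDD from the sketch above, because its elements are Strings:

```
// lines.filter(line => line % 2 == 0)   // compile error: value % is not a member of String
```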

However, since RDDs contain Java objects, they suffer from both Garbage Collection and Java serialization issues, which are expensive operations when the data grows. Unfortunately, Spark does not offer any built-in optimization to speed up these processes. Because of this, DataFrames were introduced in the library.

Before going on, a little recap on RDDs’ pros and cons:

✔️ Object-oriented and functional programming style

✔️ Low-level control on data

✔️ Compile-time type safety

❌ Serialization and Garbage Collector issues

❌ No built-in optimizations

DataFrame

DataFrames were included in Spark version 1.3 as part of Spark SQL, the Spark module for structured data processing which allows the usage of SQL queries.

A DataFrame is a dataset organized into named columns. At a conceptual level, it is equivalent to a table in a relational database or to a Pandas dataframe in Python. Going deeper, in Scala a DataFrame is a structured dataset of Row objects, where a Row is a generic, untyped Java object. DataFrames can be constructed from different sources: structured data files, Hive tables, tables from external databases, or even existing RDDs.

Compared to RDDs, DataFrames offer a higher level of abstraction. In fact, DataFrames can be treated as tables. Spark SQL provides APIs to run SQL queries on DataFrames with a simple SQL-like syntax.

As an example, suppose you have a Hive table called orders containing information about e-commerce order deliveries. For simplicity, let’s consider only two columns of the table: status and country. You want to count the closed orders for each country. Using a SQL-like syntax, doing such a task is straightforward (see VERSION 1 in the code snippet):

  • load the orders table into a Spark DataFrame;
  • filter (retain) the closed orders with a where clause;
  • call a group by followed by a count to get the result.

Alternatively, when loading the table, a SQL query can be used to extract the same result in one shot (see VERSION 2 in the code snippet).
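Here is a minimal sketch of both versions, assuming a SparkSession named spark with Hive support enabled and assuming that closed orders carry the status value 'CLOSED' (the exact value is hypothetical):

```
// VERSION 1: DataFrame API
val closedByCountry = spark.table("orders")
  .where("status = 'CLOSED'")   // keep only closed orders
  .groupBy("country")           // one group per country
  .count()                      // number of closed orders per country

// VERSION 2: a single SQL query producing the same result
val closedByCountrySql = spark.sql(
  """SELECT country, COUNT(*) AS closed_orders
    |FROM orders
    |WHERE status = 'CLOSED'
    |GROUP BY country""".stripMargin)
```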

Unlike RDDs, DataFrames come with a couple of optimizations under the hood. The first one is Catalyst, a query optimizer which interprets Spark code and builds an optimized logical and physical query plan. The second one is Tungsten, an optimizer which implements the off-heap storage mechanism: serialization is performed into off-heap storage, so that transformations are executed directly on this off-heap memory, avoiding the serialization costs associated with the standard Java or Kryo serializers. Tungsten also offers the whole-stage code generator, a component that converts the optimized physical plan into Java bytecode to be run on each executor.
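If you are curious about what Catalyst produces, you can inspect the query plans of the DataFrame from the previous sketch:

```
// Prints the parsed, analyzed, and optimized logical plans plus the physical plan
closedByCountry.explain(true)
```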

These optimizations greatly speed up the execution of Spark jobs in terms of CPU and memory efficiency, and considerably reduce Garbage Collector pressure.

But all that glitters is not gold. Unfortunately, DataFrames are not type safe: types are checked only at runtime. For example, if you accidentally select the wrong column when writing a query with DataFrames, the compiler does not complain and the error is detected only when the application is run. This is a tedious problem for Spark developers, since it significantly lengthens the development process. To face this issue, the Spark library introduced Datasets.
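As a concrete illustration, the following line compiles without complaints but fails only at runtime with an AnalysisException, because the column name is misspelled (the typo is deliberate here):

```
// Compiles fine, blows up only when the application runs
val broken = spark.table("orders").select("countryy")
```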

Let’s sum up DataFrames’ pros and cons:

✔️ High level of abstraction

✔️ Possibility to run SQL queries

✔️ Catalyst optimizer for query plans

✔️ Tungsten optimizer for serialization

❌ No object-oriented programming

❌ Types checked only at runtime

Dataset

Datasets have been available since Spark release 1.6. Like DataFrames, they were introduced within the Spark SQL module.

A Dataset is a distributed collection of data which combines the benefits of RDDs and the power of Spark SQL engine. Indeed, Spark Datasets offer both the OOP interface for transformations and the SQL one to run queries. Datasets are very similar to RDDs: they can be constructed by reading data from external sources or parallelizing a set of Java objects.

Datasets are a good choice when dealing with structured or semi-structured data: a tabular representation is automatically deduced by Spark as long as you describe the type of a single data object with a Scala case class. For example, assume you have to process some sensor data collected during a car race. Suppose that sensors log the speed of the car every second in a JSON structure like the following:

{"carId": 5, "driver": "S.Vettel", "team": "Ferrari", "lap": 22, "speed": 340, "timestamp": 123456789}

Sensor data gathered from all the cars participating in the race are available in a JSON file. Now, you want to find the driver from team “Ferrari” who reached the maximum speed. You can do that using an RDD-like style (see VERSION 1 in the code snippet) or in a SQL way (see VERSION 2 in the code snippet).

In the first case, follow these steps:

  • filter (retain) only lines coming from team “Ferrari” sensors;
  • sort the dataset by speed in descending order;
  • map the objects to take only the driver field;
  • take the first object (the head) of the dataset.

The SQL query, instead, is self-explanatory.
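A minimal sketch of both versions could look like the following, assuming a SparkSession named spark, a hypothetical file name race.json with one JSON record per line, and a case class mirroring the structure above:

```
// Case class describing a single sensor reading (field names match the JSON)
case class SensorReading(carId: Long, driver: String, team: String,
                         lap: Long, speed: Long, timestamp: Long)

import spark.implicits._

// Read the JSON file and turn it into a typed Dataset[SensorReading]
val readings = spark.read.json("race.json").as[SensorReading]

// VERSION 1: RDD-like, typed transformations
val fastestFerrariDriver = readings
  .filter(reading => reading.team == "Ferrari")   // keep only Ferrari sensors
  .orderBy($"speed".desc)                         // sort by speed, descending
  .map(reading => reading.driver)                 // project the driver field
  .head()                                         // take the first element

// VERSION 2: plain SQL on a temporary view
readings.createOrReplaceTempView("readings")
val fastestFerrariDriverSql = spark.sql(
  """SELECT driver
    |FROM readings
    |WHERE team = 'Ferrari'
    |ORDER BY speed DESC
    |LIMIT 1""".stripMargin).first().getString(0)
```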

Like RDDs, Datasets are compile-time type safe. In fact, object types are checked at compile time, which pragmatically saves both development time and costs.

Furthermore, Datasets inherited all the optimizations introduced with DataFrames. Catalyst and Tungsten are still there, but with a little improvement: Encoders. An Encoder is used to encode (and decode) any Java object to (and from) Spark’s internal Row format. Through the generation of bytecode used to interact with the off-heap data, the Encoder provides on-demand access to individual data attributes without deserializing the entire Java object. As a result, Spark Datasets use less memory and greatly speed up program execution.
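Encoders are normally derived automatically through spark.implicits._, but they can also be obtained explicitly; for example, for the SensorReading case class from the sketch above (a hedged illustration, not something you need in everyday code):

```
import org.apache.spark.sql.{Encoder, Encoders}

// Explicit Encoder for a Product type (case class)
val readingEncoder: Encoder[SensorReading] = Encoders.product[SensorReading]

// Equivalent to .as[SensorReading] with spark.implicits._ in scope
val typedReadings = spark.read.json("race.json").as(readingEncoder)
```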

Yet, Datasets suffer from Garbage Collector overhead when object serialization is necessary. Object serialization is needed whenever map and filter operations are invoked with user-defined functions (UDFs). For example, if you define a custom function to square a number instead of using the built-in one offered by Spark, you do not exploit all the optimization features. Finally, many Dataset features are still tagged as “experimental” in the Spark documentation.
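To make the trade-off concrete, here is a hedged sketch based on the readings Dataset from above: the first variant stays entirely within Catalyst’s optimized expressions, while the second one deserializes every row into a SensorReading object just to apply the lambda:

```
import org.apache.spark.sql.functions.pow
import spark.implicits._

// Built-in function: Catalyst can optimize it, no object deserialization
val squaredBuiltIn = readings.select(pow($"speed", 2).as("speed_squared"))

// Lambda on typed objects: each row is deserialized before the function runs
val squaredLambda = readings.map(reading => reading.speed * reading.speed)
```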

Before ending the article, let’s summarize Datasets’ pros and cons:

✔️ Object-oriented programming style

✔️ Compile-time type safety

✔️ Possibility to run SQL queries

✔️ Catalyst and Tungsten optimizers

✔️ Encoders to optimize serialization

❌ Performance impact on UDFs

❌ Garbage Collector overheads

❌ Some features still experimental

Conclusion

To sum up, Apache Spark offers the three data abstractions outlined above. RDDs are a good choice when you need low-level control over a set of Java objects, while DataFrames and Datasets excel with structured data and run at superior speed.

Pick the one that fits your problem best, keeping an eye on the trade-off between performance boost and quick development.
