An Introduction to Big Data Processing With Spark

Geraldo Souza Junior
Big Data Processing With Spark
13 min read · Apr 5, 2018

The concept of Big Data existed long before the term was coined. For a long time, stock markets have handled billions of transactions each day, the space industry has had large amounts of data to process (rocket launch trajectories, simulations), and the oil industry has processed large volumes of geological data. But what is Big Data, really? In 2001, Doug Laney published an article about the growing challenge of data management (the term “big data” does not even appear in it) in which he described the three Vs: volume, velocity, and variety. To answer the question of whether your data is big, we can translate these into the following three sub-questions:

  • Volume: Can you store your data in memory?
  • Velocity: Can you process new incoming data with a single machine?
  • Variety: Is your data from a single source?

If you answered yes to all of these questions, then your data is probably not that big, and you may just need to simplify your solution’s architecture.

If you answered no to all of them, then your data is big. If your answers are mixed, things get complicated. Some will argue that one V is more important than another, and you will not find any consensus. The exact answer depends on the problem that you are trying to solve.

Big data application architecture

Big data, such as documents, weblogs, social network data, and sensor data, is stored in a NoSQL database, such as MongoDB, or in a distributed filesystem, such as HDFS. When we deal with structured data, we can add database capabilities with systems such as Cassandra or HBase (the latter is built atop Hadoop). Data processing follows the MapReduce paradigm, which breaks data processing problems into smaller subproblems and distributes tasks across processing nodes. Finally, machine learning models are trained with libraries such as Mahout and Spark MLlib.

The MapReduce Model

MapReduce is a paradigm designed to process huge volumes of data. It was introduced by Jeffrey Dean and Sanjay Ghemawat at Google in 2004 as a programming model for processing huge collections of data on clusters of computers. The paradigm is based on functional programming primitives that allow large volumes of data to be manipulated in a distributed and parallel way. A great number of real applications can be expressed in this programming model.

The model consists of writing a program built from two basic operations: map and reduce. The map operation receives a key/value pair and produces an intermediate set of data, also in key/value format. The reduce operation is executed once for each intermediate key, with all the intermediate values assigned to that key grouped together. In general, the map operation is used to find something, and the reduce operation is used to summarize the result.

Let’s consider the problem of counting the number of times each word occurs in a large set of documents. Take a look at the pseudocode for this problem using MapReduce:

map(String input_key, String input_value):
    // input_key: document name
    // input_value: document contents
    for each word w in input_value:
        EmitIntermediate(w, "1");

reduce(String output_key, Iterator intermediate_values):
    // output_key: a word
    // intermediate_values: a list of counts
    int result = 0;
    for each v in intermediate_values:
        result += ParseInt(v);
    Emit(AsString(result));

For each word in the input, the map function emits the value “1” together with the word as its key. The reduce function sums all the counts emitted for the same key, that is, for the same word.
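
To make the pseudocode concrete, here is a minimal single-machine sketch in plain Scala. It is not how a real MapReduce framework is implemented: the names MapReduceSketch, mapPhase, reducePhase, and wordCount are made up for this illustration, counts are kept as integers rather than strings, and the grouping step that a framework performs across machines is simulated with a local groupBy.

```scala
// Local, single-machine sketch of the MapReduce word count.
// All names here are illustrative; nothing is distributed.
object MapReduceSketch {
  // map: (document name, document contents) => one (word, 1) pair per word
  def mapPhase(docName: String, contents: String): Seq[(String, Int)] =
    contents.split("\\s+").filter(_.nonEmpty).map(w => (w, 1)).toSeq

  // reduce: (word, all counts emitted for that word) => (word, total)
  def reducePhase(word: String, counts: Seq[Int]): (String, Int) =
    (word, counts.sum)

  def wordCount(docs: Map[String, String]): Map[String, Int] = {
    // Run map over every document and collect the intermediate pairs.
    val intermediate = docs.toSeq.flatMap { case (name, text) => mapPhase(name, text) }
    // Stand-in for the shuffle: group the intermediate pairs by key (the word).
    val grouped = intermediate.groupBy(_._1)
    // Run reduce once per distinct key.
    grouped.map { case (word, pairs) => reducePhase(word, pairs.map(_._2)) }
  }
}
```

Calling MapReduceSketch.wordCount(Map("d1" -> "a b a", "d2" -> "b c")) yields a map in which both a and b are counted twice and c once.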

Execution:

The map reduce flow

The map operation is distributed across multiple machines by partitioning the input data into pieces that can be processed in parallel by different machines. The reduce operation is distributed by partitioning the intermediate keys. The number of partitions and the partitioning function are user-supplied parameters.
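
As a rough illustration of how intermediate keys can be spread over reduce workers, the sketch below uses a hash of the key modulo the number of partitions. This is only one possible partitioning function, chosen here as an assumption; PartitionSketch, partitionFor, and partition are hypothetical names.

```scala
// Sketch of hash partitioning for intermediate (key, value) pairs.
// Names are illustrative; a real framework does this across machines.
object PartitionSketch {
  // Assign a key to one of numPartitions reduce partitions: hash(key) mod R.
  def partitionFor(key: String, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw // hashCode can be negative
  }

  // Group the pairs by the partition their key is assigned to.
  def partition(pairs: Seq[(String, Int)], numPartitions: Int): Map[Int, Seq[(String, Int)]] =
    pairs.groupBy { case (key, _) => partitionFor(key, numPartitions) }
}
```

Because the partition depends only on the key, all pairs for the same word land in the same partition, so a single reduce call can see every count for that word.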

Do not confuse the paradigm with Apache Hadoop MapReduce, the framework that implements it inside Apache Hadoop (a MapReduce-based solution for distributed processing and file storage). For a long time, Hadoop MapReduce was the most popular framework for big data processing. It is implemented in Java, and because of Java’s verbosity it requires a lot of code for even simple tasks. Then, in 2014, the first major version of Spark (1.0) was released.

Spark was originally conceived at Berkeley’s AMPLab by Matei Zaharia, who went on to cofound Databricks, together with his mentor Ion Stoica, as well as Reynold Xin, Patrick Wendell, Andy Konwinski, and Ali Ghodsi. Although Spark is open source, Databricks is the main force behind Apache Spark, contributing more than 75% of Spark’s code. It also offers Databricks Cloud, a commercial product for big data analysis based on Apache Spark.

By using Spark’s elegant API and runtime architecture, you can write distributed programs in a manner similar to writing local ones. Spark’s collections abstract away the fact that they’re potentially referencing data distributed on a large number of nodes. Spark also supports functional programming methods, which are a great match for data-processing tasks. Its headline advantage is speed: Spark has been advertised as up to 100 times faster than Apache Hadoop MapReduce for in-memory workloads.

Features of Spark

Spark extends MapReduce by reducing data movement during processing. Through features such as in-memory data storage and near-real-time processing, its performance can be several times faster than that of other Big Data technologies.

Spark also supports on-demand (lazy) evaluation of Big Data queries, which helps to optimize the steps of a data processing workflow. It provides a higher-level API to improve developer productivity and a consistent architectural model for Big Data solutions.

Spark holds intermediate results in memory instead of writing them to disk, which is very useful when you need to process the same data sets many times. It was designed as an execution engine that works both in memory and on disk, so Spark performs disk operations when the data no longer fits in memory. You can therefore use it to process data sets larger than the aggregate memory of a cluster.

Spark will store as much data as possible in memory and then spill the rest to disk. It is up to the system architect to look at the data and use cases to assess memory requirements. This in-memory storage mechanism is where Spark’s performance advantage comes from.

Other features of Spark:

  • Supports more than just Map and Reduce functions;
  • Optimizes arbitrary operator graphs;
  • On-demand (lazy) evaluation of Big Data queries helps to optimize the overall data processing flow;
  • Provides concise and consistent APIs in Scala, Java, and Python;
  • Provides interactive shell for Scala and Python. The shell is not yet available in Java.
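
The on-demand evaluation mentioned in the list above can be illustrated locally with a plain Scala Iterator. This is only an analogy for the idea that describing a pipeline does not run it; it does not use Spark, and LazySketch, evaluated, and pipeline are hypothetical names.

```scala
// Local illustration of on-demand (lazy) evaluation with a plain Scala Iterator.
// No Spark involved; the names are made up for this sketch.
object LazySketch {
  // Counts how many times the map function has actually run.
  var evaluated = 0

  // Describes a map-then-filter pipeline. Nothing is computed here:
  // Iterator.map and Iterator.filter only wrap the underlying iterator.
  def pipeline(data: Seq[Int]): Iterator[Int] =
    data.iterator
      .map { x => evaluated += 1; x * 2 }
      .filter(_ > 4)
}
```

After building the pipeline, LazySketch.evaluated is still 0. The work happens only when a result is requested, for example by calling toList on the returned iterator.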

Spark is written in Scala and runs on the Java virtual machine. It currently supports the following languages for application development:

  • Scala
  • Java
  • Python
  • Clojure
  • R

Spark Installation

In this section, we will walk through installing Spark for local development. Spark is commonly deployed alongside Hadoop, so it is most convenient to install it on a Linux-based system. The following steps show how to install Apache Spark.

  • Step 1: Verifying Java Installation

Java is a prerequisite for Spark. Run the following command to check the installed Java version.

$ java -version

If Java is already installed on your system, you will see a response like the following:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (build 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

If Java is not installed on your system, install it before proceeding to the next step.

  • Step 2: Verifying Scala installation

You need Scala to follow the examples in this tutorial. Verify your Scala installation with the following command.

$ scala -version

If Scala is already installed on your system, you will see a response like the following:

Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

If Scala is not installed on your system, proceed to the next step to install it.

  • Step 3: Downloading Scala

Download Scala by visiting the Download Scala link. This tutorial uses scala-2.11.8, which matches the Scala 2.11 line that Spark 2.2.1 is built against. After downloading, you will find the Scala tar file in your download folder.

  • Step 4: Installing Scala

Follow the steps below to install Scala.

Extract the Scala tar file

Type the following command to extract the Scala tar file.

$ tar xvf scala-2.11.8.tgz

Move Scala software files

Use the following commands to move the Scala files to their target directory (/usr/local/scala).

$ su -
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.8 /usr/local/scala
# exit

Set PATH for Scala

Use the following command to set PATH for Scala. Note that the shell does not allow spaces around the equals sign.

$ export PATH=$PATH:/usr/local/scala/bin

Verifying Scala Installation

After installation, it is a good idea to verify it. Use the following command to verify the Scala installation.

$ scala -version

If Scala is installed correctly, you will see the following response:

Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

  • Step 5: Downloading Apache Spark

Download Spark by visiting the Download Spark link. For this tutorial, we are using spark-2.2.1-bin-hadoop2.7. After downloading it, you will find the Spark tar file in your download folder.

  • Step 6: Installing Spark

Follow the steps below to install Spark.

Extracting Spark tar

Use the following command to extract the Spark tar file.

$ tar xvf spark-2.2.1-bin-hadoop2.7.tgz

Moving Spark software files

Use the following commands to move the Spark files to their target directory (/usr/local/spark).

$ su -
Password:
# cd /home/Hadoop/Downloads/
# mv spark-2.2.1-bin-hadoop2.7 /usr/local/spark
# exit

Setting up the environment for Spark

Add the following line to your ~/.bashrc file. It appends the directory that contains the Spark executables to the PATH variable.

export PATH=$PATH:/usr/local/spark/bin

Use the following command for sourcing the ~/.bashrc file.

$ source ~/.bashrc

  • Step 7: Verifying the Spark Installation

Run the following command to open the Spark shell.

$ spark-shell

If Spark is installed successfully, the shell prints the Spark welcome banner and leaves you at a scala> prompt.

The default spark-shell is for Scala. To start the Python shell, execute:

$ pyspark

Introduction to Scala

Before starting, let’s review some Scala concepts. For further information, see the language manual or Martin Odersky’s book “Programming in Scala” (3rd edition).

Scala performs type inference, so you do not need to declare a variable’s type. In the example below, a tuple with an integer and a string is created. Using getClass you can verify that Scala knows the exact type of each value, a tuple being a value with a fixed sequence of element types.

scala> val tup1 = (0, "zero")
scala> tup1.getClass
res1: Class[_ <: (Int, String)] = class scala.Tuple2
scala> val tup2 = (1, "um", 1.0)
scala> tup2.getClass
res2: Class[_ <: (Int, String, Double)] = class scala.Tuple3

You can access each element of a tuple by position using the underscore syntax (_1, _2, and so on).

scala> println(tup1._1)
0
scala> println(tup1._2)
zero
scala> println(tup2._2)
um
scala> println(tup2._3)
1.0
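
Besides positional access, tuples can also be taken apart by pattern matching, which is how pairs are usually unpacked inside lambdas. The following is a small sketch; TupleSketch, number, name, and describe are names chosen only for this example.

```scala
// Sketch: unpacking tuples with pattern matching instead of _1 / _2.
// All names are illustrative.
object TupleSketch {
  val pair = (0, "zero")

  // Destructure the tuple into two named values in one step.
  val (number, name) = pair

  // The same idea inside a function, using a match expression.
  def describe(t: (Int, String)): String = t match {
    case (n, label) => s"$n is called $label"
  }
}
```

Here TupleSketch.number is 0 and TupleSketch.name is "zero", the same values that pair._1 and pair._2 would return.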

To define a function in Scala, you must declare the types of its parameters; the return type can usually be inferred, but declaring it is good practice. In Scala you can define functions within functions. In addition, Scala supports function names made of symbolic characters such as +, ++, ~, &, and -. Below is the sum function, which adds two integers.

scala> def sum(x: Int, y: Int): Int = x + y
sum: (x: Int, y: Int)Int
scala> sum(1, 2)
res0: Int = 3

Functions do not necessarily need a name. An unnamed function is called an anonymous function or lambda function. Below is a lambda function that sums two integers.

scala> (x: Int, y: Int) => x + y
res1: (Int, Int) => Int = <function2>
scala> res1(1,2)
res2: Int = 3

What is an anonymous (lambda) function used for? Many Spark functions take other functions as parameters. Lambda functions let you pass that behavior inline without formally declaring a named function, which simplifies the code and makes it shorter.
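
The same pattern exists in the Scala standard library, so it can be tried without Spark. The sketch below passes lambdas to map, filter, and reduce on a local List, and to a small user-defined higher-order function; LambdaSketch and applyTwice are names invented for this example.

```scala
// Sketch: passing lambdas to higher-order functions on a local List.
// LambdaSketch and applyTwice are illustrative names.
object LambdaSketch {
  val numbers = List(1, 2, 3, 4, 5)

  // Each call receives its behavior as an anonymous function.
  val doubled = numbers.map(x => x * 2)
  val evens = numbers.filter(_ % 2 == 0)
  val total = numbers.reduce((a, b) => a + b)

  // A user-defined function that takes another function as a parameter.
  def applyTwice(f: Int => Int, x: Int): Int = f(f(x))
}
```

For example, LambdaSketch.applyTwice((n: Int) => n + 3, 1) applies the lambda two times and returns 7.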

The max function presented below returns the larger of two integers. Note that max is declared as a value of type (Int, Int) => Int rather than with def. Its body is a lambda function that receives two integers and returns the larger one.

scala> val max: (Int, Int) => Int = (m: Int, n: Int) => if(m > n) m else n
max: (Int, Int) => Int = <function2>
scala> max(3,4)
res8: Int = 4

First steps with Spark: Data Abstraction

Every Spark application consists of a driver program that runs a main function written by the user and executes several operations in parallel on the cluster. Each of these operations is performed on a data collection, represented internally in Spark by a data abstraction. For now, we will focus on the basic data abstraction that Spark provides: the Resilient Distributed Dataset (RDD).

An RDD is the first abstraction provided by Spark for data manipulation. It represents a collection of data distributed across the cluster nodes that can be operated on in parallel. RDDs can be created from files in HDFS or from Scala collections.

There are two ways to create an RDD: (1) by parallelizing an existing collection within the program, or (2) by referencing data on a system external to the program (HDFS, HBase, or any data source with a Hadoop InputFormat). Let’s start spark-shell. The Spark context is available through the variable sc.

RDDs from an existing collection

To parallelize a collection you must invoke the Spark Context’s parallelize method over an existing collection.

scala> val data = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:29

You can also use the makeRDD function to create an RDD.

scala> val data = Array(1, 2, 3, 4, 5)
scala> val dataRDD = sc.makeRDD(data)
dataRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at <console>:29

There is no difference between makeRDD and parallelize. The makeRDD function invokes parallelize. You can check the invocation of parallelize directly in the implementation of makeRDD.

An important parameter for a parallel collection is the number of partitions into which the data is divided. Spark runs one task for each partition on the cluster. Typically you want 2 to 4 partitions per CPU in your cluster. Spark attempts to set the number of partitions automatically based on your cluster, but you can set it manually by passing the number of partitions as the second argument to parallelize, for example sc.parallelize(data, 10).
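
To get a feel for what dividing a collection into partitions means, here is a local sketch that cuts a sequence into a given number of contiguous, nearly equal slices. It is written in plain Scala and only approximates the idea; it is not Spark's own code, and SliceSketch and slice are hypothetical names.

```scala
// Local sketch of splitting a collection into numSlices partitions.
// Illustrative only; not taken from Spark.
object SliceSketch {
  // Split data into numSlices contiguous chunks of nearly equal size.
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "need at least one slice")
    (0 until numSlices).map { i =>
      // Integer arithmetic on Long avoids overflow for large collections.
      val start = ((i * data.length.toLong) / numSlices).toInt
      val end = (((i + 1) * data.length.toLong) / numSlices).toInt
      data.slice(start, end)
    }
  }
}
```

Slicing the five-element collection from the example above into two partitions gives one slice with the first two elements and one with the remaining three; each slice would then be processed by its own task.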

RDDs from external data

Spark can create RDDs from any data source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, and Amazon S3, among others. Spark supports any format that has a Hadoop InputFormat.

RDDs can be created from text files using the textFile method of SparkContext. This method takes the URI of the file (either a path on the local file system or a URI for an external system, such as hdfs:// or s3n://) and reads the contents of the file as a collection of lines. Let’s read the Spark README file in the Spark root folder (you need to start the shell from that folder for the following code to work).

scala> val distFile = sc.textFile("README.md")
distFile: RDD[String] = MappedRDD@1d4cee08

Once the RDD is created, distFile can be transformed with dataset operations. For example, we can add up the lengths of all lines using the map and reduce operations.

scala> distFile.map(s => s.length).reduce((a, b) => a + b)

Notes about reading files with Spark:

  • If you are reading a file from the local file system, the file must be accessible at the same path on all nodes. Either copy the file to all nodes or share it through a network-mounted file system such as NFS.
  • All methods for reading files, such as textFile, support directories, compressed files, and Unix-style wildcards. For example, you can use textFile("/dir"), textFile("/dir/*.txt"), and textFile("/dir/*.gz").
  • The textFile method has an optional second parameter that controls the number of partitions of the file. By default, Spark creates one partition for each block of the file (128MB by default in HDFS on Hadoop 2.x), but you can ask for a larger number of partitions. You cannot have fewer partitions than blocks.
  • In addition to text files, Spark supports several other formats:

The SparkContext.wholeTextFiles method reads a directory containing several text files and returns an RDD of (filename, content) pairs, one per file.

To read other formats you can use the SparkContext.hadoopRDD method. This method takes a JobConf together with the InputFormat class and the key and value classes. InputFormats can be added to a Scala project as ordinary Maven dependencies.

Saving files

To save an RDD to a file you can use the saveAsObjectFile method, which saves the file using Java serialization, or saveAsTextFile, which saves the file as plain text.

scala> val file = sc.textFile("README.md")
file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[10] at textFile at <console>:27
scala> file.count
res1: Long = 95
scala> file.saveAsObjectFile("/tmp/readme_object")
scala> file.saveAsTextFile("/tmp/readme_text")

The saveAsObjectFile method saves the data using Java serialization. Each partition is converted to a stream of bytes, which can later be deserialized back into the original data.

spark@spark-hare-1.cs.uni-paderborn.de:spark-training$ cat /tmp/readme_object/part-00000
SEQ!org.apache.hadoop.io.NullWritable”org.apache.hadoop.io.BytesWritable
[Ljava.lang.String;]{Gxpt# Apache SparkttNSpark is a fast and general cluster computing system for Big Data. It providestKhigh-level APIs in Scala, Java, Python, and R, and an

The saveAsTextFile method saves all partitions in plain text.

spark@spark-hare-1.cs.uni-paderborn.de:spark-training$ cat /tmp/readme_text/part-00000
Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a

Note that with both methods the output is saved as several partition files (e.g., part-00000).

spark@spark-hare-1.cs.uni-paderborn.de:spark-training$ ls /tmp/readme_object/
_SUCCESS part-00000 part-00001
spark@spark-hare-1.cs.uni-paderborn.de:spark-training$ ls /tmp/readme_text/
_SUCCESS part-00000 part-00001

To load the data back, you can read the entire contents of the directory using textFile or objectFile. Note that the number of lines is the same, because the contents are the same.

scala> val f = sc.textFile("/tmp/readme_text/*")
f: org.apache.spark.rdd.RDD[String] = /tmp/readme_text/ MapPartitionsRDD[10] at textFile at <console>:27
scala> f.count
res28: Long = 95
scala> val f = sc.objectFile("/tmp/readme_object/*")
f: org.apache.spark.rdd.RDD[Nothing] = MapPartitionsRDD[7] at objectFile at <console>:27
scala> f.count
res4: Long = 95

Word Count

Let’s create a simple word count using the basic operations of an RDD.

val text_file = sc.textFile("README.md")
val counts = text_file.flatMap(line => line.split(" ")).
  map(word => (word, 1)).
  reduceByKey((a, b) => a + b)
counts.saveAsTextFile("count.txt")

The flatMap function transforms each of the N elements of an RDD into a collection (here, each line into its words) and then flattens these N collections into a single RDD of results. The map function transforms an RDD of length N into another RDD of length N; here it turns every word into a key/value pair with the word as key and 1 as the initial value. Finally, the reduceByKey function sums the values of all pairs that share the same key, giving the number of occurrences of each word.
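
The difference between map and flatMap is easy to see on a small local collection. The sketch below uses plain Scala collections rather than RDDs, and its reduceByKey is a hand-written local stand-in with the same intent as Spark's method, not Spark's implementation. WordCountSketch and the sample lines are made up for this example.

```scala
// Local sketch of the word count steps using plain Scala collections.
// reduceByKey here is a hand-written stand-in, not Spark's method.
object WordCountSketch {
  val lines = Seq("to be or", "not to be")

  // map keeps one output element per input line: a collection of collections.
  val perLine: Seq[Array[String]] = lines.map(_.split(" "))

  // flatMap splits each line and flattens the results into one collection of words.
  val words: Seq[String] = lines.flatMap(_.split(" "))

  // Pair every word with the initial count 1.
  val pairs: Seq[(String, Int)] = words.map(w => (w, 1))

  // Group pairs by key and combine the values of each key with f.
  def reduceByKey(ps: Seq[(String, Int)])(f: (Int, Int) => Int): Map[String, Int] =
    ps.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).reduce(f)) }

  val counts: Map[String, Int] = reduceByKey(pairs)(_ + _)
}
```

Here perLine has two elements (one array per line), words has six, and counts maps both to and be to 2, and both or and not to 1.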

Conclusions

In this post, we gave an introduction to simple data processing in the Spark shell. In future posts, we will move on to further Spark features for analytics and machine learning. Good further reading about Spark includes Mohammed Guller’s Big Data Analytics with Spark and the book Spark in Action. These two books cover many of Spark’s modules and are very useful as references. Feel free to leave any questions and suggestions in the comments below.

Geraldo Souza Junior
Big Data Processing With Spark

Scientific Developer, M.Sc.@Dice Group. Expanding horizons everyday. Paderborn — Germany. Linkedin: https://www.linkedin.com/in/gsjunior86/