Apache Spark Fundamentals (Part 2)

Lavish Jain
9 min readApr 22, 2018

--

Part 1 focused on Spark cluster architecture. This part deals with what Spark RDD is and its operations, Laziness property and RDD persistence and Shared Variables in Spark.

Resilient Distributed Dataset (RDD)

RDD is the fundamental data structure of Spark. It is the immutable, partitioned collection of data. Each and every dataset in Spark RDD is logically partitioned across many nodes so that they can be computed on different nodes of the cluster.

There are three ways of creating a RDD in Spark depending upon how data is fetched.
1. Parallelized Collections : The elements of collection are copied to a distributed dataset that can be operated in parallel.
e.g. list=[1,2,3,4,5]
rdd1=sc.parallelize(list)
2. External Datasets : When data is imported from outside spark either from local or from hadoop.
e.g. rdd2=sc.textFile(“path of file”)
3. Transformations : Creating a new RDD from an existing one.

RDD Operations

To work with RDDs, Spark provides two types of operations — Transformations and Actions.

Transformations

As mentioned earlier RDD is an immutable data structure. We cannot do changes in an existing RDD but can derive a new RDD using transformations on existing RDD.
Transformations are the functions that are applied on existing RDD to derive a new one. It takes RDD as input and produces one or more RDDS as output.

  1. map(func) : return a new RDD by passing each element of the source RDD through function func. Each input item in the existing RDD will be mapped to single output item in new RDD. So number of items in existing RDD are equal to that of new RDD.

2. flatMap(func) : Similar to map but each input item can be mapped to zero or more output items.

>>> l=["spark is super fast","hive is sql on hadoop","spark is awesome"]
>>> rdd=sc.parallelize(l)
>>> rdd.collect()
['spark is super fast', 'hive is sql on hadoop', 'spark is awesome']
>>> rddm=rdd.map(lambda line:line.split())
>>> rddm.collect()
[['spark', 'is', 'super', 'fast'], ['hive', 'is', 'sql', 'on', 'hadoop'], ['spark', 'is', 'awesome']]
>>> rddf=rdd.flatMap(lambda line:line.split())
>>> rddf.collect()
['spark', 'is', 'super', 'fast', 'hive', 'is', 'sql', 'on', 'hadoop', 'spark', 'is', 'awesome']

In this exampe, we make a RDD of list l which contains three lines. Then we make RDD ‘rddm’ and ‘rddf’ by applying map and flatMap transformations respectively on ‘rdd’. Each line of rdd will pass one by one into the function which is a split function here. In case of map each line transformed into one list containing all the words in that line. So rdd containing three lines is transformed into rddm containing three lists, hence number of elements in rdd is equal to that of rddm. In case of flatMap only one list containing all the words of three lines in rdd is received. So rdd containing three lines is transformed into rddf containing one list.

3. filter(func) : return a new dataset formed by selecting those elements of the source on which func returns true.

>>> l=[1,2,3,4,5,6]
>>> rdd=sc.parallelize(l)
>>> rdd1=rdd.filter(lambda x : x % 2 == 0)
>>> rdd1.collect()
[2, 4, 6]

4. reduceByKey(func) : : When called on dataset of (k,v) pairs, returns a dataset of (k,v) pairs where the values for each key are aggregated using given reduce function func.

>>> l=["spark is super fast","hive is sql on hadoop","spark is awesome"]
>>> rdd=sc.parallelize(l)
>>> rdd1=rdd.flatMap(lambda l:l.split()).map(lambda x: (x,1))
>>> rdd1.collect()
[('spark', 1), ('is', 1), ('super', 1), ('fast', 1), ('hive', 1), ('is', 1), ('sql', 1), ('on', 1), ('hadoop', 1), ('spark', 1), ('is', 1), ('awesome', 1)]
>>> rdd2=rdd1.reduceByKey(lambda a,b: a+b)
>>> rdd2.collect()
[('super', 1), ('on', 1), ('hadoop', 1), ('hive', 1), ('awesome', 1), ('spark', 2), ('is', 3), ('fast', 1), ('sql', 1)]

5. sortByKey() : When called on a dataset of key-value pairs (k,v), it returns a dataset of (k,v) pairs sorted by key. If we pass an argument False then it sorts in descending order.

>>> rdd2.collect()
[('super', 1), ('on', 1), ('hadoop', 1), ('hive', 1), ('awesome', 1), ('spark', 2), ('is', 3), ('fast', 1), ('sql', 1)]
>>> rdd3=rdd2.sortByKey()
>>> rdd3.collect()
[('awesome', 1), ('fast', 1), ('hadoop', 1), ('hive', 1), ('is', 3), ('on', 1), ('spark', 2), ('sql', 1), ('super', 1)]
>>> rdd4=rdd2.sortByKey(False)
>>> rdd4.collect()
[('super', 1), ('sql', 1), ('spark', 2), ('on', 1), ('is', 3), ('hive', 1), ('hadoop', 1), ('fast', 1), ('awesome', 1)]

6. union(other dataset) : Returns a new dataset that contains union of elements in source dataset and argument dataset.

>>> l1=[1,2,3,4]
>>> u1=sc.parallelize(l1)
>>> l2=[5,6,7,8]
>>> u2=sc.parallelize(l2)
>>> u3=u1.union(u2)
>>> u3.collect()
[1, 2, 3, 4, 5, 6, 7, 8]

7. intersection(other dataset) : Returns a new dataset that contains intersection of elements in source dataset and argument dataset.

>>> l1=[1,2,3,4]
>>> u1=sc.parallelize(l1)
>>> l2=[2,4,6,8]
>>> u2=sc.parallelize(l2)
>>> u3=u1.intersection(u2)
>>> u3.collect()
[2, 4]

8. distinct() : Returns a new dataset that contains distinct elements of the source dataset.

>>> l1=[1,2,3,4,3,2,4,1]
>>> d1=sc.parallelize(l1)
>>> d2=d1.distinct()
>>> d2.collect()
[4, 1, 2, 3]

9. join(other dataset) : When called on a datasets of key-value pairs (k,v) and (k,w), it returns a dataset of (k,(v,w)) pairs with all pairs of elements for each key.

>>> l1=[("mango","yellow"),("lemon","yellow"),("apple","red")]
>>> l2=[("mango","sweet"),("lemon","sour"),("apple","sweet")]
>>> j1=sc.parallelize(l1)
>>> j2=sc.parallelize(l2)
>>> j3=j1.join(j2)
>>> j3.collect()
[('mango', ('yellow', 'sweet')), ('lemon', ('yellow', 'sour')), ('apple', ('red', 'sweet'))]

10. coalesce(numPartitions) : Decrease the number of partitions in the RDD to numPartitions.

11. repartition() : Reshuffle the data in the RDD randomly to create either more or fewer partitions.

Actions

Actions are Spark RDD operations that give non-RDD values. Action
returns a value to the driver program after running a computation on the dataset.

  1. reduce(func) : Aggregate the elements of the dataset using a function which takes two arguments and returns one.
>>> l=[1,2,3,4,5]
>>> rdd=sc.parallelize(l)
>>> sum=rdd.reduce(lambda a,b: a+b)
>>> print(sum)
15

2. collect() : Return all the elements of the dataset as an array to the driver program.

3. count() : Returns number of elements in the dataset.

>>> l=[1,2,3,4,5]
>>> rdd=sc.parallelize(l)
>>> n=rdd.count()
>>> print(n)
5

4. first() : Return first element of dataset.

>>> l=[1,2,3,4,5]
>>> rdd=sc.parallelize(l)
>>> f= rdd.first()
>>> print(f)
1

5. take(n) : Return list with first n elements of the dataset.

>>> l=[1,2,3,4,5]
>>> rdd=sc.parallelize(l)
>>> t= rdd.take(3)
>>> print(t)
[1, 2, 3]

6. countByKey() : When applied on RDDS of type (k,v) pairs, returns (k,int) pairs with the count of each key.

>>> l=["spark is super fast","hive is sql on hadoop","spark is awesome"]
>>> rdd=sc.parallelize(l)
>>> rdd1=rdd.flatMap(lambda l:l.split()).map(lambda x: (x,1))
>>> rdd1.collect()
[('spark', 1), ('is', 1), ('super', 1), ('fast', 1), ('hive', 1), ('is', 1), ('sql', 1), ('on', 1), ('hadoop', 1), ('spark', 1), ('is', 1), ('awesome', 1)]
>>> c=rdd1.countByKey()
>>> print(c)
defaultdict(<type 'int'>, {'on': 1, 'is': 3, 'hadoop': 1, 'fast': 1, 'hive': 1, 'sql': 1, 'spark': 2, 'super': 1, 'awesome': 1})

7. saveAsTextFile(path) : Write the elements of the dataset as a textfile in a given directory in the local filesystem, HDFS or other Hadoop supported file system.

Property of Laziness

All transformations in spark are lazy. The transformations are only computed when an action requires a result to be returned to the driver program. Sequence of transformations forms a RDD Dependency graph which is a Directed Acyclic Graph (DAG). Transformations are performed in DAG sequence only when an action is called.
Example of wordcount:


rd=sc.textFile(“file.txt”)
rd1=rd.flatMap(lambda x: x.split())
rd2=rd1.map(lambda y: (y,1))
count=rd2.countByKey()
DAG for WordCount script

Shuffle Operations

Spark needs to perform an all to all operation. It must read from all partitions to find all the values for each key and then bring together values across partitions to compute final result for that key, this is called shuffle.
The shuffle is expensive operation since it involves disk I/O, network I/O etc.

RDD Persistence

As Spark supports lazy evaluation, all the RDD transformations in DAG sequence are needed to be applied when an action is called. Also if same transformation is involved in DAG sequence of more than one action then that transformation is performed repeatedly for each action and whenever any action is called.
If we persist an RDD each node stores partitions of it that it computes and reuses them in other actions on that dataset. This allows future functions to be much faster.
RDD can be mark persisted using persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes.
When applying persistence to an RDD we can specify Storage level that is where to store the persisted RDD.
a. MEMORY_ONLY : In this storage level, RDD is stored as deserialized Java object in the JVM. If the size of RDD is greater than memory, It will not cache some partition and recompute them next time whenever needed. In this level the space used for storage is very high, the CPU computation time is low, the data is stored in-memory. It does not make use of the disk.
b. MEMORY_AND_DISK : In this level, RDD is stored as deserialized Java object in the JVM. When the size of RDD is greater than the size of memory, it stores the excess partition on the disk, and retrieve from disk whenever required. In this level the space used for storage is high, the CPU computation time is medium, it makes use of both in-memory and on disk storage.
c. DISK_ONLY : In this storage level, RDD is stored only on disk. The space used for storage is low, the CPU computation time is high and it makes use of on disk storage.
The difference between cache() and persist() is that using cache() the default storage level is MEMORY_ONLY while using persist() we can use various storage levels (described above).
When the cached data exceeds the Memory capacity, Spark automatically evicts the old partitions(it will be recalculated when needed). This is called Last Recently used Cache(LRU) policy
e.g. To persist an RDD— rdd.persist(Storage_Level)
To cache an RDD- rdd.cache()
To unpersist an RDD — rdd.unpersist()

Shared Variables

When functions are passed to a specific Spark operation, it is executed on a particular remote cluster node. Usually, the operation is done in a way that different copy of variable(s) are used within the function. These particular variables are carefully copied into the different machines, and the updates to the variables in the said remote machines are not propagated back to the driver program. For this reason, one cannot support the general; read-write shared variables across the tasks and expects them to be efficient. Nevertheless, Spark does provide two different types (limited) of shared variables to two known usage patterns.

  • Broadcast variables
  • Accumulators

Broadcast Variables : Broadcast variables allow Spark developers to keep a secured read-only variable cached on different nodes, rather than merely shipping a copy of it with the needed tasks. Broadcast variables are supposed to be read-only. Before launching the computation, Spark sends it to each node concerned by related task. After that each node caches it locally in serialized form. Now before executing each of its planned tasks, instead of getting values from the driver, it retrieves them locally from the cache. So broadcasting doesn’t really mean that given object is not transmitted across the network at all. But with the difference to “normal” variables, broadcast ones are always read-only and they can be sent only once. Broadcast variables should then be used when different tasks operate on the same data.

Broadcast variable’s value can be accessed by calling the value method.

>>>broad = sc.broadcast([1,2,3])
>>>print(broad.value)
[1,2,3]

Accumulator : The second method of sharing objects in Spark consists on the use of accumulators. As its name indicates, their main role consists on accumulating values. A use case of accumulators are counters. In this case accumulator could act as an shared value, incremented with the number of objects treated in each task. Unlike broadcast variables, accumulators are writable. However, written values can be only read in driver program. It’s why accumulators work pretty well as data aggregators.

That’s all for this time, in the next and last part of this blog will focus on how to submit spark applications, Spark ecosystem and advantages of Spark over Hadoop.

--

--