What is an RDD in PySpark?

Hunter Phillips
7 min readJun 10, 2023

--

This article covers the basic uses of resilient distributed datasets in PySpark. It includes examples of both transformations and actions that can be performed on them.

Resilient Distributed Datasets (RDDs)

In PySpark, a resilient distributed dataset (RDD) is a collection of elements. Unlike a normal list, they can be operated on in parallel. This basically means that when an operation is performed on a collection, it is split into a number of subcollections. These subcollections are sent to a cluster of computers, and the operation is performed in parallel on each subcollection and returned. RDDs are also fault tolerant, which means operations will be properly performed even if a component of the cluster fails.

An RDD can be created from an existing collection, or it can be created from an external dataset. To start, a simple list can be loaded and parallelized. Parallelization is controlled by SparkContext; it connects to a cluster and can broadcast the data to it.

from pyspark import SparkContext

# initialize SparkContext
sc = SparkContext(master='local', appName='test')
data = [1, 5, 10, 15, 20, 25, 30]

# c = collection to distribute
# numSlices = partitions of collection
distributedData = sc.parallelize(c=data, numSlices=3)
# preview the partitions
distributedData.glom().collect()
[[1, 5], [10, 15], [20, 25, 30]]

When parallelizing the data, the number of partitions, numSlices, represents the number of tasks, or subcollections, to run on the cluster. About 2 to 4 slices per CPU in the cluster is normal. glom() can be used to gather each partition’s data into a list, and collect() can be used to preview the partitions. Now, operations can be performed on the RDD. There are two types of RDD operations: transformations, which yield a new RDD, and actions, which return a value.

Transformations

Transformations are operations that return a new RDD.

Map

map(func) passes each element of an RDD through a function, and the appropriate operations are performed on each element. In the example below, each element of the distributed dataset is multiplied by 2.

# map
newRDD = distributedData.map(lambda x: 2*x)
newRDD.glom().collect()
[[2, 10], [20, 30], [40, 50, 60]]

Filter

filter(func) returns an RDD of elements that meet the requirements of the function. The example below filters for elements with a value greater than 10.

# filter
newRDD = distributedData.filter(lambda x: x > 10)
newRDD.glom().collect()
[[], [15], [20, 25, 30]]

FlatMap

flatMap(func) is similar to map but each element can be mapped to an output of 0 or more elements (a sequence). In this example, the input element is mapped to a tuple of itself and the output of 5x.

# flatMap
newRDD = distributedData.flatMap(lambda x: [(x, 5*x)])
newRDD.glom().collect()
[[(1, 5), (5, 25)], [(10, 50), (15, 75)], [(20, 100), (25, 125), (30, 150)]]

MapPartitions

mapPartitions(func) is similar to map but runs on each partition and returns the new partition. In this example, each partition’s elements are summed and returned as the partition.

# mapPartitions
def f_mapPart(iterator):
yield sum(iterator)

newRDD = distributedData.mapPartitions(f_mapPart)
newRDD.glom().collect()
[[6], [25], [75]]

MapPartitionsWithIndex

mapPartitionsWithIndex(func) is similar to mapPartitions but also includes the partition’s index. The example below yields the index of each partition.

# mapPartitionsWithIndex
def f_mapPartIndex(index, iterator):
yield index

newRDD = distributedData.mapPartitionsWithIndex(f_mapPartIndex)
newRDD.glom().collect()
[[0], [1], [2]]

Union

union(RDD) returns a new RDD with the union of the original RDD and provided RDD. The example below shows the distributed dataset unioned with the distributed dataset, creating a new RDD twice as long.

# union
newRDD = distributedData.union(distributedData)
newRDD.glom().collect()
[[1, 5], [10, 15], [20, 25, 30], [1, 5], [10, 15], [20, 25, 30]]

Intersection

intersection(RDD) returns a new RDD with the intersection of the original and provided RDDs. The example below combines the original distributed data and a new distributed dataset to generate a new RDD with only the intersections.

data2 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# distributedData2 = [[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]
distributedData2 = sc.parallelize(data2, 4)

# intersection | distributedData = [[1, 5], [10, 15], [20, 25, 30]]
newRDD = distributedData.intersection(distributedData2)
newRDD.collect()
[1, 10, 5]

Distinct

distinct() returns a new RDD with the unique elements from the original.

data3 = [1, 1, 2, 2, 3, 3, 4, 5]
distributedData3 = sc.parallelize(data3, 4)

# distinct
newRDD = distributedData3.distinct()
sorted(newRDD.collect())
[1, 2, 3, 4, 5]

GroupByKey and MapValues

groupByKey() requires an RDD with elements of (K, V) and returns a new RDD of elements (K, Iterable<V>), where Iterable<V> includes all values paired with K.

data4 = [("red", 1), ("red", 2), ("red", 3), ("blue", 4), ("blue", 5)]
distributedData4 = sc.parallelize(data4, 4)

# groupByKey()
newRDD = distributedData4.groupByKey()
newRDD.collect()
[('red', <pyspark.resultiterable.ResultIterable at 0x7fea6ec3e860>),
('blue', <pyspark.resultiterable.ResultIterable at 0x7feaa422ffa0>)]

To view the values of the iterables, the RDD’s elements can be mapped to the list function with mapValues(func), which alters each value without altering the keys.

newRDD.mapValues(list).collect()
[('red', [1, 2, 3]), ('blue', [4, 5])]

ReduceByKey

reduceByKey(func) requires an RDD with elements of (K, V) and returns a new RDD with elements of (K, V), where V is aggregated based on K and reduced by the function. In the example, it is important to note that a and b are required for the function to add each element in the list. As an example, [1, 2, 3] may be reduced like 1 + 2 = 3, then 3 + 3 = 6. The result from the previous addition is an input for the current addition.

# reduceByKey()
newRDD = distributedData4.reduceByKey(lambda a,b: a+b)
newRDD.collect()
[('red', 6), ('blue', 9)]

SortByKey

sortByKey(ascending=True, keyfunc) returns a new RDD sorted in ascending or descending order based on the key function or the default order. The example below sorts each key in ascending order.

# sortByKey()
data5 = [("zebra", 1), ("red", 2), ("apple", 3), ("blue", 4), ("horse", 5)]
distributedData5 = sc.parallelize(data5, 4)

newRDD = distributedData5.sortByKey(ascending=True)
newRDD.collect()
[('apple', 3), ('blue', 4), ('horse', 5), ('red', 2), ('zebra', 1)]

This next example uses a key function to select the second to last letter of each key and sorts it in descending order:

# sortByKey()
newRDD = distributedData5.sortByKey(ascending=False, keyfunc=lambda k: k[-2])
newRDD.collect()
[('blue', 4), ('horse', 5), ('zebra', 1), ('apple', 3), ('red', 2)]

Join, LeftOuterJoin, RightOuterJoin, FullOuterJoin

join(RDD) returns a new RDD of (K, (V, W)) if the original and provided datasets are (K, V) and (K, W), respectively. In other words, values from identical keys are grouped together and returned. Keys without corresponding pairs in both datasets are not returned.

# join
leftData = [("a", 1), ("b", 2), ("c", 3)]
rightData = [("a", 4), ("c", 5), ("d", 6)]

leftRDD = sc.parallelize(leftData)
rightRDD = sc.parallelize(rightData)

newRDD = leftRDD.join(rightRDD)
newRDD.collect()
[('c', (3, 5)), ('a', (1, 4))]

leftOuterJoin(RDD) returns a new RDD of (K, (V, W)). For each (K, V) in the left dataset, the corresponding (K, W) in the right dataset will be joined. If the key does not exist in the right dataset, None will be returned. This means every K in the left dataset is present in the new RDD.

# leftOuterJoin
newRDD = leftRDD.leftOuterJoin(rightRDD)
newRDD.collect()
[('b', (2, None)), ('c', (3, 5)), ('a', (1, 4))]

rightOuterJoin(RDD) returns a new RDD of (K, (V, W)). For each (K, W) in the right dataset, the corresponding (K, V) in the left dataset will be joined. If the key does not exist in the left dataset, None will be returned. This means every K in the right dataset is present in the new RDD.

# rightOuterJoin
newRDD = leftRDD.rightOuterJoin(rightRDD)
newRDD.collect()
[('c', (3, 5)), ('d', (None, 6)), ('a', (1, 4))]

fullOuterJoin(RDD) returns a new RDD of (K, (V, W)). For each (K, V) in the left dataset and (K, W) in the right dataset, the matches will be returned as (K, (V, W)). If a key exists in the left dataset that is not in the right dataset, the result will be (K, (V, None)). Likewise, if a key exists in the right dataset that is not in the left dataset, the result will be (K, (None, W)). This is essentially a union of the left and right outer joins.

# fullOuterJoin
newRDD = leftRDD.fullOuterJoin(rightRDD)
newRDD.collect()
[('b', (2, None)), ('c', (3, 5)), ('d', (None, 6)), ('a', (1, 4))]

CoGroup

cogroup(RDD) returns an RDD of (K, (Iterable<V>, Iterable<W>)) if the original and source are (K, V) and (K, W), respectively.

# cogroup
newRDD = leftRDD.cogroup(rightRDD)
newRDD.collect()
[('b',
(<pyspark.resultiterable.ResultIterable at 0x7fea6eca6920>,
<pyspark.resultiterable.ResultIterable at 0x7fea6eb33760>)),
('c',
(<pyspark.resultiterable.ResultIterable at 0x7fea6eb31870>,
<pyspark.resultiterable.ResultIterable at 0x7fea6eb30e20>)),
('d',
(<pyspark.resultiterable.ResultIterable at 0x7fea6eb32470>,
<pyspark.resultiterable.ResultIterable at 0x7fea6eb32950>)),
('a',
(<pyspark.resultiterable.ResultIterable at 0x7fea6eb32230>,
<pyspark.resultiterable.ResultIterable at 0x7fea6eb33fa0>))]

To view the iterables, the values can be mapped to lists:

[(k, tuple(map(list, v))) for k, v in newRDD.collect()]
[('b', ([2], [])), ('c', ([3], [5])), ('d', ([], [6])), ('a', ([1], [4]))]

Coalesce

coalesce(numPartitions) reduces the number of partitions of an RDD. The example below coalesces from three partitions to two partitions.

# preview the partitions
distributedData.glom().collect()
[[1, 5], [10, 15], [20, 25, 30]]
# coalesce
distributedData.coalesce(numPartitions=2).glom().collect()
[[1, 5], [10, 15, 20, 25, 30]]

Repartition

repartition(numPartitions) randomly shuffles the data to create more or less partitions. The example below repartitions from three to two, but it differs from coalesce since it randomizes the partitions.

# preview the partitions
distributedData.glom().collect()
[[1, 5], [10, 15], [20, 25, 30]]
# repartition
distributedData.repartition(numPartitions=2).glom().collect()
[[20, 25, 30], [1, 5, 10, 15]]

Actions

Actions are operations that return a value or some values from an RDD rather than creating a new RDD.

Collect

collect() has been used in the previous examples to return the RDD as a list for viewing purposes. The example below shows that the output is a list. The previous examples show various use cases.

# collect
type(distributedData.glom().collect())
list

Reduce

reduce(func) aggregates the elements of an RDD using the provided function. This function takes two arguments and has a single output. The operations should be commutative and associative to allow parallel processes to be performed.

# reduce | distributedData = [[1, 5], [10, 15], [20, 25, 30]]
distributedData.reduce(lambda a,b: a+b)
106

Count

count() returns the number of elements in an RDD.

# count
distributedData.count()
7

First, Take, TakeSample

first() returns the first element in the RDD.

# first
distributedData.first()
1

take(n) returns the first n elements of the RDD

# take
distributedData.take(4)
[1, 5, 10, 15]

takeSample(withReplacement=True|False, num) returns a sample from the RDD with a size of num, with or without replacement.

# take
distributedData.takeSample(withReplacement=True, num=5)
[5, 20, 5, 15, 5]

CountByKey

countByKey() can be used on RDDs with elements of (K, V). The result will be a hashmap, or dictionary, for each key in the form of (K, Count(V)).

# distributedData4= [("red", 1), ("red", 2), ("red", 3), ("blue", 4), ("blue", 5)]
dict(distributedData4.countByKey())
{'red': 3, 'blue': 2}

--

--