Apache Spark Hands-on With Scala

Gobalakrishnan Viswanathan · Published in The Startup · Aug 12, 2020
Before continuing: I have written an Introduction to Apache Spark. You can head to that post if you are new to Apache Spark.

Welcome to some practical, hands-on explanations of Apache Spark with Scala. Spark also supports Python through PySpark, but for this post I am sticking with Scala on my Windows Apache Spark installation.

  1. Spark Shell
    Go to the Spark installation directory and cd into the bin folder, type spark-shell, and press Enter. You will see the Spark session being started along with some logs that are quite important.
Initialization of Spark Shell

The spark-shell command provides a simple way to learn the Spark API, as well as a powerful tool to analyze data interactively. It is available in both Scala and Python.

Let's discuss some of the terms logged by the spark-shell command.

  • SparkContext (sc) is the entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster and can be used to create RDDs in the cluster. Only one SparkContext should be active per JVM. Everything about SparkContext constructors and methods can be found in the official documentation.
  • SparkSession (spark) is the entry point for programming Spark with the Dataset and DataFrame API. It is one of the very first objects you create while developing a Spark SQL application. A small example using both entry points follows this list.
  • We can see the Spark UI at http://rch20lap044:4040/jobs/ (the address can differ from system to system). This UI gives details about the currently running jobs, storage, executors, and more.
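For instance, a minimal sketch in the shell using these two entry points (the sample data is arbitrary; the version string you see depends on your installation):

scala> sc.version                              // the Spark version this shell is running
scala> val rdd = sc.parallelize(Seq(1, 2, 3))  // SparkContext creates RDDs
scala> spark.range(3).show()                   // SparkSession works with Datasets and DataFrames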

Spark-Shell Commands:

  • :help
    Prints all the commands available in spark-shell.
  • We have already seen that Spark exposes two variables to the console by default: sc and spark. To find the type of these variables, the :type command can be used.
  • We can see the history of commands used with :history. The shell also provides autocompletion with the Tab key. A quick session with these commands is shown below.
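A sketch of such a session (trimmed to the type names spark-shell reports):

scala> :type sc
org.apache.spark.SparkContext

scala> :type spark
org.apache.spark.sql.SparkSession

scala> :history
(prints the commands typed so far in this session)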
Now let's get some hands-on experience with RDDs. The RDD is the basic data unit of Apache Spark, on top of which all operations are performed. RDDs are immutable objects, which means that once an RDD is created, we cannot modify it. An RDD can hold elements of any type supported by the language used with Spark, here Scala.

Create RDD:
There are three methods available to create an RDD. Those are:

RDD creation methods (Image Credits dataflair)
  • Parallelized Collections:
    Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program. In the image below, an RDD is created from an array object using SparkContext’s parallelize method.
RDD using parallelized
  • External data:
    Spark can create distributed datasets from any supported storage source. A text file can also be read into an RDD using the SparkContext’s textFile method.
    The collect function is used to pull the data from the RDD back to the driver.
  • From RDDs:
    New RDDs can be created from already existing RDDs using the transformation functions available in Spark. A combined sketch of all three methods follows this list.
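Here is a combined sketch of the three methods (the file path is a hypothetical placeholder; replace it with a file that exists on your machine):

scala> val fromCollection = sc.parallelize(Array(1, 2, 3, 4, 5))   // 1. parallelized collection
scala> val fromFile = sc.textFile("C:/data/sample.txt")            // 2. external data (hypothetical path)
scala> val fromExisting = fromCollection.map(x => x * 2)           // 3. from an existing RDD, via a transformation
scala> fromExisting.collect()                                      // returns Array(2, 4, 6, 8, 10)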

Partitions in RDD:
When we create an RDD, it is divided into partitions by default. An RDD has an attribute named partitions, which exposes methods to get information about the RDD's partitions, as in the sketch below the image.

RDD Partitions
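A minimal sketch of that attribute (the default number of partitions depends on your machine's cores and default parallelism):

scala> val rdd = sc.parallelize(1 to 100)
scala> rdd.partitions.length                          // how many partitions the RDD was split into
scala> rdd.getNumPartitions                           // the same information via a convenience method
scala> sc.parallelize(1 to 100, 4).getNumPartitions   // explicitly asking for 4 partitions returns 4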

RDD Transformations:
RDD transformations create a new RDD from an existing RDD. Transformation functions can be applied to an RDD at any point in time. Since Spark follows a lazy evaluation model, transformations are executed only when an action is called in the program. The example below shows the filter and map transformation functions.
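A small sketch of that lazy behaviour: nothing is computed when filter and map are defined; the work starts only when the collect() action is called.

scala> val numbers = sc.parallelize(1 to 10)
scala> val evens = numbers.filter(_ % 2 == 0)   // transformation: nothing runs yet
scala> val squares = evens.map(x => x * x)      // transformation: still nothing runs
scala> squares.collect()                        // action: triggers the computation, returns Array(4, 16, 36, 64, 100)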

There are two types of transformations available in Spark:
Narrow Transformations & Wide Transformations

Narrow Transformations (Image credits Databricks)

A narrow transformation means that each input partition of the RDD contributes to only one partition of the output RDD. Narrow transformations are the result of functions such as map() and filter().

Wide Transformations (Image Credits Databricks)

In this type, an input partition can contribute to many output partitions. Wide transformations are the result of functions such as groupByKey() and reduceByKey(), as in the sketch below.
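A small sketch of a wide transformation, counting words in a tiny pair RDD with reduceByKey (the output order of collect() is not guaranteed):

scala> val pairs = sc.parallelize(Seq(("spark", 1), ("scala", 1), ("spark", 1)))
scala> val counts = pairs.reduceByKey(_ + _)             // wide: values for the same key may sit in different partitions
scala> counts.collect()                                  // Array((scala,1), (spark,2)), in some order
scala> pairs.groupByKey().mapValues(_.sum).collect()     // same result, but shuffles every value across the cluster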

Important Transformation Functions:

  1. map:
    The map function iterates over the RDD and applies the given function to every element in the RDD. It transforms an RDD of length N into another RDD of length N: the input and output RDDs have the same number of records.
scala> val array = Array(1,2,3,4,5,6,7,8,9,10)
array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val arrayRDD = sc.parallelize(array)
arrayRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:26
scala> val rddTransformed = arrayRDD.map(element => element * 10)
rddTransformed: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[72] at map at <console>:25
scala> rddTransformed.collect()
res73: Array[Int] = Array(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

2. flatMap:
The flatMap transformation can produce several output elements from a single input element. map and flatMap are similar in that they take an element from the input RDD and apply a function to it. The key difference is that map() returns exactly one element per input, while flatMap() can return a list of elements (zero or more).

scala> val array = Array(
| "Spark is a processing Engine",
| "It follows lazy loading concept",
| "Means transformation functions will not be processed until action called",
| "Because of this, Memory handling can be done effectively",
| "Spark also stored RDD in multiple partitions",
| "this will allow spark to do multiprocessing",
| "It makes Spark Faster",
| "Spark can be used in Scala, Python, Java, and R.",
| "Spark itsef written in Scala.")
val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[85] at parallelize at <console>:26
scala> val maprdd = rdd.map(element => element.split(" "))
maprdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[86] at map at <console>:25
scala> maprdd.collect()
res82: Array[Array[String]] = Array(Array(Spark, is, a, processing, Engine), Array(It, follows, lazy, loading, concept), Array(Means, transformation, functions, will, not, be, processed, until, action, called), Array(Because, of, this,, Memory, handling, can, be, done, effectively), Array(Spark, also, stored, RDD, in, multiple, partitions), Array(this, will, allow, spark, to, do, multiprocessing), Array(It, makes, Spark, Faster), Array(Spark, can, be, used, in, Scala,, Python,, Java,, and, R.), Array(Spark, itself, written, in, Scala.))
scala> val flatmaprdd = rdd.flatMap(element => element.split(" "))
flatmaprdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[87] at flatMap at <console>:25
scala> flatmaprdd.collect()
res83: Array[String] = Array(Spark, is, a, processing, Engine, It, follows, lazy, loading, concept, Means, transformation, functions, will, not, be, processed, until, action, called, Because, of, this,, Memory, handling, can, be, done, effectively, Spark, also, stored, RDD, in, multiple, partitions, this, will, allow, spark, to, do, multiprocessing, It, makes, Spark, Faster, Spark, can, be, used, in, Scala,, Python,, Java,, and, R., Spark, itself, written, in, Scala.)

From the small example above, it is clear that map gives exactly one output per input element, while flatMap can give any number of outputs from a single input.

3. filter:
The filter function returns only the elements that meet the given condition. The number of elements in the output RDD therefore need not be equal to the number of elements in the input RDD.

scala> val array = Array(1,2,3,4,2,6,7,2,9,10)
array: Array[Int] = Array(1, 2, 3, 4, 2, 6, 7, 2, 9, 10)
scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[92] at parallelize at <console>:26
scala> val filterRDD = rdd.filter(element => element!=2)
filterRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[93] at filter at <console>:25
scala> filterRDD.collect()
res87: Array[Int] = Array(1, 3, 4, 6, 7, 9, 10)

4. mapPartitions:
Both map and mapPartitions do the same kind of work; the difference is that map iterates over every element of the RDD, while mapPartitions iterates over the partitions of the RDD, handing its function an iterator per partition.

scala> val mapRDD = rdd.map(element => element*100)
mapRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[101] at map at <console>:25
scala> mapRDD.collect()
res94: Array[Int] = Array(100, 200, 300, 400, 200, 600, 700, 200, 900, 1000)
scala> val mapRDD = rdd.mapPartitions(element => element.map(x => x*100))
mapRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[102] at mapPartitions at <console>:25
scala> mapRDD.collect()
res95: Array[Int] = Array(100, 200, 300, 400, 200, 600, 700, 200, 900, 1000)

We can see here that the map function takes every element of the RDD for processing, while mapPartitions iterates over partitions. That is why I wrote one more map on the partition iterator, to iterate over the elements inside it.

mapPartitionsWithIndex does the same as mapPartitions but also provides the index of each partition, meaning the function is applied partition by partition along with that partition's index, as sketched below.
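A short sketch of mapPartitionsWithIndex, tagging each element with the index of the partition it lives in (exactly which elements land in which partition depends on how Spark splits the data):

scala> val rdd = sc.parallelize(1 to 6, 2)
scala> val tagged = rdd.mapPartitionsWithIndex((index, iter) => iter.map(x => (index, x)))
scala> tagged.collect()   // e.g. Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6))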

5. union, intersection, and distinct:
union returns a new RDD containing all the elements of the source and argument RDDs.
intersection returns the elements common to both RDDs.
distinct returns the distinct elements of the RDD.

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,1,2))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[122] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(Array(3,4,5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[123] at parallelize at <console>:24
scala> val union_rdd = rdd1.union(rdd2)
union_rdd: org.apache.spark.rdd.RDD[Int] = UnionRDD[124] at union at <console>:27
scala> union_rdd.collect()
res100: Array[Int] = Array(1, 2, 3, 4, 1, 2, 3, 4, 5, 6)
scala> val intersection_rdd = rdd1.intersection(rdd2)
intersection_rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[130] at intersection at <console>:27
scala> intersection_rdd.collect()
res101: Array[Int] = Array(4, 3)
scala> val distinct_rdd = rdd1.distinct()
distinct_rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[133] at distinct at <console>:25
scala> distinct_rdd.collect()
res102: Array[Int] = Array(4, 1, 2, 3)

These are some of the basic transformation functions available in Spark, but there are many more. You can head to the official documentation to learn about all the functions out there.

That's all for this Spark with Scala hands-on. I know there is much more, but I hope this guide gives you a good starting point. Have a great day. ta ta 😉
