Apache Spark™ is a unified analytics engine for large-scale data processing. It was originally developed in 2009 at UC Berkeley’s AMPLab, open sourced in 2010, and later donated to the Apache Software Foundation. It is written in Scala, with APIs available in Scala, Python, Java, and R. You can also run SQL, including Hive-style queries, through Spark SQL. I will be using Spark 2.4 with Scala for this introduction. Spark now has more than 1,300 contributors and very good community support.
History of Spark: A UC Berkeley team was building Mesos, a cluster management framework, and they wanted to show how easy it was to build a framework from scratch on Mesos. That’s how Spark was born! More details are in this O’Reilly podcast with Ion Stoica about Spark’s origin story.
Why Spark?
- It is a one-stop solution for many kinds of big data problems: a unified platform for batch, (almost) real-time, machine learning, deep learning, and graph workloads.
- You can run Spark on an existing Hadoop cluster with YARN, or on Kubernetes or Mesos.
- It has something for everyone: Datasets for Scala fans, Spark SQL for people who have spent their last 10 years in Oracle or Teradata (legacy systems), and DataFrames for Python pandas fans.
- Obviously, the speed, which is backed by numerous benchmarks, charts, and all the good stuff.
Architecture
Spark has a master-slave architecture; I prefer to call it master-worker (not a big fan of the word slave). The master is the instance that hosts the driver program, and a worker is an instance that hosts executors. These can live on the same node (your Mac) or on different nodes (an EMR cluster with multiple EC2 instances). Basically, there are four pieces: the driver, the workers, the executors, and the cluster manager.
Driver: The driver program is the process that runs your application’s main function and creates the SparkContext. It is a JVM process that is responsible for splitting the application into tasks and scheduling them on executors.
Worker: Workers are the instances where executors live to execute the user-written application code in the cluster.
Cluster manager: This is responsible for allocating resources across Spark applications. The SparkContext can connect to several types of cluster managers, such as Mesos, YARN, or Kubernetes, apart from Spark’s own standalone cluster manager.
Executor: It is a JVM process launched for an application on a worker node, which runs tasks and keeps data in memory or on disk across them. Each application has its own executors.
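To make the pieces above a bit more concrete, here is a minimal sketch of how the driver picks a cluster manager through the master URL. The object name, the app name, and the executor sizes are my own placeholders, and the host names in the comments are hypothetical. In local mode the driver and the executor share one JVM, so the executor settings have little effect there.

```scala
import org.apache.spark.sql.SparkSession

object ClusterConfigSketch {
  // The master URL decides which cluster manager the driver talks to, e.g.
  //   "local[2]"                -> no cluster, 2 threads in the driver JVM
  //   "spark://some-host:7077"  -> Spark standalone (hypothetical host)
  //   "yarn"                    -> YARN, cluster details come from the Hadoop config
  //   "mesos://some-host:5050"  -> Mesos (hypothetical host)
  def buildSession(master: String): SparkSession =
    SparkSession.builder
      .appName("ClusterConfigSketch")
      .master(master)
      // Resources requested for each executor (placeholder values)
      .config("spark.executor.memory", "2g")
      .config("spark.executor.cores", "2")
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = buildSession("local[2]")
    val sc = spark.sparkContext
    println(s"master = ${sc.master}")
    println(s"default parallelism = ${sc.defaultParallelism}")
    spark.stop()
  }
}
```

In practice the master is usually passed via spark-submit rather than hard-coded, so the same jar can run locally and on a cluster.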
PS: I am assuming you are familiar with Scala and sbt.
The first thing is to add the Spark dependencies to your build.sbt.
name := "spark-examples"
version := "0.1"
scalaVersion := "2.12.8"

val sparkVersion = "2.4.0"

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "org.postgresql" % "postgresql" % "42.2.5",
  // "com.holdenkarau" %% "spark-testing-base" % "2.2.0_0.8.0" % "test"
  // tests
  "MrPowers" % "spark-fast-tests" % "0.17.1-s_2.12"
)
You can get started by creating a SparkSession, which has been the entry point since Spark 2.0 and wraps the SparkContext:
import org.apache.spark.sql.SparkSession

object Runner {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("SparkSessionExample")
      .master("local[4]") // To run locally; use 4 cores
      .config("spark.sql.warehouse.dir", "target/spark-warehouse")
      .getOrCreate()
  }
}
Spark API
There are 3 main abstractions in Apache Spark:
1) RDD: Short for resilient distributed dataset, this is the most basic data abstraction in Spark. An RDD is an immutable, fault-tolerant collection of elements, partitioned across the nodes of your cluster, that can be operated on in parallel through a low-level API of transformations and actions. There are 2 ways of creating an RDD: by parallelizing an existing collection in the driver program, or by reading data from an external storage system such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
To create an RDD:
import org.apache.spark.rdd.RDD

val sc = spark.sparkContext
val myRdd: RDD[(String, Int)] = sc.parallelize(
  Seq(
    ("Jon", 1),
    ("Tyrion", 2),
    ("Bronn", 3)
  )
)
SparkContext is the entry point to Spark’s core (RDD) API. It represents the connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster.
Note: RDDs are now more of a low-level API, as Spark has evolved toward higher-level abstractions that give better performance. They are still a good choice if you have to deal with unstructured data, at the cost of some optimization and performance benefits.
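Since transformations and actions are mentioned above without an example, here is a small sketch of both. The object and function names are mine, not part of Spark. `filter` and `reduceByKey` are transformations (lazy, they only describe the work), while `collect` is an action that actually runs the job and brings the result back to the driver.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object RddSketch {
  // Sums the values per key, ignoring non-positive values.
  def totalsByName(rdd: RDD[(String, Int)]): Map[String, Int] =
    rdd
      .filter { case (_, n) => n > 0 } // transformation: lazy
      .reduceByKey(_ + _)              // transformation: lazy, shuffles by key
      .collect()                       // action: triggers the computation
      .toMap

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("RddSketch")
      .master("local[2]")
      .getOrCreate()
    val pairs = spark.sparkContext.parallelize(
      Seq(("Jon", 1), ("Tyrion", 2), ("Jon", 3))
    )
    println(totalsByName(pairs))
    spark.stop()
  }
}
```

Keep in mind that `collect` pulls everything into driver memory, so it is only reasonable for small results like this one.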
2) DataFrame: It is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with a lot more optimization under the hood. A DataFrame allows developers to impose a structure onto a distributed collection of data, allowing higher-level abstractions. From 2.0, the DataFrame API was merged with the Dataset API. Now a DataFrame is simply a Dataset organized into named columns.
type DataFrame = Dataset[Row]
DataFrames can be created from RDDs.
import org.apache.spark.sql.DataFrame

val dfWithColumnNames: DataFrame = spark.createDataFrame(myRdd).toDF("name", "id")
dfWithColumnNames.show(truncate = false)
+------+---+
|name  |id |
+------+---+
|Jon   |1  |
|Tyrion|2  |
|Bronn |3  |
+------+---+

import spark.implicits._
dfWithColumnNames.filter($"id" > 1).show()
+------+---+
|  name| id|
+------+---+
|Tyrion|  2|
| Bronn|  3|
+------+---+
Note: The import spark.implicits._ is required to use the $"..." string syntax to refer to a column, and to convert common Scala objects into Datasets.
A DataFrame can be read from a variety of data sources such as CSV, JDBC, JSON, Parquet, and many others, including files stored on S3.
spark.read.csv("users.txt")
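The one-liner above reads every column as a string named _c0, _c1, and so on. Here is a slightly fuller sketch, assuming a CSV with a header row and the two columns name and id; the object name, the schema, and the temp file are made up for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CsvReadSketch {
  // Assumed layout of the file: name (string), id (integer)
  val schema: StructType = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("id", IntegerType, nullable = true)
  ))

  def readCharacters(spark: SparkSession, path: String): DataFrame =
    spark.read
      .option("header", "true") // first line holds the column names
      .schema(schema)           // explicit schema, so no inference pass is needed
      .csv(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("CsvReadSketch")
      .master("local[2]")
      .getOrCreate()
    // Write a tiny sample file so the sketch is self-contained
    val tmp = Files.createTempFile("characters", ".csv")
    Files.write(tmp, "name,id\nJon,1\nTyrion,2\n".getBytes("UTF-8"))
    val df = readCharacters(spark, tmp.toString)
    df.printSchema()
    df.show()
    spark.stop()
  }
}
```

If you do not know the schema up front, `.option("inferSchema", "true")` is the alternative, at the price of an extra pass over the data.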
The schema of the DataFrame is inferred, and you can print it:
dfWithColumnNames.printSchema()
root
 |-- name: string (nullable = true)
 |-- id: integer (nullable = false)
Also, SQL queries can be executed:
dfWithColumnNames.createOrReplaceTempView("got")
val sqlDF = spark.sql("SELECT * FROM got where name = 'Jon' ")
sqlDF.show()
+----+---+
|name| id|
+----+---+
| Jon| 1|
+----+---+
Apart from the handy DSL, there are a lot of hidden goodies, such as Catalyst (Spark’s query optimizer, which includes cost-based optimization) and schema inference.
DataFrames also try to solve a lot of the performance issues that Spark had with non-JVM languages like Python and R. Historically, using RDDs in Python was much slower than in Scala. With DataFrames, code written in any of these languages performs about the same, with some exceptions.
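If you want to peek at what Catalyst does with a query, `explain(true)` prints the parsed, analyzed, and optimized logical plans along with the physical plan. A minimal sketch; the object and function names are my own, and the exact plan text differs between Spark versions, so I am not showing the output here.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ExplainSketch {
  // A small query for Catalyst to optimize: filter, then project one column.
  def filteredNames(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq(("Jon", 1), ("Tyrion", 2), ("Bronn", 3)).toDF("name", "id")
    df.filter($"id" > 1).select("name")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("ExplainSketch")
      .master("local[2]")
      .getOrCreate()
    // extended = true prints the logical plans as well as the physical plan
    filteredNames(spark).explain(true)
    spark.stop()
  }
}
```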
3) Datasets: A Dataset is a collection of strongly-typed domain-specific objects that can be transformed in parallel using functional or relational operations. A logical plan is created and updated for each transformation, and the final logical plan is converted to a physical plan when an action is invoked. Spark’s Catalyst optimizer optimizes the logical plan and generates a physical plan for efficient execution in a parallel and distributed manner. Further, there are Encoders, which generate an optimized binary structure with a lower memory footprint. Encoders know the schema of the records. This is how they offer significantly faster serialization and deserialization (compared to the default Java or Kryo serializers).
// Note: this case class should be defined outside your main object.
case class Characters(name: String, id: Int)

import spark.implicits._
val data = Seq(Characters("Jon", 21), Characters("Tyrion", 22), Characters("Gandalf", 19))
val ds = spark.createDataset(data)

// To read data from a data source (this assumes the CSV has a header row with name and id)
val path = "examples/src/main/resources/Characters.csv"
val peopleDS = spark.read.option("header", "true").option("inferSchema", "true")
  .csv(path).as[Characters]
peopleDS.filter(_.id % 2 == 0).show()

val ds1 = dfWithColumnNames.as[Characters]
ds1.filter(_.name.startsWith("J")).show()
The main advantage of Datasets is type safety. When using Datasets, we are assured that both syntax errors and analysis errors are caught at compile time. With DataFrames, by contrast, a syntax error is caught at compile time, but an analysis error, such as referring to a nonexistent column name, is caught only once you run the code. Runs can be quite expensive, and as a developer it is nice to have the compiler and IDE do this job for you.
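Here is a rough sketch of that difference. The `Hero` case class and the misspelled column `nme` are made up for the example. The DataFrame version compiles fine and only fails with an AnalysisException when the code runs, while the typed version of the same typo (left commented out) is rejected by the compiler.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import scala.util.{Failure, Try}

// Hypothetical domain class, defined at the top level so Spark can derive an Encoder
case class Hero(name: String, id: Int)

object TypeSafetySketch {
  // Returns true if selecting a misspelled column fails with an AnalysisException.
  def missingColumnFailsAtRuntime(spark: SparkSession): Boolean = {
    import spark.implicits._
    val df = Seq(Hero("Jon", 1), Hero("Tyrion", 2)).toDF()
    Try(df.select("nme")) match { // typo: compiles, fails when executed
      case Failure(_: AnalysisException) => true
      case _                             => false
    }
  }

  // Typed access: the compiler checks the field names.
  def heroNames(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    val ds = Seq(Hero("Jon", 1), Hero("Tyrion", 2)).toDS()
    // ds.map(_.nme) // would not compile: value nme is not a member of Hero
    ds.map(_.name).collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("TypeSafetySketch")
      .master("local[2]")
      .getOrCreate()
    println(missingColumnFailsAtRuntime(spark))
    println(heroNames(spark))
    spark.stop()
  }
}
```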
Datasets are built on the Spark SQL engine, which uses Catalyst to generate optimized logical and physical query plans, and they are a good trade-off between DataFrames and RDDs. They have the performance of DataFrames and the flexibility of RDDs for more fine-grained operations, such as lambda operations, which are not possible with a SQL-like approach. Datasets give us the best of both worlds.
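As a quick illustration of those lambda operations, here is a sketch that filters and maps a Dataset with plain Scala functions. The `Score` case class and the function name are hypothetical. One caveat worth knowing: Catalyst cannot look inside a lambda, so it treats it as a black box, unlike a column expression such as $"points" > 1.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical domain class for the example
case class Score(name: String, points: Int)

object DatasetLambdaSketch {
  // Keeps the even scores and doubles them, using ordinary Scala lambdas.
  def evenDoubled(spark: SparkSession, scores: Seq[Score]): Seq[Score] = {
    import spark.implicits._
    spark.createDataset(scores)
      .filter(_.points % 2 == 0)               // typed filter
      .map(s => s.copy(points = s.points * 2)) // typed map, returns Dataset[Score]
      .collect()
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("DatasetLambdaSketch")
      .master("local[2]")
      .getOrCreate()
    val input = Seq(Score("Jon", 21), Score("Tyrion", 22), Score("Gandalf", 19))
    println(evenDoubled(spark, input))
    spark.stop()
  }
}
```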
Also, there is an exciting feature in Spark called Tungsten, which understands the type of the JVM objects in a Dataset and maps them to Tungsten’s internal memory representation using Encoders. Basically, Tungsten stores data off the Java heap in a more efficient way, since Spark has more information about the data being stored, tries to create cache-aware computations, and generates code dynamically at runtime. There are a lot of benchmarks to back these performance enhancements, and they will get better as Spark approaches 3.0. A lot of these features were introduced in 2.0, and many of them are marked experimental. Nevertheless, it is an exciting time to learn more about Spark.