Spark DataFrames

Thejas Babu
4 min read · Apr 3, 2017


Spark SQL is a Spark module for structured data processing. With the changes introduced in Spark 2.0, Spark SQL has become the de facto primary, feature-rich interface to Spark’s underlying in-memory distributed platform, hiding Spark Core’s RDDs behind higher-level abstractions. Spark SQL is the component of Spark that introduces the programming abstraction called DataFrame.

DataFrame and Spark components

A DataFrame is a data abstraction, or a domain-specific language (DSL), for working with structured and semi-structured data, i.e. datasets with a schema. A DataFrame is a collection of rows with a schema that describes the result of the structured query it represents. It builds on the immutable, in-memory, resilient, distributed and parallel capabilities of RDDs, and applies a structure, called a schema, to the data. The key difference between DataFrames and RDDs is that with DataFrames Spark has much more information about the structure of the data. Because column names, data types and so on are already known, Spark can apply a whole range of optimizations that are not possible with plain RDDs.

DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. In Scala, DataFrames can be created using a Seq/RDD of tuples or case classes.

scala> val df = Seq(("Rey", 23), ("John", 44)).toDF("Name", "Age")
df: org.apache.spark.sql.DataFrame = [Name: string, Age: int]

scala> df.show()
// Prints the DataFrame in a tabular manner
+----+---+
|Name|Age|
+----+---+
| Rey| 23|
|John| 44|
+----+---+

scala> case class Person(Name: String, Age: Int)
defined class Person

scala> val personDF = Seq(Person("Ray", 23), Person("John", 44)).toDF
personDF: org.apache.spark.sql.DataFrame = [Name: string, Age: int]

scala> personDF.show()
+----+---+
|Name|Age|
+----+---+
| Ray| 23|
|John| 44|
+----+---+

SQLContext has a number of createDataFrame methods that create a DataFrame from a given RDD. For example:

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Creates a DataFrame from an RDD containing Rows using the given schema.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val rowsRdd: RDD[Row] = sc.parallelize(
  Seq(
    Row("John", 27),
    Row("Eric", 25)
  )
)

val schema = new StructType()
  .add(StructField("Name", StringType, true))
  .add(StructField("Age", IntegerType, true))

val df = sqlContext.createDataFrame(rowsRdd, schema)

df.show()
+----+---+
|Name|Age|
+----+---+
|John| 27|
|Eric| 25|
+----+---+

SQLContext also provides methods to create a DataFrame from the contents of a JSON file.

val df = sqlContext.read.json("s3a://some-bucket/some-file.json")
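
Spark infers the schema from the JSON records; a quick way to inspect what was inferred (the bucket path above is just a placeholder) is printSchema:

// Sketch: print the schema Spark inferred from the JSON source.
df.printSchema()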

In Spark 2.0 and above, SQLContext and HiveContext are merged into a single object called SparkSession. SparkSession is the entry point to Spark SQL and provides two useful DataFrame APIs:

emptyDataFrame: DataFrame

emptyDataFrame creates an empty DataFrame (with no rows and columns).

createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

createDataFrame creates a DataFrame using RDD[Row] and the input schema.
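
As a minimal sketch of these two APIs in Spark 2.x (the application name here is arbitrary):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Build (or reuse) a SparkSession; the app name is arbitrary.
val spark = SparkSession.builder().appName("dataframe-example").getOrCreate()

// An empty DataFrame: no rows and no columns.
val empty = spark.emptyDataFrame

// The same Name/Age example as above, via SparkSession.createDataFrame.
val schema = StructType(Seq(
  StructField("Name", StringType, true),
  StructField("Age", IntegerType, true)
))
val rowRDD = spark.sparkContext.parallelize(Seq(Row("John", 27), Row("Eric", 25)))
val people = spark.createDataFrame(rowRDD, schema)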

Spark DataFrames provide operations to filter, group and compute aggregates, and can also be queried with Spark SQL.

df.filter(df.col("Age") > 26).show()+----+---+
|Name|Age|
+----+---+
|John| 27|
+----+---+
val newDf = df.unionAll(df)
// Set operations can be performed
newDf.show()+----+---+
|Name|Age|
+----+---+
|John| 27|
|Eric| 25|
|John| 27|
|Eric| 25|
+----+---+
newDf.groupBy("Name").count().show()
// people with same name
+----+-----+
|Name|count|
+----+-----+
|John| 2|
|Eric| 2|
+----+-----+
newDf.groupBy(df.col("Name")).sum("Age").show()
// sum(age) of people with same name
+----+--------+
|Name|sum(Age)|
+----+--------+
|John| 54|
|Eric| 50|
+----+--------+
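
The same aggregation can be expressed in SQL by first registering the DataFrame as a temporary view. A minimal sketch, assuming Spark 2.x and a SparkSession named spark (on Spark 1.x the equivalent call is registerTempTable):

// The view name "people" is arbitrary.
newDf.createOrReplaceTempView("people")
spark.sql("SELECT Name, SUM(Age) AS total_age FROM people GROUP BY Name").show()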

Each DataFrame is internally represented as a logical plan in Spark. These logical plans are converted into physical plans by the Catalyst optimizer. The Catalyst optimizer is the core of Spark SQL and optimizes all queries, whether written in Spark SQL or in the DataFrame DSL. Before any computation on a DataFrame starts, the Catalyst optimizer compiles the operations that were used to build the DataFrame into a physical plan for execution. Because the optimizer understands the semantics of the operations and the structure of the data, it can make intelligent decisions to speed up computation.

At a high level, there are two kinds of optimizations. First, Catalyst applies logical optimizations such as predicate pushdown. The optimizer can push filter predicates down into the data source, enabling the physical execution to skip irrelevant data. Second, Catalyst compiles operations into physical plans for execution and generates JVM bytecode for those plans that is often more optimized than hand-written code.

Conversion of Logical plan to Physical plan by Catalyst Optimizer

In the query shown above, the filter appears after the join, which is a costly shuffle operation. The Catalyst Optimizer sees this and, in the physical plan, pushes the filter down so that it executes before the join.
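
This push-down can be observed with explain(true), which prints the logical and physical plans. A minimal sketch in the spark-shell (the column and table names are illustrative, not taken from the figure above):

val people = Seq(("John", 27), ("Eric", 25)).toDF("Name", "Age")
val cities = Seq(("John", "Oslo"), ("Eric", "Lima")).toDF("Name", "City")

// The filter is written after the join in the DSL...
val query = people.join(cities, "Name").filter("Age > 26")

// ...but the optimized/physical plan shows the Age > 26 predicate
// applied to the people side before the join.
query.explain(true)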

Another Spark SQL feature that brings a huge performance improvement to DataFrames is Project Tungsten. Tungsten focuses on substantially improving the efficiency of memory and CPU usage in Spark applications, pushing performance closer to the limits of modern hardware.

  1. Memory Management and Binary Processing: Project Tungsten aims at substantially reducing the usage of JVM objects (and therefore JVM garbage collection) by introducing its own off-heap binary memory management.
  2. Cache-aware computation: Cache-aware computation improves the speed of data processing through more effective use of L1/ L2/L3 CPU caches, as they are orders of magnitude faster than main memory. Cache-friendly algorithms and data structures are designed as part of Project Tungsten so that Spark applications will spend less time waiting to fetch data from memory and more time doing useful work.
  3. Code generation for Expression Evaluation: Expression evaluation is the process of computing the value of an expression (say "select a+a+a") on a particular record. At runtime, Spark dynamically generates bytecode for evaluating these expressions, rather than stepping through a slower interpreter for each row.
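
A minimal sketch of seeing code generation at work in the spark-shell (the column name a is illustrative): explain() prints the physical plan, and in Spark 2.x the stages compiled to bytecode by whole-stage code generation are marked with an asterisk.

// Column name `a` is illustrative.
val t = Seq(1, 2, 3).toDF("a")

// Stages prefixed with `*` in the printed physical plan are
// whole-stage code-generated (compiled to JVM bytecode).
t.selectExpr("a + a + a").explain()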
