Running Spark SQL CERN Queries 5x Faster on SnappyData

Pierce Lamb
Mar 10, 2017 · 7 min read


Following the improvements made to Spark SQL execution in SnappyData, we recently revisited a blog by the core Spark engineering team about billion-row joins on Spark and showed how our optimizations deliver ~20X speedups over Apache Spark for identical queries run on identical hardware. In this blog, we take a look at Luca Canali’s CERN blog, which investigated the speed difference between Spark 1.6 and Spark 2.0 when executing a join with two conditions. That blog concluded that Spark 2.0 was 7X faster than Spark 1.6, which is pretty phenomenal considering that Spark 1.6 was no tortoise when it came to running SQL workloads.

The improvements in Spark 2.0 were primarily driven by whole stage code generation. The SnappyData team extended those innovations and also used techniques like colocation to further boost performance by up to 20X for Spark SQL queries running in SnappyData.
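
As a rough illustration of colocation (the tables and columns below are ours, not part of the benchmark), SnappyData lets related tables be partitioned on the same key and colocated via its PARTITION_BY and COLOCATE_WITH table options, so a join on that key can be resolved locally on each node. This sketch assumes a SnappySession named snSession:

// Illustrative sketch of colocation in SnappyData (hypothetical table names).
// Both tables are partitioned on customer_id; COLOCATE_WITH keeps matching
// partitions on the same node, so a join on customer_id needs no shuffle.
snSession.sql(
  "create table customers (customer_id int, name string) " +
  "using column options (partition_by 'customer_id')")
snSession.sql(
  "create table orders (order_id int, customer_id int, amount double) " +
  "using column options (partition_by 'customer_id', colocate_with 'customers')")

// The physical plan for this join should contain no Exchange for the join key.
snSession.sql(
  "select c.customer_id, sum(o.amount) from customers c, orders o " +
  "where c.customer_id = o.customer_id group by c.customer_id").explain()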

The original blog used a Python program to create the tables and run the query. Being Scala hackers, we reimplemented the program in Scala (see below), populated the table with the exact same 10 million rows and re-ran the test on comparable machines in Microsoft Azure, the same class of machines used in the original blog. The results show that on a single machine with 16 cores, SnappyData runs the exact same queries 4–5 times faster than Spark 2.1.

Results

Performance times were averaged over 5 runs. The first test ran on a single machine with a 16-core CPU and 2GB of driver memory; the second ran on two machines, each with an 8-core CPU and 4GB of executor memory. All machines ran Ubuntu with Java 8 installed.

  • One machine
      ◦ Spark 2.1.0: ~14.4 minutes
      ◦ SnappyData: ~2.9 minutes
  • Two machines
      ◦ Spark 2.1.0: ~12 minutes
      ◦ SnappyData: ~3 minutes

The Benchmark

Below we show the Scala code used to run the single-machine benchmark; it mirrors the Python code used in the original blog. This code was run on a single Azure machine with a 16-core CPU and 2GB of driver memory.

./spark-shell --driver-memory 2g

def benchmark(name: String, times: Int = 5, warmups: Int = 2)(f: => Unit): Double = {
  for (i <- 1 to warmups) {
    f
  }
  val startTime = System.nanoTime
  for (i <- 1 to times) {
    f
  }
  val endTime = System.nanoTime
  val timeTaken = (endTime - startTime).toDouble / (times * 1000000.0)
  println(s"Average time taken in $name for $times runs: $timeTaken millis")
  timeTaken
}

val snSession = new org.apache.spark.sql.SnappySession(sc)
val test_numrows = 10000000

snSession.range(0, test_numrows, 1).createTempView("t0")
snSession.dropTable("t1", ifExists = true)
snSession.sql("Create table t1 using column options(partition_by 'bucket') as (select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0)")
snSession.sql("select count(*) from t1").show()
snSession.sql("describe t1").show()

benchmark("Snappy Perf") {
  snSession.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show()
}

Following the original test code, we register ten million rows as a temporary view and create a SnappyData column table from those rows. We then count the rows and show that the schema has been created correctly. Finally, we pass the actual test query into the benchmark function to measure how long it takes. The test query is exactly the same as in the original blog: a join with two conditions, an equality predicate on the column bucket and a non-equality condition, plus an aggregation. As reported above, the result is a 4–5x improvement over Spark 2.1. The code used to run this query on Spark 2.1, along with the two-machine code, can be found at the end of this blog; it is essentially the same as the code used in the original Databricks blog.

So what is causing this improvement? As in the original blog, the key to the performance boost lies in the query plans for Spark 2.1 and SnappyData. They are shown below.

The Query Plans

A shuffle join in Spark moves data from one node to another so that all rows for a given key end up on the same node. As one can imagine, shuffling is one of the biggest costs of data processing in Spark SQL: it involves writing to disk, serializing data over the network, copying data multiple times, and so on.
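
A quick way to see this cost is to look for Exchange operators in the physical plan. The sketch below (table and column names are illustrative, not from the benchmark) assumes a plain Spark 2.x shell with a spark session available:

// Illustrative Spark 2.x sketch: Exchange nodes in the physical plan mark the shuffle.
val df = spark.range(0, 1000000)
  .selectExpr("id", "floor(200*rand()) as bucket", "floor(1000*rand()) as val1")
df.createOrReplaceTempView("demo")

// A self-join on a non-partitioned view forces Spark to hash-partition both sides
// by 'bucket', which shows up as Exchange hashpartitioning(bucket, ...) operators.
spark.sql("select a.bucket, count(*) from demo a, demo b where a.bucket = b.bucket group by a.bucket").explain()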

In certain cases, SnappyData can remove the shuffle altogether, as shown in the query plan above. The LocalJoin operator exploits the knowledge that replicated data is present on all nodes: if a relation or intermediate result is available on every node, it never does a remote fetch or shuffle. It simply scans through the records of both joined relations locally. Like a broadcast join, LocalJoin never moves data across nodes.

LocalJoin performs a local hash join of two child relations. If a relation from a data source is already replicated across all nodes, then rather than doing a broadcast join, which can be expensive, this join simply scans through the single partition of the replicated relation while streaming through the other relation.
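
A rough sketch of how this might be exercised: the dim_bucket table and its columns below are hypothetical, and we assume the snSession and t0 view from the benchmark above; in SnappyData, a row table created without a PARTITION_BY option is replicated to all data nodes.

// Hypothetical example: a small replicated row table joined with a partitioned column table.
snSession.sql("create table dim_bucket (bucket int, label string) using row options()")
snSession.sql("create table facts using column options(partition_by 'bucket') as " +
  "(select id, floor(200*rand()) bucket, floor(10*rand()) val2 from t0)")

// Every node holds a full copy of dim_bucket, so each partition of facts can be joined
// locally; the plan should show a local join rather than an Exchange for this side.
snSession.sql("select d.label, sum(f.val2) from facts f, dim_bucket d " +
  "where f.bucket = d.bucket group by d.label").explain()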

The Exchange node in the SnappyData query plan is avoided by specifying the option partition_by 'bucket' when creating table t1. The data is therefore already partitioned on bucket as the rows are inserted, and since the SnappyData query engine is aware of this, it avoids the expensive shuffle for the join on bucket.

Because the Exchange node is avoided, all of the operators in the query plan become part of a single piece of generated code. In terms of whole-stage code generation, this is very performant: the entire plan runs as one fused stage without materializing intermediate results between operators.
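
To see how much of a plan is covered by whole-stage code generation, you can inspect it from the shell. This sketch assumes the plain Spark 2.x setup from the appendix (a temp view t1 registered in a regular spark session); operators prefixed with * in explain() output run inside generated code, and debugCodegen() from Spark's debug package dumps the generated source:

import org.apache.spark.sql.execution.debug._

val q = spark.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b " +
  "where a.bucket = b.bucket and a.val1 + b.val1 < 1000 " +
  "group by a.bucket order by a.bucket")

q.explain()       // '*' marks operators fused into a whole-stage codegen stage
q.debugCodegen()  // prints the generated code for each stage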

SnappyHashAggregate: alternate hash aggregation and hash join operators have been added that are finely tuned to SnappyData’s storage layout, making full use of vectorization, dictionary encoding and more. These optimizations provide an order-of-magnitude performance advantage over Spark’s default implementations.

In short, we took the query CERN used to compare Spark SQL 1.6 with Spark SQL 2.0 and ran it against SnappyData on equivalent machines. While CERN found a 7x improvement from Spark 1.6 to Spark 2.0, SnappyData delivered an additional 4–5x improvement over Spark 2.1.

SnappyData turns Apache Spark into a hybrid transactional analytics database while remaining fully compatible with Spark, and we continue to make performance improvements that significantly speed up Spark SQL workloads. Stay tuned for significant improvements to ingestion and concurrency in upcoming releases.

Appendix

Below we show the Scala code used to run the two-machine benchmark. This code was run on two Azure machines, each with an 8-core CPU and 4GB of executor memory.

package org.apache.spark.examples.snappydata

import java.io.{File, PrintWriter}

import com.typesafe.config.Config
import org.apache.spark.sql.{SnappyJobInvalid, SnappyJobValid, SnappyJobValidation, SnappySQLJob, SnappySession}
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.{Failure, Success, Try}

object CERNBlogImplementationEnhanced extends SnappySQLJob {

  override def runSnappyJob(snc: SnappySession, jobConfig: Config): Any = {
    def getCurrentDirectory = new java.io.File(".").getCanonicalPath
    val pw = new PrintWriter("CERNBlogImplementationEnhanced.out")
    Try {
      def benchmark(name: String, times: Int = 5, warmups: Int = 2)(f: => Unit): Double = {
        for (i <- 1 to warmups) {
          f
        }
        val startTime = System.nanoTime
        for (i <- 1 to times) {
          f
        }
        val endTime = System.nanoTime
        val timeTaken = (endTime - startTime).toDouble / (times * 1000000.0)
        pw.println(s"Average time taken in $name for $times runs: $timeTaken millis")
        timeTaken
      }

      val test_numrows = 10000000
      // create a temporary view t0 holding the row ids
      snc.dropTable("t0", ifExists = true)
      snc.range(0, test_numrows, 1).createTempView("t0")
      snc.dropTable("t1", ifExists = true)
      snc.sql("Create table t1 using column options(partition_by 'bucket') as (select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0)")
      pw.println(snc.sql("select count(*) from t1").show())
      pw.println(snc.sql("describe t1").show())
      benchmark("Snappy Perf") {
        pw.println(snc.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show)
      }
    } match {
      case Success(v) =>
        pw.close()
        s"See ${getCurrentDirectory}/CERNBlogImplementationEnhanced.out"
      case Failure(e) =>
        pw.close()
        throw e
    }
  }

  override def isValidJob(snc: SnappySession, config: Config): SnappyJobValidation = {
    SnappyJobValid()
  }
}

Commands for running the queries in the Spark 2.1.0 shell on a single Azure machine with a 16-core CPU and 2GB of driver memory:

./spark-shell --driver-memory 2g

def benchmark(name: String, times: Int = 5, warmups: Int = 2)(f: => Unit): Double = {
  for (i <- 1 to warmups) {
    f
  }
  val startTime = System.nanoTime
  for (i <- 1 to times) {
    f
  }
  val endTime = System.nanoTime
  val timeTaken = (endTime - startTime).toDouble / (times * 1000000.0)
  println(s"Average time taken in $name for $times runs: $timeTaken millis")
  timeTaken
}

val test_numrows = 10000000
spark.range(0, test_numrows, 1).registerTempTable("t0")
spark.sql("select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0").cache().registerTempTable("t1")
spark.sql("select count(*) from t1").show()
spark.sql("desc t1").show()

benchmark("Spark Performance") {
  spark.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show()
}

Commands for running the queries in the Spark 2.1.0 shell on two Azure machines, each with an 8-core CPU and 4GB of RAM per executor node:

./spark-shell --driver-memory 2g --master spark://IP:PORT

def benchmark(name: String, times: Int = 5, warmups: Int = 2)(f: => Unit): Double = {
  for (i <- 1 to warmups) {
    f
  }
  val startTime = System.nanoTime
  for (i <- 1 to times) {
    f
  }
  val endTime = System.nanoTime
  val timeTaken = (endTime - startTime).toDouble / (times * 1000000.0)
  println(s"Average time taken in $name for $times runs: $timeTaken millis")
  timeTaken
}

val test_numrows = 10000000
spark.range(0, test_numrows, 1).registerTempTable("t0")
spark.sql("select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0").cache().registerTempTable("t1")
spark.sql("select count(*) from t1").show()
spark.sql("desc t1").show()

benchmark("Spark Performance") {
  spark.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1<1000 group by a.bucket order by a.bucket").show()
}

A shout out to my colleague Sushma Goel for writing the Scala code and producing the benchmarks for this blog.

SnappyData is open source on GitHub.
Learn more and chat about SnappyData on any of our community channels:
Stackoverflow
Slack
Mailing List
Gitter
Reddit
JIRA

Other important links
SnappyData code example
SnappyData screencasts
SnappyData twitter
SnappyData technical paper
