Simple queries in Spark Catalyst optimisation (1)

In Spark 1.6, the Spark SQL Catalyst optimiser has become very mature. With all the power of Catalyst, we are trying to use DataFrame (Dataset) transformations in all of our Spark jobs. But do we really get much benefit from DataFrames when all we do is a simple "select + filter" query? Where does the performance benefit come from for a simple "left join" or an "aggregation count"? With these questions in mind, I dug into the Spark source code a little to better understand Catalyst and see how it works.

The Catalyst optimisation consists of four stages; this Databricks blog post https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html explains the principles very well. Because we focus on the performance of simple SQL queries, the "Analysis" and "Logical optimisation" stages cannot help our query very much. These two stages contain hundreds of rules and do help a lot for long, complicated queries, but here we are much more interested in the "Physical plan" stage and what it does. The whole pipeline is driven by the QueryExecution class in Spark.
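
Here is a condensed version of the Spark 1.6 QueryExecution class (trimmed: the debug and toString helpers are omitted; refer to the actual source for the full class):

class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {

  def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)

  lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)

  lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    // swap in InMemoryRelation for any part of the plan that has been cached
    sqlContext.cacheManager.useCachedData(analyzed)
  }

  lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)

  lazy val sparkPlan: SparkPlan = {
    SQLContext.setActive(sqlContext)
    sqlContext.planner.plan(optimizedPlan).next()
  }

  // executedPlan should only be used for execution, not to initialise other SparkPlans
  lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)

  // internal version of the RDD: this is where the physical plan actually runs
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
}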

The logical plan passed to the constructor comes from parsing the SQL query. QueryExecution first builds the analyzed plan, then withCachedData, which replaces the existing LogicalPlan with an InMemoryRelation if you have cached your DataFrame, then the optimizedPlan, and finally the sparkPlan. All physical plans in Spark inherit from the class SparkPlan, which I think is a good name: a "physical plan" is really about implementing the SQL query as Spark RDD transformations. That is why doExecute(): RDD[InternalRow] is the abstract method of SparkPlan. To inspect all these plans in your Spark job, call the explain() function on your DataFrame, or look at the SQL tab of the Spark UI.

Suppose we have a simple DataFrame with a filter on it.
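
Something along these lines will do; the exact original snippet is not reproduced here, the case class and data are illustrative, and only the key/value schema and the key > 5 predicate are taken from the plans below (sc is the SparkContext of a spark-shell session):

case class KV(key: Int, value: String)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val rdd = sc.parallelize(1 to 100).map(i => KV(i, s"value_$i"))
val df = rdd.toDF()                  // shows up as "Scan ExistingRDD[key#0,value#1]"
val filtered = df.filter($"key" > 5) // produces the Filter (key#0 > 5) plan
filtered.explain()                   // prints the physical plan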

The physical plan for this DataFrame, without caching, is:

== Physical Plan ==
Filter (key#0 > 5)
+- Scan ExistingRDD[key#0,value#1]

We have two SparkPlans here: Filter and PhysicalRDD.

PhysicalRDD is the child of Filter (it prints as "Scan ExistingRDD"). The Filter plan holds a GreaterThan expression, and PhysicalRDD has two attributes (the two output columns). PhysicalRDD is the leaf of the SparkPlan tree but also the starting point of the execution: its doExecute() method simply returns the original RDD of InternalRows. Filter's doExecute() then applies the predicate function to the iterator obtained from its child's RDD via mapPartitionsInternal().
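
A trimmed version of the Filter plan from the Spark 1.6 source (with the metrics bookkeeping stripped out) shows exactly that:

case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  protected override def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitionsInternal { iter =>
      // newPredicate() code-generates a Predicate from the filter expression
      val predicate = newPredicate(condition, child.output)
      iter.filter(predicate)
    }
  }
}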

newPredicate() generates Java bytecode for a Predicate class from the Filter's expression, but this does not run any faster than calling rdd.filter() with a plain Scala function.
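
For comparison, the plain RDD version of the same query (reusing the illustrative rdd from the sketch above) is a one-liner; for such a trivial predicate both versions do essentially one function call per row:

val rddFiltered = rdd.filter(_.key > 5) // same per-row work as the generated Predicate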

In the end, we can see that for a "select + filter" style query, using a DataFrame brings no performance benefit without caching. But when you cache the DataFrame, things get much more interesting.

The cached version:

== Physical Plan ==
InMemoryColumnarTableScan [key#0,value#1], InMemoryRelation [key#0,value#1], true, 10000, StorageLevel(true, true, false, true, 1), ConvertToUnsafe, None

This final physical plan is displayed as a single line (above), but a lot of work happens behind it. The execution still includes the Filter SparkPlan, but the last SparkPlan executed is the InMemoryColumnarTableScan.

The starting point is ConvertToUnsafe, a SparkPlan that transforms the original InternalRow RDD into an UnsafeRow RDD. UnsafeRow is an implementation of Row backed by raw memory instead of Java objects; this change is part of the Tungsten project to improve memory management for Java objects. Because we only filter the data, UnsafeRow does not speed up our query directly, but it stores the data much more compactly in memory, which reduces GC pressure.
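
A trimmed version of ConvertToUnsafe from the Spark 1.6 source (the partitioning and ordering overrides are omitted here) shows how small this step is:

case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  protected override def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitions { iter =>
      // project every row into the compact, raw-memory UnsafeRow format
      val convertToUnsafe = UnsafeProjection.create(child.schema)
      iter.map(convertToUnsafe)
    }
  }
}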

InMemoryRelation is just a case class that holds the data in CachedBatch format. Its most important method is buildBuffers(), which transforms the RDD of InternalRow (UnsafeRow is a subclass of InternalRow) into an RDD of CachedBatch. A CachedBatch basically contains a batch of rows, but stores them in a columnar format: the row data inside a CachedBatch is an Array[Array[Byte]], where the outer array is indexed by column and each inner array holds the values of all the rows of the batch for that column. Each CachedBatch also carries stats, metadata such as the per-column min and max values of the batch, which later lets operations like Filter work on whole batches instead of individual rows. The batch size comes from the property "spark.sql.inMemoryColumnarStorage.batchSize", with a default value of 10000, and MAX_BATCH_SIZE_IN_BYTE is 4 MB.
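
Condensed, the batch structure looks roughly like this (a sketch based on the Spark 1.6 columnar caching code; the exact fields may differ slightly between versions):

// One batch of at most "batchSize" rows, stored column by column:
// buffers(i) holds the encoded values of column i for every row of the batch,
// and stats is a row of per-column metadata (lower/upper bounds, null count, ...).
case class CachedBatch(
    numRows: Int,
    buffers: Array[Array[Byte]],
    stats: InternalRow)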

InMemoryColumnarTableScan is a SparkPlan. Its doExecute() method essentially maps an Iterator[CachedBatch] back to an Iterator[InternalRow].

The partitionFilter is built from the predicates. The big performance improvement is that these predicates are applied directly to the CachedBatches based on their stats, so the data does not need to be filtered row by row. If a partition holds fewer than 10000 rows, it fits in a single CachedBatch, and in the best case the filter can skip scanning the whole partition. Finally, GenerateColumnAccessor generates Java bytecode for an Iterator[InternalRow] that decodes the surviving batches back into rows.
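
To make the batch-skipping idea concrete, here is a small self-contained toy model (not Spark's actual code, just an illustration of how per-batch min/max stats let whole batches be skipped for a "key > threshold" filter):

// Toy model of stats-based pruning: each batch carries per-column min/max,
// and a batch is only decoded if its stats say it may contain matching rows.
case class ToyStats(minKey: Int, maxKey: Int)
case class ToyBatch(stats: ToyStats, encodedColumns: Array[Array[Byte]])

def batchesToScan(batches: Iterator[ToyBatch], threshold: Int): Iterator[ToyBatch] =
  batches.filter(_.stats.maxKey > threshold) // skipped batches are never decoded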

In summary, Spark Catalyst does not make a "select + filter" query particularly faster on its own. But if you cache your DataFrame, the rows are stored in a columnar format, and the filter operation becomes much faster than the equivalent RDD filter. The second part of this post tries to explain where this improvement comes from. In theory, the maximum number of rows per CachedBatch should affect performance: with a smaller batch size, more CachedBatches can be filtered out entirely.

I plan to do the same journey for a simple join query, which should be even more interesting and exciting.