Spark Cost Based Optimizer

With the release of Spark 2.2.0, a feature that is very popular in the RDBMS world was introduced in Spark SQL: the Cost Based Optimizer, aka CBO. Prior to Spark 2.2.0, query planning relied on a rule-based optimizer, a set of heuristic rules that ignore the characteristics of the underlying data.

What is Cost Based Optimizer?
A cost-based optimizer is an optimization engine that selects the cheapest execution plan for a query based on various table statistics. The CBO tries to optimize query execution with respect to CPU utilization and I/O, so that results are returned as quickly as possible.

CBO in Spark
Spark introduced the CBO with the 2.2.0 release. It is disabled by default and can be enabled using the property spark.sql.cbo.enabled:

spark.conf.set("spark.sql.cbo.enabled", "true")

For the CBO to work effectively, Spark collects various table statistics which it uses to calculate the cost of the candidate logical plans. Table-level statistics are collected using:

ANALYZE TABLE <TABLE_NAME> COMPUTE STATISTICS

To collect statistics for particular columns of a table, use:

ANALYZE TABLE <TABLE_NAME> COMPUTE STATISTICS FOR COLUMNS <COLUMN_NAME_1>, <COLUMN_NAME_2>
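
For example, from the Spark shell both kinds of statistics can be collected and then inspected through DESCRIBE EXTENDED. The table name people and the column names below are only illustrative:

spark.sql("ANALYZE TABLE people COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE people COMPUTE STATISTICS FOR COLUMNS Country, Location")
// Once the table has been analyzed, the Statistics row should report sizeInBytes and rowCount
spark.sql("DESCRIBE EXTENDED people").show(100, truncate = false)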

Let's verify that the CBO is working with a sample query that performs a self-join on the people table:

val query = "select * from people a, people b where a.location = b.location and a.country = 'Serbia'"
val stats = spark.sql(query).queryExecution.toStringWithStats
println(stats)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('a.location = 'b.location) && ('a.country = Serbia))
   +- 'Join Inner
      :- 'SubqueryAlias a
      :  +- 'UnresolvedRelation `people`
      +- 'SubqueryAlias b
         +- 'UnresolvedRelation `people`

== Analyzed Logical Plan ==
Age: int, Country: string, CustomerID: int, CustomerName: string, CustomerSuffix: string, Location: string, Mobile: string, Occupation: string, Salary: int, Age: int, Country: string, CustomerID: int, CustomerName: string, CustomerSuffix: string, Location: string, Mobile: string, Occupation: string, Salary: int
Project [Age#10581, Country#10582, CustomerID#10583, CustomerName#10584, CustomerSuffix#10585, Location#10586, Mobile#10587, Occupation#10588, Salary#10589, Age#10618, Country#10619, CustomerID#10620, CustomerName#10621, CustomerSuffix#10622, Location#10623, Mobile#10624, Occupation#10625, Salary#10626]
+- Filter ((location#10586 = location#10623) && (country#10582 = Serbia))
   +- Join Inner
      :- SubqueryAlias a
      :  +- SubqueryAlias people
      :     +- Relation[Age#10581,Country#10582,CustomerID#10583,CustomerName#10584,CustomerSuffix#10585,Location#10586,Mobile#10587,Occupation#10588,Salary#10589] parquet
      +- SubqueryAlias b
         +- SubqueryAlias people
            +- Relation[Age#10618,Country#10619,CustomerID#10620,CustomerName#10621,CustomerSuffix#10622,Location#10623,Mobile#10624,Occupation#10625,Salary#10626] parquet

== Optimized Logical Plan ==
Join Inner, (location#10586 = location#10623), Statistics(sizeInBytes=14.2 MB, rowCount=4.39E+4, hints=none)
:- Filter ((isnotnull(country#10582) && (country#10582 = Serbia)) && isnotnull(location#10586)), Statistics(sizeInBytes=702.3 KB, rowCount=4.13E+3, hints=none)
:  +- Relation[Age#10581,Country#10582,CustomerID#10583,CustomerName#10584,CustomerSuffix#10585,Location#10586,Mobile#10587,Occupation#10588,Salary#10589] parquet, Statistics(sizeInBytes=23.6 MB, rowCount=1.00E+6, hints=none)
+- Filter isnotnull(location#10623), Statistics(sizeInBytes=165.9 MB, rowCount=1.00E+6, hints=none)
   +- Relation[Age#10618,Country#10619,CustomerID#10620,CustomerName#10621,CustomerSuffix#10622,Location#10623,Mobile#10624,Occupation#10625,Salary#10626] parquet, Statistics(sizeInBytes=23.6 MB, rowCount=1.00E+6, hints=none)

== Physical Plan ==
*BroadcastHashJoin [location#10586], [location#10623], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(input[5, string, true]))
:  +- *Project [Age#10581, Country#10582, CustomerID#10583, CustomerName#10584, CustomerSuffix#10585, Location#10586, Mobile#10587, Occupation#10588, Salary#10589]
:     +- *Filter ((isnotnull(country#10582) && (country#10582 = Serbia)) && isnotnull(location#10586))
:        +- *FileScan parquet default.people[Age#10581,Country#10582,CustomerID#10583,CustomerName#10584,CustomerSuffix#10585,Location#10586,Mobile#10587,Occupation#10588,Salary#10589] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/spark/bin/spark-warehouse/people], PartitionFilters: [], PushedFilters: [IsNotNull(Country), EqualTo(Country,Serbia), IsNotNull(Location)], ReadSchema: struct<Age:int,Country:string,CustomerID:int,CustomerName:string,CustomerSuffix:string,Location:s…
+- *Project [Age#10618, Country#10619, CustomerID#10620, CustomerName#10621, CustomerSuffix#10622, Location#10623, Mobile#10624, Occupation#10625, Salary#10626]
   +- *Filter isnotnull(location#10623)
      +- *FileScan parquet default.people[Age#10618,Country#10619,CustomerID#10620,CustomerName#10621,CustomerSuffix#10622,Location#10623,Mobile#10624,Occupation#10625,Salary#10626] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/spark/bin/spark-warehouse/people], PartitionFilters: [], PushedFilters: [IsNotNull(Location)], ReadSchema: struct<Age:int,Country:string,CustomerID:int,CustomerName:string,CustomerSuffix:string,Location:s…

In the above output, the optimized logical plan is of prime importance. If we look closely at the details of the plan, we will find the statistics for each operation of the query. Notice, for example, how the filter on country reduces the estimated row count from 1.00E+6 to 4.13E+3, which makes that side of the join small enough to broadcast in the physical plan.

The Mathematics behind CBO

Now that we know how to enable the CBO and how to leverage it for better performance, let's try to understand the mathematics behind it.

The metrics that the Spark CBO collects can be classified into two categories: table-level metrics and column-level metrics. Table-level metrics include the total number of rows and the table size in bytes. Column-level metrics include the distinct count, max, min, null count and a histogram (describing the distribution of the values of the column).

To collect histogram statistics we need to enable the following property (available from Spark 2.3):

spark.sql.statistics.histogram.enabled
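
For example, before running the ANALYZE commands:

spark.conf.set("spark.sql.statistics.histogram.enabled", "true")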

FILTER Operation:

  1. The single comparison operators considered are "<", "<=", ">", ">=" and "=".
  2. For the equal operation ("="), the filter selectivity is based on two checks against the literal constant value:
     a. Whether the constant value lies within the range defined by the column's min and max values. If it is outside that range, the selectivity is simply zero.
     b. The number of distinct values present for that column: assuming a uniform distribution, a constant inside the range yields a selectivity of 1 / distinct count.
  3. For the less than ("<") operation, the filter selectivity checks the literal constant value against the following conditions:
     a. If the constant value is less than the min value, the selectivity is zero.
     b. If the constant value is greater than the max value, the selectivity is 1.
     c. If the constant value lies between the min and max, then, assuming a uniform distribution, the filter selectivity is (constant - min value) / (max value - min value).
     d. If histograms are enabled, the filter selectivity is instead calculated by adding up the density of the buckets between the minimum value and the constant literal value.

Selectivities for the other comparison operators, such as greater than (">") and not equal ("<>"), are computed similarly.
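
These rules can be summarised in a small sketch. The following is illustrative Scala, not Spark's internal API; it assumes a uniform distribution and ignores histograms and null counts:

// Hypothetical helpers mirroring the selectivity rules above.
// min, max and distinctCount come from the column-level statistics.
def lessThanSelectivity(min: Double, max: Double, literal: Double): Double =
  if (literal <= min) 0.0                      // constant below the range: nothing matches
  else if (literal > max) 1.0                  // constant above the range: everything matches
  else (literal - min) / (max - min)           // inside the range: linear interpolation

def equalsSelectivity(min: Double, max: Double, distinctCount: Long, literal: Double): Double =
  if (literal < min || literal > max) 0.0      // constant outside [min, max]: nothing matches
  else 1.0 / distinctCount                     // otherwise one value out of the distinct values

// Estimated output rows of the filter = input rowCount * selectivity
def filteredRowCount(rowCount: Long, selectivity: Double): Long =
  (rowCount * selectivity).toLong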

JOIN Cardinality:

Join cardinality computation is based on the cardinality of the original tables and the number of distinct values present in the column on which the join is performed.

  1. Inner join: the cardinality is computed as the product of the number of rows in tables A and B, divided by the maximum of the distinct counts of column k in the two tables, where k is the column on which the join is performed.
  2. Left-outer join: the cardinality is the maximum of the inner-join cardinality and the number of rows in table A.
  3. Right-outer join: the cardinality is the maximum of the inner-join cardinality and the number of rows in table B.
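
These formulas translate directly into a short sketch (again, illustrative code rather than the actual Spark implementation):

// rowsA, rowsB: row counts of tables A and B; distinctA, distinctB: distinct counts of the join column k.
def innerJoinCardinality(rowsA: BigInt, rowsB: BigInt, distinctA: BigInt, distinctB: BigInt): BigInt =
  rowsA * rowsB / distinctA.max(distinctB)

def leftOuterJoinCardinality(rowsA: BigInt, rowsB: BigInt, distinctA: BigInt, distinctB: BigInt): BigInt =
  innerJoinCardinality(rowsA, rowsB, distinctA, distinctB).max(rowsA)

def rightOuterJoinCardinality(rowsA: BigInt, rowsB: BigInt, distinctA: BigInt, distinctB: BigInt): BigInt =
  innerJoinCardinality(rowsA, rowsB, distinctA, distinctB).max(rowsB)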

Cost Plan:

The final part of the piece is how Spark decides which plan to execute. Spark uses the statistics computed for the different operations to compute the cost of a query plan. The cost generally has two parts:
 a. CPU cost
 b. I/O cost

In Spark, this is calculated using the following formula:

weight * cardinality + (1 - weight) * size

Here, cardinality stands in for the CPU cost and size (in bytes) for the I/O cost, while weight is a tuning parameter that can be configured via spark.sql.cbo.joinReorder.card.weight. By default, its value is 0.7.
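
As a rough sketch (illustrative only, assuming an active SparkSession named spark):

// Read the weight from the session configuration, falling back to the default of 0.7.
val weight = spark.conf.get("spark.sql.cbo.joinReorder.card.weight", "0.7").toDouble

// Relative cost of a plan node, given its estimated cardinality and size in bytes.
def planCost(cardinality: BigInt, sizeInBytes: BigInt): Double =
  weight * cardinality.toDouble + (1 - weight) * sizeInBytes.toDouble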

Things To Take Care About:

The CBO does not work properly with partitioned Hive tables whose underlying storage is Parquet: for such tables it only estimates the size in bytes and not the number of rows.

More details can be found in Mastering Spark SQL by Jacek Laskowski.