SparkSQL query optimization

Pipitz
7 min readAug 14, 2022

--

Spark, in recent years, has become the go-to distributed computation framework for a lot of different use cases. From only providing map-reduce funtionalities, it has introduced other modules: from machine learning, to graph data, to SQL.

Today we will focus on SparkSQL and, as you can imagine given the title of this post, its query optimization procedures. Most of what we will see here is reported in the SparkSQL paper (sigmod_spark_sql.pdf (mit.edu)) and in SparkSQL official documentation (Performance Tuning — Spark 3.3.0 Documentation (apache.org)), as well as in my MSc thesis (http://hdl.handle.net/10589/186065).

In order to fully understand the contents of this post a basic knowledge of the principles behind Spark is advised (my post on Spark’s core concepts can be found here).

The Catalyst

sigmod_spark_sql.pdf (mit.edu), Catalyst optimization procedure

The Catalyst is responsible for optimizing SQL queries. The optimization process goes through multiple steps:

  • Query abstraction into an AST
  • Logical optimization
  • Physical optimization
  • Code generation

In the following we will take a closer look at each step, in order to better understand what is going on under the hood when we execute a SQL query.

It is important to remark that Spark will evaluate the queries in a lazy fashion: as long as no action is called the transformations will not be executed.

Query abstraction into AST

The first step in query optimization is abstracting from the language used to write the query (SQL, Scala, Python,…) to a tree representation.

This procedure starts by considering the query and the datasets: any unresolved attribute found will then pass through the catalog (an entity dictionary) in order to be resolved. Once all attributes will have been resolved, the Abstract Syntax Tree representation is obtained.

In an AST each node represents either data, therefore a RDD, or an operation on one or more RDD. As an example, consider the expression x + (1+2), whose representation can be found in the image below.

sigmod_spark_sql.pdf (mit.edu), AST for x + (1+2)

A relevant effect of this abstraction is the decoupling between the language used to write the query and the query that the Catalyst will optimize. As a result, the query that will be executed won’t be affected in any measure by the language we have chosen to write it. This, however, holds only in a situation in which we are not making use of User Defined Function. If there is the usage of UDFs the language used becomes relevant performance-wise. As a rule of thumb, scala UDFs will be quicker than Python UDFs, unless Arrow is used. If Arrow is used, python UDFs will be faster.

Logical Optimization

Once the AST representation is obtained, the Catalyst will transform that AST applying a set of rules iteratively, until no more rules can be applied. The rule set has been defined by the programmers behind Spark, however it can be expanded as needed in a relatively easy fashion. An example of rule that can be applied in this phase is the filter pushdown: if a filtering transformation is found it will be pushed as close to the data source as possible. This way less data will be taken from storage to memory and, therefore, less data will have to be elaborated, lightening the workload.

An interesting thing to point out is how well this step and the lazy evaluation of transformations interact: were the evaluation eager some optimization opportunities might be missed. Let’s consider a query that joins two dataframes and then filters the result:

dfA.join(dfB, dfA.field===dfB.field,“INNER”).filter(A_filtering_field==True)

With an eager evaluation first the join would be executed, then the result will be filtered, therefore wasting resources on data that will be filtered out. With a lazy evaluation, instead, data will first be filtered and then joined, thus saving resources.

Another interesting thing to point out is that with parquet files the filter can be pushed down to the data before it is even serialized into memory, thus reading only what satisfies the filter.

Physical Optimization

The physical optimization phase further optimizes the workload by taking the optimized AST as input, generating multiple physical plans, running them through a cost model an selecting the least expensive one.

On a practical level, this phase is mostly responsible for selecting the join strategy for each join present in the query. As it can be seen in the image below, Spark uses a pretty complicated flowchart for selecting the join strategy. For the sake of brevity, and given that we will consider them when talking about dynamic optimization, we will only discuss the Broadcast Hash Join and the Sort Merge Join strategies.

http://hdl.handle.net/10589/186065, Spark flowchart for joins

- Broadcast Hash Join: if the join is an equi join and a table on the inner side of the join is “small enough” then the join is performed by broadcasting said smaller table among the Workers and the Driver. How much is ‘enough’ is, by default, 10MiB, but can be defined by the user. However be careful: if the threshold is set too high the performances will take a hit and if it exceeds the available memory on the Driver Spark will crash.

- Sort Merge Join: if the join is an equi-join and the tables can be sorted by the join key, while not being possible to broadcast them, the Sort Merge Join strategy will be chosen (by default the Shuffle Hash Join strategy is disabled). The join will be performed by sorting the tables by the join key, joining the corresponding table sections and merging the results.

There is no such thing as a “best join strategy”, as all of them have pros and cons. There is, however, a “best join strategy for this particular situation”, this step of the optimization procedure takes care of identifying that strategy. While it is possible for the user to interfere with this process via hints, I strongly argue against it. Using hints will make the optimization process less flexible, thus potentially missing optimal solutions in favor of suboptimal ones.

Code Generation

Once the optimal physical plan has been chosen, the Catalyst will produce the code that will actually be used to interact with RDDs and execute the workload.

This phase can be seen as a stack: each transformation and action part of the workload will be sequentially added to the stack. Once all transformations and actions have been added, the code will be generated starting from the last one and then going backwards.

An interesting downside of this approach is tied to the language used to write Spark, Scala. Code generation is implemented via a recursion (note: not a tail recursion, just a plain recursion): as such, at each recursive invocation a new frame has to be allocated on the stack. Scala can allocate a finite number of frames. As a result, trying to optimize an extremely long query might result in the Driver crashing. While this occurance is pretty rare, it is still possible.

Dynamic Optimization — AQE

It has to be noted that, this far, we only have talked about static optimization: the query is only optimized once before execution. As such, the query plan cannot be adapted during execution. Let’s consider the following example

  • Table A is joined with B, obtaining AB
  • AB goes through an highly filtering statement
  • AB is joined to C

While the filtering of AB could lead to the possibility to use a Broadcast Hash Join strategy the Catalyst is blind to it, since it cannot predict the dimension of AB.

Adaptive Query Execution was introduced for situations like the one mentioned above. As of Spark 3.3.0, AQE has four main capabilities:

  • Automatically coalescing post-shuffle partitions
  • Converting Sort Merge Joins into Broadcast Hash Joins
  • Converting Sort Merge Joins into Shuffle Hash Joins (as the Shuffle Hash Join is disabled by default I will not talk about this property in this post)
  • Optimizing skewed joins

The above is done by sending new data to the Driver during execution, specifically during shuffle phases, and reingaging the Catalyst.

I have executed a series of test (http://hdl.handle.net/10589/186065) over AQE capabilities. While these tests have been executed on an older version (3.1.2), they still hold:

  • AQE does coalesce post shuffle partitions
  • AQE is able to recognize skewed joins and correct them when the join strategy is a Sort Merge Join
  • AQE can transform a Sort Merge Join into a Broadcast Hash join if and only if there is a shuffle phase between the filtering action and the subsequent join

While these are the main findings, there were also a whole more, which I will not discuss here, given that I have already written my thesis once ;) .

Conclusions and Sandbox

In this post we have taken a look at how Spark optimizes SQL queries via the Catalyst, following along each step of the optimization procedure. We also have taken a brief look at AQE and how it works.

In order to have a practical example of what we have seen today feel free to go to this GitHub repo and play around with the notebook and the data.

Cheers, see you next post.

--

--