Apache Spark — Catalyst Deep Dive

Adi Polak
Published in Microsoft Azure
5 min read · Nov 13, 2018

Today we live in a world of ever-growing technologies and immense data collection. This gives us the opportunity to create amazing products, which usually require real-time (RT) or near-real-time (NRT) data processing, with processing time limits as small as a few minutes or seconds.

With great power (demand?) comes great responsibility!

...or just a few developers smashing their heads together, hoping to find solutions for real-time data processing challenges.

And I am one of them.

After giving it some time, I decided that it would be wiser to first fully understand how the frameworks work. So I did a deep dive to learn more about Big Data technologies.

Deep dive

What is a deep dive, and how can I do it too?

For me, a deep dive means first understanding the hierarchy of the components in the framework, the structure of the code itself, the different libraries, and what belongs where, and later on, how all of this influences the data processing time.

Many Big Data technologies are open source, and you can easily find their source code online. I decided to start with Spark, mainly because I had the most experience with its APIs.

First, I used this command to clone the Spark repository:

git clone --branch v2.3.0 git@github.com:apache/spark.git

Then, I opened the code in IntelliJ (my preferred IDE for developing in Java, Scala or Kotlin) and started my first dive.

Since Apache Spark is an open-source project, the team is highly invested in code style and in making sure the code is readable, so people can engage with and support the framework.

I was curious about two of the main libraries: sql (which contains Catalyst, the Spark query optimizer) and streaming.

So I combined reading the code with what I knew about the architecture and the flow of a Spark query. Here’s what a basic Spark architecture looks like:

Basic Spark architecture

And here’s the catalyst flow:

Spark query flow — catalyst

Catalyst theory

The Catalyst optimizer is at the core of Spark SQL and is implemented in Scala. It enables several key features, such as schema inference (from JSON data), that are very useful in data analysis work. The previous figure shows the high-level transformation process from a developer’s program containing DataFrames/Datasets to the final execution plan.

The internal representation of the program is a query plan. The query plan describes data operations such as aggregate, join, and filter, which match what is defined in your query. These operations generate a new Dataset from the input Dataset. After an initial version of the query plan is ready, the Catalyst optimizer applies a series of transformations to convert it to an optimized query plan. Finally, the Spark SQL code generation mechanism translates the optimized query plan into a DAG (directed acyclic graph) of RDDs (Resilient Distributed Datasets) that is ready for execution.

The query plans and the optimized query plans are internally represented as trees. So, at its core, the Catalyst optimizer contains a general library for representing trees and applying rules to manipulate them. On top of this library sit several other libraries that are more specific to relational query processing.

Catalyst has two types of query plans: Logical and Physical Plans. The Logical Plan describes the computations on the Datasets without defining how to carry them out. Typically, the Logical Plan generates a list of attributes or columns as output under a set of constraints on the generated rows. The Physical Plan describes the computations on Datasets with specific definitions of how to execute them (it is executable).
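To make the "trees plus rules" idea concrete, here is a minimal sketch in plain Python. It is not Spark code: the names Expr, Literal, Attribute, Add, transform_up and fold_constants are all made up for illustration, and the only rule shown is a simple constant-folding rewrite. It just shows the general shape of a tree library with a rule applied bottom-up.

```python
from dataclasses import dataclass
from typing import Callable


class Expr:
    """Base class for nodes in a toy expression tree (hypothetical, not Spark's)."""


@dataclass(frozen=True)
class Literal(Expr):
    value: int


@dataclass(frozen=True)
class Attribute(Expr):
    name: str


@dataclass(frozen=True)
class Add(Expr):
    left: Expr
    right: Expr


Rule = Callable[[Expr], Expr]


def transform_up(node: Expr, rule: Rule) -> Expr:
    """Rewrite the children first, then apply the rule to the rebuilt node."""
    if isinstance(node, Add):
        node = Add(transform_up(node.left, rule), transform_up(node.right, rule))
    return rule(node)


def fold_constants(node: Expr) -> Expr:
    """Rule: replace Add(Literal, Literal) with a single Literal."""
    if (
        isinstance(node, Add)
        and isinstance(node.left, Literal)
        and isinstance(node.right, Literal)
    ):
        return Literal(node.left.value + node.right.value)
    return node


# The expression x + (1 + 2) as a tree.
tree = Add(Attribute("x"), Add(Literal(1), Literal(2)))
optimized = transform_up(tree, fold_constants)
print(optimized)
```

The nodes are immutable, so a rule never edits a tree in place; it returns a new node, and the traversal rebuilds the parents around it. Here the inner 1 + 2 collapses into the literal 3 while the reference to x is left alone.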

Let’s explore the transformation steps in more detail. The initial query plan is essentially an unresolved Logical Plan, that is, we don’t know the source of the Datasets or the columns (contained in the Dataset) at this stage. We also don’t know the types of columns. The first step in this pipeline is the analysis step. During analysis, the catalog information is used to convert the unresolved Logical Plan to a resolved Logical Plan.
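The analysis step can be pictured with a small toy model in plain Python. Everything here is an assumption made for illustration: the CATALOG dictionary, the UnresolvedProject and ResolvedProject classes, and the analyze function are hypothetical and far simpler than what Spark actually does. The sketch only shows the idea of looking names up in a catalog to attach types, and failing when a name cannot be resolved.

```python
from dataclasses import dataclass

# Hypothetical catalog: table name -> {column name: type name}.
CATALOG = {"events": {"user_id": "bigint", "ts": "timestamp"}}


@dataclass(frozen=True)
class UnresolvedProject:
    """A projection that only knows names, not where they come from."""
    table: str
    columns: tuple  # of column names


@dataclass(frozen=True)
class ResolvedProject:
    """The same projection after every column has been given a type."""
    table: str
    columns: tuple  # of (column name, type name) pairs


class AnalysisError(Exception):
    """Raised when a table or column cannot be found in the catalog."""


def analyze(plan: UnresolvedProject, catalog: dict) -> ResolvedProject:
    """Resolve the table and each column name against the catalog."""
    if plan.table not in catalog:
        raise AnalysisError(f"table not found: {plan.table}")
    schema = catalog[plan.table]
    resolved = []
    for name in plan.columns:
        if name not in schema:
            raise AnalysisError(
                f"cannot resolve column '{name}' in table '{plan.table}'"
            )
        resolved.append((name, schema[name]))
    return ResolvedProject(plan.table, tuple(resolved))


resolved_plan = analyze(UnresolvedProject("events", ("user_id",)), CATALOG)
print(resolved_plan)
```

Asking for a column that is not in the catalog, for example UnresolvedProject("events", ("missing",)), raises AnalysisError in this sketch instead of producing a plan.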

Next, a set of logical optimization rules is applied to the resolved Logical Plan, resulting in an optimized Logical Plan. In the physical planning step that follows, the optimizer may generate multiple Physical Plans and compare their costs to pick the best one. The first version of the cost-based optimizer (CBO), built on top of Spark SQL, was released in Spark 2.2.
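Picking a Physical Plan by cost can be sketched in a few lines of plain Python. This is a toy under loud assumptions: the PhysicalPlan class, the two plan names, and especially the cost formulas are invented for illustration and do not reflect how Spark estimates cost. The point is only the pattern: generate candidates, score each one, keep the cheapest.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PhysicalPlan:
    name: str
    cost: float


def candidate_join_plans(left_rows: int, right_rows: int) -> list:
    """Build candidate plans using made-up cost formulas (illustrative only)."""
    small = min(left_rows, right_rows)
    large = max(left_rows, right_rows)
    return [
        # Copy the small side to every task, then scan the large side once.
        PhysicalPlan("broadcast_hash_join", cost=small * 10 + large),
        # Shuffle and sort both sides before merging them.
        PhysicalPlan("sort_merge_join", cost=(left_rows + right_rows) * 3),
    ]


def choose_plan(plans: list) -> PhysicalPlan:
    """Return the candidate with the lowest estimated cost."""
    return min(plans, key=lambda plan: plan.cost)


best = choose_plan(candidate_join_plans(left_rows=1_000_000, right_rows=100))
print(best.name)
```

With one tiny input, the toy formulas favour the broadcast-style plan; with two inputs of a million rows each, the sort-merge-style plan comes out cheaper. The choice depends entirely on the statistics fed into the cost model, which is why table and column statistics matter to a cost-based optimizer.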

All three APIs (DataFrame, Dataset and SQL) share the same optimization pipeline, as illustrated in the following figure.

Catalyst code

Now that we know the theory, let's look at the code:

This is the catalyst hierarchy inside Spark's sql library.

Let's take the Analysis phase as an example. From the screenshot you can see where the analysis package is located in the code structure: it is the first package in the catalyst library.

When going further and digging into the code, you will find the test suites.

One way I found to make a component and its phase easier to understand was to run the tests in debug mode and follow the inner flow.

AnalysisSuite

Later on I discovered some really cool features like tree pruning optimization for the query tree and others that helped me understand how to write better queries.

In a future article, I will show examples of Spark UDFs, how they influence the runtime, and what we can do to improve it!

Follow me on Medium for more posts about Scala, Kotlin, Big Data, clean code and software engineer nonsense.

👩‍💻 Software Engineer 📚 Author of Scaling Machine Learning with Spark (O'Reilly) 🗣️ Keynote Speaker 💫 Databricks ambassador