quill-spark: A type-safe Scala API for Spark SQL


Since its initial release, Spark has had multiple iterations of its APIs to enable optimization of the job execution. This push to achieve better performance and efficiency came at the cost of a less intuitive and less type-safe API, which is not ideal for the Scala community since one of the most essential characteristics of the language is type safety.

This post explores the reasons for the design decisions that led to the Dataset API and presents a new solution using the quill-spark project, which provides a language-integrated query API for interacting with Spark’s SQL engine.


A brief history of Spark APIs

2014: Matei Zaharia and his colleagues at UC Berkeley published a paper introducing a new abstraction called Resilient Distributed Datasets (RDDs). It was the start of a new data processing solution that addressed some of the pain points of MapReduce. RDDs could be cached in memory, which is a massive win for iterative algorithms, and had many implementation strategies that allowed for better fault tolerance, data locality, and efficiency in general.

The API for RDDs is type-safe and similar to earlier solutions such as Twitter’s Scalding (2012). For example, this is a transformation that, given an RDD of tweets, returns their top hashtags:
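A minimal sketch of such a transformation, assuming a hypothetical Tweet case class with a single text field (the only field the discussion relies on) and the Spark core library on the classpath:

```scala
import org.apache.spark.rdd.RDD

object TopHashtagsRdd {
  // Hypothetical record type: only the `text` field is assumed.
  case class Tweet(text: String)

  def topHashtags(tweets: RDD[Tweet], n: Int): Array[(String, Int)] =
    tweets
      .flatMap(_.text.split("\\s+"))   // split each tweet into words
      .filter(_.startsWith("#"))       // keep only hashtag words
      .map(_.toLowerCase)              // normalize hashtags
      .map((_, 1))                     // pair each hashtag with a counter
      .reduceByKey(_ + _)              // sum the counters per hashtag
      // action: triggers execution and brings the n largest counts to the driver
      .top(n)(Ordering.by[(String, Int), Int](_._2))
}
```

Every step except the last is a lazy transformation. The final top call is an action, which is relevant to the third downside listed below.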

Transformations return well-typed RDDs and are defined using regular Scala functions. Even though the API is reasonably good regarding usability, it has downsides:

  1. The user could make the mistake of doing reduceByKey before filtering the words (_.startsWith("#")), making the transformation much more expensive. The inefficiency is evident for this case, but there are many more subtle scenarios where the user could introduce inefficiencies.
  2. The execution engine can’t introspect Scala functions. The flatMap transformation _.text.split("\\s+") uses only the text column from the tweet object. If the tweet information comes from a columnar format like Parquet, it would be more efficient to load only that column from storage. Spark can’t do anything other than loading the full object and then calling the Scala function with it because _.text.split("\\s+") is opaque.
  3. The API mixes lazy transformations that return RDD instances like flatMap and filter with strict actions like top that trigger the RDD execution and bring the data to memory. It’s common to see beginners using actions without understanding their implications.
  4. Functions can capture values from the outer scope (closure), eventually failing at runtime if a value is not serializable. This is an issue that the spores project tries to address.

2015: The Spark community introduced an API called DataFrame and a new execution engine based on SQL to support it. It addressed some of the optimization limitations of RDDs by making transformations based on untyped string values to represent columns and expressions:
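A sketch of the same top hashtags transformation in this style, assuming the input has a string column named text. It uses col("...") from org.apache.spark.sql.functions to name columns; the 'text symbol syntax is an equivalent shorthand once the session implicits are imported:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, explode, lower, split}

object TopHashtagsDataFrame {
  // Assumes `tweets` has a string column called "text".
  def topHashtags(tweets: DataFrame, n: Int): DataFrame =
    tweets
      .select(explode(split(col("text"), "\\s+")).as("word")) // one row per word
      .select(lower(col("word")).as("hashtag"))               // normalize
      .filter(col("hashtag").startsWith("#"))                 // keep hashtags only
      .groupBy(col("hashtag"))
      .agg(count(col("hashtag")).as("count"))
      .orderBy(col("count").desc)
      .limit(n)                                               // still lazy
}
```

Column names are plain strings here, so a typo such as "txt" would compile and only fail when Spark analyzes the query at runtime.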

With this approach, the computation is less opaque to the execution engine. For instance, if the tweets are loaded from Parquet, Spark knows that only the column 'text is used and can avoid loading the rest of the fields. Also, the user’s intent is more evident since the transformation uses first-class operations like group by and the count aggregation.

2016: The Spark community made a new attempt at making the API more type-safe while keeping some of the benefits of DataFrame. The result was the Dataset API, which mixes RDD-like operations with DataFrame-like operations:
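A sketch of what the mixed style can look like, again assuming a hypothetical Tweet case class with a text field. The SparkSession is passed implicitly only so that its encoders and the $"..." column syntax can be imported:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.count

object TopHashtagsDataset {
  // Hypothetical record type: only the `text` field is assumed.
  case class Tweet(text: String)

  def topHashtags(tweets: Dataset[Tweet], n: Int)(
      implicit spark: SparkSession): Dataset[(String, Long)] = {
    import spark.implicits._
    tweets
      .select($"text".as[String])        // typed column: Dataset[String]
      .flatMap(_.split("\\s+"))          // opaque Scala function
      .filter(_.startsWith("#"))         // opaque Scala function
      .map(_.toLowerCase)                // opaque Scala function
      .groupBy($"value")                 // untyped from here on ("value" is the
                                         // default column name of Dataset[String])
      .agg(count($"value").as("count"))  // result is a DataFrame
      .orderBy($"count".desc)
      .limit(n)
      .as[(String, Long)]                // back to a typed Dataset, checked at runtime
  }
}
```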

The API switches between Dataset and DataFrame depending on how the operation is done. In the current Spark version, DataFrame is just an untyped Dataset (a type alias for Dataset[Row]). This approach still has some problems:

1. Some of the transformations (flatMap, filter, map) use opaque Scala functions, so they don’t enable more advanced optimizations.

2. To a lesser degree, Dataset brings back the possibility of inefficient usage of the API. For instance, if the user forgets to select the text column at the beginning, Spark will have to load the entire tweet to apply the flatMap(_.text.split("\\s+")) transformation.

3. The untyped operations make the code unsafe, prone to runtime type errors, and harder to understand.


Two birds, one stone

Dataset mixes transformations using string values that lack type safety with typed transformations using opaque Scala functions that don’t enable advanced optimizations. An ideal solution should give the execution engine enough information about the transformations but at the same time preserve type safety.

In 2015, I came across a fascinating paper from Philip Wadler and a few of his colleagues called “Everything old is new again: Quoted Domain Specific Languages”. I decided to start working on an implementation of it in Scala called Quill as a personal open source project. The idea is to allow users to express queries as collection operations in the host language and execute them using a target language.

This is actually an old idea. The first solution dates from the late ‘90s with the paper “Kleisli, a Functional Query System”. It was also used by the Links programming language, and later on popularized by Microsoft LINQ. The paper “Language-integrated query using comprehension syntax: state of the art, open problems, and work in progress” has an overview of some of the available solutions in this space.

Quill was released in 2015, and since then its community of maintainers has been continually evolving it, recently reaching version 2.2.0 with Spark SQL support.

Quill is able to transform collection-like code into SQL queries without any special mapping, using regular case classes and Scala functions.

It goes even further and generates the final SQL string at compile time if the query has a static structure. This is why Quill is a Compile-time LINQ. Given that the query is known at compile-time, the user can also configure Quill to probe the statement against the database and fail the compilation if the query is invalid.

The mechanism that allows Quill to transform regular Scala code into queries using a target language is called Quotation. When quote is called, its parameter, instead of being executed as regular Scala code, becomes a parse tree that is transformed into an internal AST. testDB.run reads the internal AST information from the quotation, normalizes it, and transforms it into the SQL statement.
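To make the second half of that pipeline concrete, here is a toy illustration in plain Scala. It is not Quill's implementation: the AST node names (Table, Filter, Project) and the normalize and toSql functions are invented for this sketch, and the tree is built by hand, whereas Quill derives it from ordinary Scala code at compile time.

```scala
object ToyQuotation {
  // A tiny query AST, far simpler than the one a real library needs.
  sealed trait Ast
  final case class Table(name: String) extends Ast
  final case class Filter(source: Ast, column: String, op: String, value: Int) extends Ast
  final case class Project(source: Ast, column: String) extends Ast

  // Flat representation of a single SELECT statement.
  final case class Select(table: String, columns: List[String], conditions: List[String])

  // "Normalization": fold the nested tree into the flat parts of one SELECT.
  def normalize(ast: Ast): Select = ast match {
    case Table(name) =>
      Select(name, Nil, Nil)
    case Filter(source, column, op, value) =>
      val s = normalize(source)
      s.copy(conditions = s.conditions :+ s"$column $op $value")
    case Project(source, column) =>
      normalize(source).copy(columns = List(column))
  }

  // Render the normalized form as a SQL string.
  def toSql(ast: Ast): String = {
    val s = normalize(ast)
    val cols = if (s.columns.isEmpty) "*" else s.columns.mkString(", ")
    val where =
      if (s.conditions.isEmpty) "" else s.conditions.mkString(" WHERE ", " AND ", "")
    s"SELECT $cols FROM ${s.table}$where"
  }
}
```

For example, ToyQuotation.toSql(Project(Filter(Table("Person"), "age", ">", 18), "name")) yields "SELECT name FROM Person WHERE age > 18". Because the whole tree is plain data, the same structure could be inspected or rewritten before rendering, which is what an opaque Scala function does not allow.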

For more details on Quill, watch my Scala Days 2016 talk.


quill-spark

I’ve been working on a new module that integrates Quill with Spark’s SQL engine, which is the underlying engine of the Dataset API. For example, this is the version of the top hashtags transformation using it:
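A sketch of that transformation, written from the module's documented entry points as best recalled (QuillSparkContext, liftQuery, run, concatMap, lift). Treat the details as assumptions to verify against the Quill version in use: the Tweet case class is hypothetical, the SQLContext is passed implicitly here for convenience, and the return type is left to inference on purpose.

```scala
import org.apache.spark.sql.{Dataset, SQLContext}
import io.getquill.QuillSparkContext._

object TopHashtagsQuill {
  // Hypothetical record type: only the `text` field is assumed.
  case class Tweet(text: String)

  // Assumption: quill-spark needs an implicit SQLContext and Spark encoders in scope.
  def topHashtags(tweets: Dataset[Tweet], n: Int)(implicit sqlContext: SQLContext) = {
    import sqlContext.implicits._
    run {
      liftQuery(tweets)                                  // Dataset -> Quill query
        .concatMap(_.text.split(" "))                    // explode texts into words
        .filter(_.startsWith("#"))                       // keep hashtags only
        .map(_.toLowerCase)                              // normalize hashtags
        .groupBy(word => word)
        .map { case (word, list) => (word, list.size) }  // count per hashtag
        .sortBy { case (word, count) => -count }         // most frequent first
        .take(lift(n))                                   // `n` is lifted into the query
    }
  }
}
```

Although the body reads like collection code, none of the lambdas are meant to run as Scala functions: they become part of the query.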

The run method uses implicit quotation, so its parameter becomes a parse tree that Quill converts to a SQL statement to be executed by Spark SQL. Calling run only applies the query and returns a new Dataset, so it is a lazy operation.

This is the generated SQL statement to be executed by Spark:

This new API has a few advantages over Dataset:

  1. The computation is defined using regular code, leveraging Scala’s type system and syntax.
  2. Given that the computation becomes a SQL string, Spark is able to introspect all transformations and can apply more optimizations.
  3. The execution engine can reorder operations if necessary to avoid inefficiencies introduced by the user.
  4. All transformations are lazy, so the user doesn’t need to worry whether a method is lazy or not.
  5. Any captured value needs to be lifted into the quotation using the lift method, or else the compilation fails. Quill also ensures that only supported values can be lifted, so there’s no risk of runtime exceptions.

A monadic API

Most of my professional experience is working on backend systems, but recently I decided to change teams at Twitter to learn a new domain and started working on data processing solutions.

One of the things that intrigued me initially was the flatMap method that Spark and Scalding provide. They receive a function that returns a TraversableOnce, roughly flatMap[U](f: T => TraversableOnce[U]): RDD[U] in the case of Spark.

Considering that flatMap in Scala normally implements the bind operation of a monad, I’d expect something like flatMap[U](f: T => RDD[U]): RDD[U].

But that’s not the case. These APIs are actually applicative, not monadic. The flatMap function is something like a concatMap or unnestMap that explodes each value of the collection into multiple values and then unnests the result.
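The difference between the two shapes can be shown with a toy collection in plain Scala. Coll is a hypothetical stand-in for a distributed collection, not a Spark or Quill type, and Iterable is used in place of TraversableOnce to keep the sketch portable across Scala versions:

```scala
object FlatMapShapes {
  // Hypothetical stand-in for a distributed collection such as an RDD.
  final case class Coll[T](items: List[T]) {

    // Shape offered by Spark and Scalding under the name flatMap:
    // the function returns a plain local collection, which is unnested.
    def concatMap[U](f: T => Iterable[U]): Coll[U] =
      Coll(items.flatMap(f))

    // Monadic bind: the function returns another Coll, so each element
    // can be combined with a whole other data source.
    def flatMap[U](f: T => Coll[U]): Coll[U] =
      Coll(items.flatMap(t => f(t).items))

    def map[U](f: T => U): Coll[U] = Coll(items.map(f))
  }
}
```

With these definitions, Coll(List("a b", "c")).concatMap(_.split(" ").toList) gives Coll(List("a", "b", "c")), while Coll(List(1, 2)).flatMap(x => Coll(List(x, x * 10))) gives Coll(List(1, 10, 2, 20)). Only the second form lets the function refer to another Coll.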

On the other hand, Quill’s Query is a monadic interface. This characteristic makes it more natural for users to combine multiple data sources since it’s an API similar to Scala collections.

For instance, given two datasets people and couples, it’s possible to use a for-comprehension to find same-age couples:
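The shape of such a query is sketched here with ordinary Scala Lists standing in for the datasets, so the example runs without Spark. The Person and Couple case classes and their field names are assumptions made for illustration. With quill-spark, the same for-comprehension would presumably sit inside run, with each generator wrapped in liftQuery.

```scala
object SameAgeCouples {
  // Hypothetical schemas: the field names are assumptions for this sketch.
  case class Person(name: String, age: Int)
  case class Couple(her: String, him: String)

  def sameAgeCouples(people: List[Person], couples: List[Couple]): List[(Person, Person)] =
    for {
      c   <- couples
      her <- people if her.name == c.her                        // join on her name
      him <- people if him.name == c.him && him.age == her.age  // join and compare ages
    } yield (her, him)
}
```

Each generator after the first depends on values bound by the previous ones, which is only expressible because flatMap here takes a function returning another collection of the same kind.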

That’s why the Quill version of the top hashtags uses concatMap instead of flatMap to transform tweet texts into words.


Conclusion

This new Quill module seems to be a promising solution for data processing in Scala. Even though it’s based on a mature library for database access, the integration is young and probably has limitations. If you are interested, give it a try and let us know the results.

To get started, refer to the project website: http://getquill.io/#contexts-spark-context. Please use the Gitter channel for questions and report issues on GitHub.

Special thanks to the other maintainers for the code reviews of this new module and for making Quill an awesome project to work on!