EXPEDIA GROUP TECHNOLOGY — DATA

Indexing Spark Data with Microsoft’s Hyperspace

How to support ad hoc queries on Hadoop data

Mingwei Li
Expedia Group Technology


Image: “Sometimes you just have to look up” by Joshua Sortino on Unsplash

Data warehouses built on top of Spark and columnar stores often don’t handle ad hoc queries, range scans, or even joins well because of their OLAP-oriented nature. Hyperspace, Microsoft’s indexing subsystem built on top of Apache Spark, lets you create indexes to support ad hoc queries just like a traditional database.


What is Hyperspace?

Hyperspace is a simple set of APIs in the Spark programming model that lets you easily create and manage indexes on your existing DataFrames. It injects index-aware plans into Spark’s original execution plan so that queries fully benefit from the performance boost an index provides.

Its relationship with Spark can be illustrated like this:

Diagram: the Spark runtime can follow either the original execution plan, which reads the data, or the Hyperspace execution plan, which reads the index.

Let’s run a demo

As of this writing, Hyperspace supports Apache Spark 2.4 with Scala 2.11 and 2.12. In this demo I will use Spark 2.4.6 with Scala 2.11.12 (Java 1.8.0_222).

1. Set up everything locally

Hyperspace is not yet production ready and currently only supports HDFS-based index creation. To run it in a local environment, you therefore need an HDFS instance listening at localhost:9000 so that Hyperspace can function properly.

1.1 Download and run a local HDFS

Go to the official Hadoop download page and download Hadoop 2.9.2. Extract the tarball in your preferred location. Edit file $HADOOP_HOME/etc/hadoop/hdfs-site.xml and make it look like this:
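For a single-node setup, a minimal hdfs-site.xml (per the Hadoop single-node docs) just sets the replication factor to 1:

<configuration>
  <property>
    <!-- one replica per block is enough for a pseudo-distributed setup -->
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

Note that the localhost:9000 address used later comes from fs.defaultFS, which the standard pseudo-distributed setup places in $HADOOP_HOME/etc/hadoop/core-site.xml as hdfs://localhost:9000.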

Then, in the $HADOOP_HOME/bin directory, run:

./hdfs namenode -format

to format your HDFS for the first time before use.

Go to the $HADOOP_HOME/sbin directory and execute start-dfs.sh. When prompted with SSH questions, make sure you have the permissions and SSH keys set up so that Hadoop can SSH into your local machine and run as a pseudo-distributed HDFS. Once it starts, there should be three Java processes running: the NameNode, the DataNode, and the SecondaryNameNode.
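One quick way to verify is with the JDK’s jps tool, which lists the running JVMs:

jps

All three process names should appear in its output.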

You’ve now set up your local HDFS!

1.2 Create a CSV sample file

Create a CSV file like this:
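Any small CSV with the id and name columns used later in the demo will do, for example (the rows here are made up for illustration):

id,name
1,Alice
2,Bob
3,Carol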

and put it into your local HDFS by running:

$HADOOP_HOME/bin/hdfs dfs -put /path/to/csv/file.csv /hyperspace_test

where /hyperspace_test is the destination directory in HDFS.

2. Run and test Hyperspace

2.1 Start your Spark shell

To include Hyperspace as a dependency, run:

$SPARK_HOME/bin/spark-shell \
--packages=com.microsoft.hyperspace:hyperspace-core_2.11:0.1.0

Choose a different package if you are running a different Scala version (2.11 or 2.12).

2.2 Load data and create an index

To load data, run:
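A minimal sketch, assuming the CSV from step 1.2 was uploaded to /hyperspace_test and has a header row:

// Read the sample CSV from the local pseudo-distributed HDFS
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://localhost:9000/hyperspace_test")

df.show()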

Import Hyperspace into the Spark shell:
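Per the Hyperspace docs, the imports and the entry point look like this:

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index._

// Hyperspace wraps the active SparkSession
val hs = new Hyperspace(spark)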

Next, we want to create an index on column id, which includes a data column called name, so that the name column can be retrieved quickly using the id:
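A sketch of the call using IndexConfig from the Hyperspace API; the index name “index” is just an illustrative choice:

// Index on id; include name so the index alone can answer the query
hs.createIndex(df, IndexConfig("index", indexedColumns = Seq("id"), includedColumns = Seq("name")))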

This will create an index on the column id. You can then list the index and its metadata to confirm:
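hs.indexes returns the metadata (name, indexed and included columns, state) as a DataFrame:

hs.indexes.show(false)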

Next, let’s take advantage of the index that we just created and see how it can change its execution plan and boost performance. Write the query as you normally would:

val query = df.filter(df("id") === 1).select("name")

Use Hyperspace to explain how this query will be interpreted:

hs.explain(query, verbose = true)

The verbose output shows the query plan both with and without indexes, highlighting where the two plans differ.

You can clearly see that with Hyperspace, FileScan reads the index Parquet file instead of the original CSV file from HDFS. This small example isn’t complex enough to show a big advantage, but the idea is clear: building an index from the original CSV and saving it as a sorted, managed Parquet file keyed on the id column lets Spark bypass the shuffle phase, and can therefore improve performance dramatically.

Finally, let’s enable Hyperspace and execute the query:
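A sketch using the enableHyperspace extension that the Hyperspace import adds to SparkSession:

// Switch Spark over to index-aware query planning
spark.enableHyperspace

// The filter on id should now be served by the index
query.show()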

Let’s run query.explain() to see what’s executed under the hood.

The physical plan has been rewired by Hyperspace: during execution, the index we created is scanned instead of the original CSV. Because Hyperspace pre-sorts the index Parquet file by the id column, a lookup query like WHERE id = 123 executes faster by hitting the index directly.

Besides this core API functionality, Hyperspace also includes index management APIs like:
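These are the management calls documented for Hyperspace (availability may vary by version):

hs.refreshIndex("index")  // rebuild an index after the underlying data changes
hs.deleteIndex("index")   // soft-delete an index so the optimizer ignores it
hs.restoreIndex("index")  // bring back a soft-deleted index
hs.vacuumIndex("index")   // permanently remove a soft-deleted index
hs.indexes                // list all indexes and their metadata as a DataFrame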

When to use Hyperspace

If you often have queries that:

  • Look up a specific value (WHERE col = 'abcd')
  • Narrow the data into a very small range (WHERE num > 4 AND num < 7)
  • Or join between two tables on a common column (JOIN table2 ON table1.value = table2.value)

then you can create indexes on those columns to speed up your queries.

Summary

Hyperspace builds indexes on the columns you specify to bypass the distributed shuffle-and-sort phase at runtime, and therefore boosts your query performance. It is still under development, so use it with caution in production deployments.

http://lifeatexpediagroup.com/
