How we built a Universal Query Execution Engine using Zeppelin (Part 1)

Saksham Srivastava
MiQ Tech and Analytics
4 min read · Dec 11, 2017

A few months ago at Media iQ, we realized we had a unique problem to solve: we needed the capability to run any Hive/Spark/SQL/ElasticSearch query programmatically through a unified interface. Far too many platforms and interfaces were being used to execute queries.

Some people were using workbenches to run PostgreSQL queries, or ElasticSearch clients to execute ES queries. Data scientists were writing their queries in .sql files and executing them on Hive or Spark clusters. Debugging was hard, and individual statuses were not directly available for the queries. Alerts and events based on query status were also missing.

The Problem Statement

We wanted to build a system that allows a user to run a query in their choice of language (Hive/Spark etc) against any dataset, through a single interface.

Building such a system fits well into our overall ecosystem where each team is building microservices and exposing APIs for other teams to consume.

All the requirements narrowed down to building a universal query execution system which can:

  • Run a query on a given platform
  • Stop the query
  • Provide real-time status of the query execution
  • When completed, provide the query result
  • Support workflows, which are sequential lists of queries
  • Expose APIs for all of the above

We decided to support multiple platforms like ElasticSearch, PostgreSQL, R, pySpark etc. to cater to all the stakeholders.

This is what our problem looked like:

Investigation

Over time, we did multiple POCs around various available solutions like HiveServer2 with JDBC, Spark Job Server, Apache Livy, Hue, WebHCat and Apache Zeppelin.

We finally chose Apache Zeppelin over the others because:

  • Zeppelin supports multiple backend execution platforms like Spark, Hive, SQL, ElasticSearch, R, Python etc.
  • Zeppelin has REST APIs for common operations.
  • Zeppelin, being an Apache FOSS project, has a helpful and lively community.
  • Zeppelin also gives an option to directly use the notebook for visualization.

Apache Zeppelin

Zeppelin is a web-based notebook that enables data-driven, interactive data analytics and collaborative documents. Zeppelin also exposes REST APIs for most use cases, such as createNotebook, createParagraph, runNotebook, status, and stopNotebook.

Apache Zeppelin Architecture

There are two major components in Zeppelin that helped us solve our problem:

Zeppelin Notebook

  • A notebook is an interactive, browser-based container of paragraphs.
  • A notebook can have multiple paragraphs of different languages.
  • Zeppelin provides support to export/import/clone notebooks and also to run/stop/clear all paragraphs in the notebook.

Interpreter

  • Zeppelin Interpreter is a plug-in which enables Zeppelin users to use a specific language/data-processing backend.
  • Currently, Zeppelin supports many interpreters such as Scala, Python, Spark SQL, JDBC, Markdown, Shell, ElasticSearch, Pig, Livy and so on.

Zeppelin has excellent documentation, which can be found on the official Apache Zeppelin website.

The Solution

Translating the request to Zeppelin

A sample query looks like this:

POST: http://queryservices.mediaiqdigital.com:8445/api/v1/query/submit
{
  "text": "select count(*) from emp_table",
  "query_type": "sql",
  "cluster": "ip address of cluster"
}

Every query is mapped to a unique paragraph in a notebook: the query text becomes the paragraph text, and the query_type determines which Zeppelin interpreter is used.
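As a rough sketch of that mapping: Zeppelin routes a paragraph to a backend via an interpreter directive (`%sql`, `%pyspark`, and so on) at the top of the paragraph text. The exact prefixes depend on how the interpreters are configured in Zeppelin, so the table below is illustrative, not our actual configuration:

```python
# Illustrative mapping from our query_type field to the Zeppelin
# interpreter prefix that must lead the paragraph text. The exact
# prefixes depend on the interpreter configuration of the Zeppelin
# installation; treat these as assumptions.
INTERPRETER_PREFIX = {
    "sql": "%sql",                     # Spark SQL interpreter
    "pyspark": "%pyspark",             # PySpark interpreter
    "spark": "%spark",                 # Scala Spark interpreter
    "elasticsearch": "%elasticsearch", # ElasticSearch interpreter
    "r": "%r",                         # R interpreter
}

def to_paragraph_text(query_type, text):
    """Prepend the interpreter directive so Zeppelin routes the
    paragraph to the right execution backend."""
    prefix = INTERPRETER_PREFIX[query_type]
    return "{}\n{}".format(prefix, text)
```

For the sample request above, `to_paragraph_text("sql", "select count(*) from emp_table")` produces a paragraph body beginning with `%sql`, which Zeppelin hands to the SQL interpreter.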

Once the query is submitted to Query Execution Service, we create a paragraph on Zeppelin using the createParagraph API of Zeppelin.

The same query, mapped onto Zeppelin, looks like this:

Using the paragraph-id, the status of the corresponding query can be determined from Zeppelin.
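In outline, the create-then-poll flow against Zeppelin's notebook REST API looks like the sketch below. The endpoint paths follow Zeppelin's documented notebook API, but should be checked against the docs for your Zeppelin version; the note and paragraph IDs are placeholders, and HTTP transport, authentication, and error handling are omitted:

```python
import json

# Sketch of the Zeppelin REST calls our service issues. Each helper
# returns (method, path, body) rather than performing the HTTP call,
# so the actual transport (and auth) can be layered on separately.

def create_paragraph_request(note_id, paragraph_text):
    """Create a paragraph inside an existing note (createParagraph)."""
    path = "/api/notebook/{}/paragraph".format(note_id)
    body = json.dumps({"text": paragraph_text})
    return ("POST", path, body)

def run_paragraph_request(note_id, paragraph_id):
    """Run a single paragraph asynchronously."""
    return ("POST", "/api/notebook/job/{}/{}".format(note_id, paragraph_id), None)

def paragraph_status_request(note_id, paragraph_id):
    """Poll the paragraph's job status (e.g. PENDING/RUNNING/FINISHED)."""
    return ("GET", "/api/notebook/job/{}/{}".format(note_id, paragraph_id), None)

def stop_paragraph_request(note_id, paragraph_id):
    """Cancel a running paragraph."""
    return ("DELETE", "/api/notebook/job/{}/{}".format(note_id, paragraph_id), None)
```

The paragraph ID returned by the create call is what we persist against the query, so every later status, stop, or result call can be resolved back to the right paragraph.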

Architecture

We broke the Query Execution Service down into two microservices:

  1. Query Service, the user-facing interface, which calls the APIs exposed by the Async Remote Executor.
  2. Async Remote Executor (ARE), which hosts a client to call the Zeppelin APIs, holds the execution logic, and exposes four APIs (submit, stop, status, and result).

Query Service

Any service that wishes to run queries interacts with this service, which in turn talks to the APIs exposed by ARE. It connects to a MySQL database, which it uses to persist the query lifecycle.
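Since the Query Service alone owns lifecycle persistence, it helps to be explicit about which state transitions it will accept before writing to MySQL. The state names and transitions below are an illustrative sketch (loosely mirroring Zeppelin's job statuses), not our actual schema:

```python
# Hypothetical query lifecycle guard for the Query Service. The state
# names and the transition table are assumptions for illustration;
# the real service's states are an implementation detail.
VALID_TRANSITIONS = {
    "SUBMITTED": {"RUNNING", "ABORTED"},
    "RUNNING": {"FINISHED", "ERROR", "ABORTED"},
    "FINISHED": set(),   # terminal
    "ERROR": set(),      # terminal
    "ABORTED": set(),    # terminal
}

def advance(current, new):
    """Allow a lifecycle update only if the transition is legal,
    so stale or out-of-order status callbacks cannot corrupt the
    persisted state."""
    if new not in VALID_TRANSITIONS[current]:
        raise ValueError("illegal transition {} -> {}".format(current, new))
    return new
```

A guard like this also makes alerting straightforward: events can be emitted exactly once, at the moment a query enters a terminal state.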

Asynchronous Remote Executor

The idea of this service is to abstract query execution away from other lifecycle events. It has a Zeppelin client for interacting with Zeppelin, the API implementations, and a REST interface that exposes APIs for the Query Service to consume.

Why we chose this design

  • This allowed us to decouple the execution backend from the user interaction.
  • Query lifecycle and states are handled and persisted by the Query Service alone.
  • ARE is an execution engine and does not have to take care of persistence and application logic around the query.
  • We can scale AREs depending on load.
  • In the future, we can plug in AREs which are not tightly coupled with Zeppelin.

Technology choices

We use Vert.x as our underlying reactive framework and to build the REST interface, and Retrofit to build the client libraries.

Infrastructure

All our services are deployed in AWS, and we use the out-of-the-box Zeppelin installation on EMR.

Future Scope

We will add more capabilities to the system, such as cluster management, support for additional backends, and search and preview capabilities.

(We will talk more about workflows in the next part of this blog series.)
