Engineering Analytics API with HBase, Phoenix and SQL at Helpshift

Abhishek Gupta
helpshift-engineering
12 min read · Jun 17, 2019

Background

Helpshift provides near real-time, data-driven analytics to its customers for their BI needs, empowering them to quickly track and measure the quality of their customer service through a Realtime Analytics Dashboard. Some of the KPI metrics provided include Time to first response on a support ticket (TTFR), Customer satisfaction rating (CSAT), Total time to resolve (TTR) and many more.

As we expand into enterprise markets, where businesses are heavily invested in their own BI tools such as Tableau, Qlik or custom solutions, it becomes essential to offer a generic, programmatic way of pulling massive analytics datasets over a simple medium like REST APIs. That is where the Helpshift Analytics APIs come in.

In this article, we describe our journey of engineering the Analytics APIs using Apache HBase + Phoenix, outline its architecture, and discuss how we designed a big query platform to power the Analytics APIs by embracing the magic of SQL.

Introduction

Our customers use the Analytics APIs to quickly pull thousands of pages of time-series data, for a custom time range, aggregated over custom timezones, at custom time granularities (hourly/daily) and over custom pivots, at low latencies, into their BI tools.

Following are instances of such API calls. The first call pulls Agent Analytics data as daily aggregated data points, for a time period measured in a custom timezone (Asia/Kolkata), with 1000 records per page (an Agent here is a customer support person).

The second call is similar, except that it fetches aggregate metrics grouped by custom pivots, i.e. app (the customer's application id) and platform_type (platforms like iOS and Android), in addition to hours.
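
A sketch of what these two calls can look like, modelled on the endpoint shown later in this post; the granularity values and the pivots parameter name in the second call are assumptions, not the exact API contract:

    Call 1: daily Agent Analytics in Asia/Kolkata, 1000 records per page
    https://../analytics/agent/?from=2018-10-02T00:00:00&to=2018-10-09T00:00:00&granularity=daily&timezone=Asia/Kolkata&limit=1000

    Call 2: hourly aggregates additionally grouped by custom pivots (parameter name assumed)
    https://../analytics/agent/?from=2018-10-02T00:00:00&to=2018-10-09T00:00:00&granularity=hourly&timezone=Asia/Kolkata&pivots=app,platform_type&limit=1000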

The API response is a JSON dataset containing a set of analytics records and, optionally, a next-page token called next_key. The next-page token is used to advance the API call through subsequent pages until the entire dataset for the time range has been pulled. Following is a representation of the API response.

Figure 1 : A representation of the Analytics API’s paginated response
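
A minimal sketch of the response shape, with illustrative values; apart from next_key, the field names here are assumptions rather than the actual API contract:

    {
      "results": [
        { "time": "2018-10-02", "agent_id": "agent-42", "resolved": 118, "avg_ttfr_seconds": 312 },
        { "time": "2018-10-02", "agent_id": "agent-57", "resolved": 96,  "avg_ttfr_seconds": 287 }
      ],
      "next_key": "<opaque-pagination-token>"
    }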

Design Challenges

In a customer support context, an OLTP (online) query would typically be something like updating a support ticket with a message when a support agent replies, or fetching all the messages on a ticket. Such queries are lightweight and expected to complete at sub-second latencies. An OLAP (analytical) query, on the other hand, would be something like computing the average time to first response on tickets for each support agent for the month of February, and can typically take minutes to complete as an offline MapReduce job.

Running OLAP-like aggregation queries on massive datasets, while meeting the aforementioned constraints of aggregating over custom timezones, custom time granularities (hourly/daily) and custom pivots at low OLTP latencies, posed the design challenges described below:

  1. Large aggregations on the fly: Since these APIs must support custom timezones, pre-aggregating metrics over common time granularities like hourly/daily/weekly would not have helped.
    For custom timezones to work, pre-aggregation has to be done on 30-minute time samples*. For example, a calendar day in Asia/Kolkata (UTC+5:30) starts at 18:30 UTC, so hourly UTC buckets cannot be regrouped cleanly, but 30-minute buckets can. These pre-aggregated records are periodically computed and stored in our distributed datastore. So for each API call, large aggregations have to be done on the fly, based on the supplied timezone, time range, time granularity and, optionally, custom pivots.
  2. Deep paginations: About 10 million new records are added to the datastore every day, amounting to roughly 3 billion records for one year's worth of data. The number of records queried by certain API calls can be as high as 15 million, and the number of columns aggregated can be as high as half a billion. To pull all of this data, a paginated API call can span about 4000 pages with a page size of 1000 rows.
  3. OLTP latencies: Since the APIs are synchronous data-pull REST endpoints, latency is expected to be no more than a few seconds.

*(We do not support timezones whose UTC offset is not a multiple of 30 minutes, such as Asia/Kathmandu at UTC+5:45.)

Using HBase at Helpshift

Apache HBase is an open-source, distributed, non-relational database modelled on Google's Bigtable. It runs on top of Hadoop's distributed filesystem, HDFS. Our existing Analytics dashboard was built on top of HBase for the following reasons:

  1. Mature database to handle big analytics tables: Low latency big queries on tables with billions of rows
  2. Data consistency with fault tolerance: Avoid subpar customer experience because of downtimes, service degradations and data inconsistencies
  3. Absorbs heavy burst workloads: Large range scans (50–60K rows scanned/sec) owing to big OLAP-like queries, and high bursty write traffic (15K rows written/sec) from periodic batch jobs

However, there were some problems with using HBase directly. The native HBase client API is very low level, which makes it hard to implement row scans, server-side aggregations, predicate pushdowns (server-side filters), parallel queries and so on, all of which were essential for building the low-latency, BigQuery-like solution described earlier.

In short, the native Java API for HBase was too cryptic and verbose for writing even simple data-retrieval queries, and involved writing complex custom Scanners, Filters, Coprocessors and Serializers/Deserializers in Java.

Enter Apache Phoenix

“1st rule of distributed database. If it’s not SQL, it’s not a database.”
— Jonathan Ellis

Apache Phoenix brings the power of standard SQL and JDBC APIs to HBase. It differs from SQL interfaces on top of Hadoop or Spark, such as Hive and Impala, in that it is designed for low-latency OLTP-style workloads rather than long-running OLAP workloads. Systems like Spark and Hive implement their SQL abstractions over big data as distributed batch jobs and cannot serve an online system like a synchronous HTTP API, whereas Phoenix implements its own SQL query engine on top of HBase's OLTP APIs. A higher-level language like SQL makes it easier to develop HBase applications by reducing the amount of code, thereby making developers' lives easier.

Figure 2 : SQL makes everything easy

In summary, Phoenix takes a SQL query, compiles it into a set of HBase scans, predicate pushdowns and server-side aggregations (via coprocessors), and orchestrates the execution of these operations to produce regular JDBC result sets. It also supports running custom code on HBase through user-defined functions (UDFs), secondary indexes for non-primary-key queries, and a cost-based query optimiser embedded in the query engine.
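
As a flavour of what this looks like in practice, here is a minimal Phoenix SQL sketch; the table, columns and index name are hypothetical, not our actual schema:

    -- Secondary index to serve queries that filter on a non-primary-key column
    CREATE INDEX IF NOT EXISTS agent_metrics_by_agent
        ON agent_metrics (agent_id, time_bucket)
        INCLUDE (resolved_count);

    -- An aggregate that Phoenix compiles into parallel region-server scans plus
    -- server-side (coprocessor) aggregation, returned as an ordinary JDBC result set
    SELECT agent_id, SUM(resolved_count)
    FROM   agent_metrics
    WHERE  time_bucket >= TO_DATE('2019-06-01 00:00:00')
    GROUP BY agent_id;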

NewSQL

Projects like Apache Phoenix, along with others like Google Spanner, marked the rise of NewSQL, i.e. new scalable databases that fully embrace SQL. Following are excerpts from the Google Spanner paper emphasising the benefits of SQL.

“While these systems provided some of the benefits of a database system, they lacked many traditional database features that application developers often rely on. A key example is a robust query language, meaning that developers had to write complex code to process and aggregate the data in their applications. As a result, we decided to turn Spanner into a full featured SQL system, with query execution tightly integrated with the other architectural features of Spanner (such as strong consistency and global replication).”

“The original API of Spanner provided NoSQL methods for point lookups and range scans of individual and interleaved tables. While NoSQL methods provided a simple path to launching Spanner, and continue to be useful in simple retrieval scenarios, SQL has provided significant additional value in expressing more complex data access patterns and pushing computation to the data.”

HBase + Phoenix Architecture

Our existing HBase cluster comprised 10 region servers. A region server in HBase is a node that hosts and serves a set of shards (called regions) of the sharded database tables. The region servers were deployed on memory-optimised instances with inexpensive commodity rotational disks, as the majority of our workload involves sequential reads and writes.

Adding Apache Phoenix to this architecture (Figure 3) is as simple as deploying a library (JAR) on each region server's classpath and setting up a small cluster of Phoenix Query Server (PQS) nodes. The PQS cluster is an implementation of Apache Calcite's Avatica JDBC server and is responsible for processing JDBC client requests over HTTP.

Figure 3 : Representation of the HBase + Phoenix Architecture
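
For reference, JDBC clients reach PQS through Phoenix's "thin" driver over HTTP. A sketch of such a connection URL (the host name is hypothetical; 8765 is the default PQS port):

    jdbc:phoenix:thin:url=http://pqs-node-1:8765;serialization=PROTOBUF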

Now back to the design challenges

Equipped with the capabilities of Phoenix, and backed by HBase as a solid datastore, we solved the design challenges described earlier as follows:

Large Aggregations on the fly

As discussed before, our half-hourly batch job crunches raw events from a Kafka stream to generate metric records. These records are pre-aggregated into 30-minute time samples and then written to Phoenix relational tables.

Figure 4 is a representation of one such table, used by the Agent Analytics API to pull a customer support agent's daily performance analytics, as mentioned before.

Figure 4 : Representation of the relational table that models the metrics data for Agent Analytics API
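
The actual schema is not reproduced here; the following is a minimal Phoenix DDL sketch of such a table, with column names and types that are assumptions based on the description above:

    -- 30-minute pre-aggregated agent metrics (hypothetical schema)
    CREATE TABLE IF NOT EXISTS agent_metrics (
        customer        VARCHAR NOT NULL,  -- tenant, e.g. 'hscustomer'
        time_bucket     DATE    NOT NULL,  -- start of the 30-minute sample, in UTC
        agent_id        VARCHAR NOT NULL,  -- customer support agent identifier
        resolved_count  BIGINT,            -- tickets resolved in this bucket
        ttfr_sum        BIGINT,            -- summed time-to-first-response (seconds)
        ttfr_count      BIGINT,            -- responses counted into ttfr_sum
        CONSTRAINT pk PRIMARY KEY (customer, time_bucket, agent_id)
    );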

Figure 5 represents the output dataset we expect the query for the following API call to return. The dataset contains time-series metrics for each day, in the Asia/Kolkata timezone, for each agent.

https://../analytics/agent/?from=2018-10-02T00:00:00&to=2018-10-09T00:00:00&granularity=daily&timezone=Asia/Kolkata&limit=1000

Figure 5 : Representation of the output aggregate data required in the API response

Figure 6 shows the SQL query that does this aggregation on the fly.
Under the hood, the SQL query is translated into native HBase calls sent to multiple region servers, which scan and filter rows in parallel for the customer named "hscustomer" within the specified time range, while simultaneously running server-side aggregations like SUM and COUNT on the metric columns, grouped by the pivot columns "time" and "agentId". Note that the group-by pivot "time" is the 30-minute "time_bucket" column in UTC, converted to "Asia/Kolkata" and truncated to the day.

Figure 6 : SQL query for the Agent Analytics API
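
The query in Figure 6 is not reproduced here; a hedged reconstruction of what it could look like, using Phoenix's CONVERT_TZ and TRUNC functions (table and column names are assumptions carried over from the sketch above), is:

    SELECT TRUNC(CONVERT_TZ(time_bucket, 'UTC', 'Asia/Kolkata'), 'DAY') AS day_ist,
           agent_id,
           SUM(resolved_count)              AS resolved,
           SUM(ttfr_sum) / SUM(ttfr_count)  AS avg_ttfr_seconds
    FROM   agent_metrics
    WHERE  customer = 'hscustomer'
      AND  time_bucket >= TO_DATE('2018-10-01 18:30:00')  -- 2018-10-02T00:00 IST expressed in UTC
      AND  time_bucket <  TO_DATE('2018-10-08 18:30:00')  -- 2018-10-09T00:00 IST expressed in UTC
    GROUP BY TRUNC(CONVERT_TZ(time_bucket, 'UTC', 'Asia/Kolkata'), 'DAY'), agent_id;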

Deep paginations

Now to the problem of paginating API calls over massive datasets comprising millions of rows. For pagination, these APIs use the standard combination of a page size (limit) and a next-page token (next_key).

Server-side pagination with SQL
For paginating SQL results, a hybrid approach is used, comprising the following:

  1. Row Value Constructors (RVC): RVC is SQL-92 syntax for running range queries on tables using a combination of primary key columns. The query specifies its start row (and optionally its end row) using the PK column combination. This lets us use 1000 rows for the current page and deconstruct the 1001st row into the next_key that becomes the start row of the next page's query, and so on.

  2. LIMIT N OFFSET M: LIMIT/OFFSET is again standard SQL for paginated queries, where OFFSET skips the first M rows and LIMIT accumulates the next N rows for the page.

RVC works well for API queries that do not have any aggregations; however, it cannot be used for API queries that run aggregation functions like SUM and COUNT, as seen earlier. For such APIs we use the LIMIT N OFFSET M approach. It works well for pagination a few hundred pages deep but degrades quickly in deep-pagination scenarios, which we solve using a custom cursor-based approach, discussed next.

So, in summary, the hybrid approach to pagination works as follows:

  • RVC for queries that are flat scans with no aggregations (see the sketch below)
  • LIMIT N OFFSET M for aggregate queries with shallow pagination (how deep usually depends on the input time range of the API call)
  • Custom cursors for deep pagination, described next
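
A hedged sketch of the RVC approach for a flat (non-aggregated) query, assuming the hypothetical agent_metrics table above with primary key (customer, time_bucket, agent_id); the start tuple is whatever was decoded from the previous page's next_key:

    SELECT customer, time_bucket, agent_id, resolved_count
    FROM   agent_metrics
    WHERE  (customer, time_bucket, agent_id) >= ('hscustomer', TO_DATE('2018-10-03 05:30:00'), 'agent-42')
      AND  customer = 'hscustomer'
      AND  time_bucket < TO_DATE('2018-10-08 18:30:00')
    ORDER BY customer, time_bucket, agent_id
    LIMIT 1001;  -- 1000 rows for this page; the 1001st row is deconstructed into next_key
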
Figure 7 : Query for the 2nd page using LIMIT OFFSET
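
Again as a hedged sketch rather than the actual Figure 7 query, a second page of the aggregate query from Figure 6 could look like this:

    SELECT TRUNC(CONVERT_TZ(time_bucket, 'UTC', 'Asia/Kolkata'), 'DAY') AS day_ist,
           agent_id,
           SUM(resolved_count) AS resolved
    FROM   agent_metrics
    WHERE  customer = 'hscustomer'
      AND  time_bucket >= TO_DATE('2018-10-01 18:30:00')
      AND  time_bucket <  TO_DATE('2018-10-08 18:30:00')
    GROUP BY TRUNC(CONVERT_TZ(time_bucket, 'UTC', 'Asia/Kolkata'), 'DAY'), agent_id
    LIMIT 1000 OFFSET 1000;  -- skip the first page's 1000 aggregated rows, return the next 1000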

  3. Custom Cursors for Deep Pagination: As discussed before, LIMIT OFFSET works well for relatively shallow pagination. However, for scenarios where the data pull requires retrieving millions of aggregated rows, which even with a page size of 1000 spans thousands of pages, LIMIT OFFSET does not work.

It does not work because the OFFSET operation in datastores like HBase has to start scanning rows from the beginning of the scan range and keep skipping them until the offset is reached. All of this unnecessary scanning and skipping wastes CPU cycles and IO, resulting in slow queries that get slower as we go deeper into the pages. It also puts pressure on the region servers' heaps due to unwanted cache evictions and object churn.

We solved this challenge with a custom cursor-based approach, in which the state of a paginated query's lifecycle is stored in HBase while the pagination progresses, until it completes. For the Analytics API queries, this was done by dividing the query space into partitions and running LIMIT OFFSET queries only within the bounds of one partition at a time.

For example, the Agent Analytics API uses "agentId" to partition the query space by agent. At the commencement of the paginated API call, we start a cursor for the request by pulling all the "agentId"s of the customer (customer_name) and storing them in a Phoenix HBase table as cursor state.

We then run the LIMIT OFFSET query one "agentId" at a time until all the agents have been queried, after which we close the cursor state. Because "agentId" is part of the table's composite primary key, Phoenix applies a special filter called the SKIP SCAN FILTER when "agentId" appears in the WHERE clause of the query. The HBase scanner uses this hint to seek directly to the next row satisfying the filter condition, skipping the huge number of rows and columns in between. The cursor state is propagated between pages of the API call by encoding it in the next_key token (Figures 8–11).

Figure 8: Cursor start initialises the cursor state with all agentIds; the first agentId, encoded in next_key, is returned
Figure 9: Cursor next advances the cursor state to the next agentId
Figure 10: Cursor end clears the state once all agentIds have been pulled by the paginated API calls
Figure 11: The decoded next_key token containing the cursor state is used in the Phoenix SQL query; since agent_id is part of the primary key, the query uses the SKIP SCAN FILTER, which makes it run fast
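
The query in Figure 11 is not reproduced here; a hedged sketch of one cursor partition's query (names and values are assumptions) shows the idea. The equality predicate on agent_id keeps the OFFSET shallow and lets Phoenix use the SKIP SCAN FILTER:

    SELECT TRUNC(CONVERT_TZ(time_bucket, 'UTC', 'Asia/Kolkata'), 'DAY') AS day_ist,
           agent_id,
           SUM(resolved_count) AS resolved
    FROM   agent_metrics
    WHERE  customer = 'hscustomer'
      AND  agent_id = 'agent-42'                          -- current partition from the cursor state in next_key
      AND  time_bucket >= TO_DATE('2018-10-01 18:30:00')
      AND  time_bucket <  TO_DATE('2018-10-08 18:30:00')
    GROUP BY TRUNC(CONVERT_TZ(time_bucket, 'UTC', 'Asia/Kolkata'), 'DAY'), agent_id
    LIMIT 1000 OFFSET 0;  -- OFFSET only ever skips rows within one agent's partition, so it stays small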

OLTP latencies

As discussed before, running aggregate SQL queries over massive analytics datasets of tens of millions of rows, at latencies of a few seconds, is a serious challenge. All of these massive row scans, filters and aggregations demand a lot of CPU and IO capacity, which is limited on a single machine.

This is where a database like Phoenix really helps. By design, it orchestrates these CPU- and IO-intensive operations in parallel across multiple region servers (assuming the large query space is distributed over multiple machines), running the filters and aggregations on the server side, close to the data, instead of on the client, which dramatically reduces the amount of data exchanged between client and server. In addition, Phoenix parallelizes the running of the GROUP BY on the client by chunking up the scans based on ranges of the row key. By doing the work in parallel, Phoenix returns results extremely fast.

In addition to parallel scans and parallel pushdowns (aggregations and filters), for paginated queries Phoenix uses a TopN approach: the iteration continues until each region server has accumulated N results, and the Phoenix query server then does a merge sort over all the results to return the top N to the client.
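
One way to see this behaviour is Phoenix's EXPLAIN. The plan lines below are paraphrased from the typical shape of Phoenix output, not a verbatim capture from our cluster; they show the parallel range scan, server-side aggregation and client-side merge sort described above (table and column names remain the hypothetical ones used earlier):

    EXPLAIN
    SELECT agent_id, SUM(resolved_count)
    FROM   agent_metrics
    WHERE  customer = 'hscustomer'
    GROUP BY agent_id;
    -- Typical plan shape (illustrative):
    --   CLIENT PARALLEL 12-WAY RANGE SCAN OVER AGENT_METRICS ['hscustomer']
    --   SERVER AGGREGATE INTO DISTINCT ROWS BY [AGENT_ID]
    --   CLIENT MERGE SORT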

Figure 12 demonstrates this. Our Analytics API service (named Sunbird) issues a SQL query to fetch the results of one page of the API call. The query is compiled by the Phoenix query server (PQS) and translated into native HBase RPC calls. The actual query space for the time range can be millions of rows distributed across multiple region servers. Phoenix issues parallel scans to all relevant region servers based on row key ranges. In addition to the parallel range scans, Phoenix performs a skip scan for the supplied "agent_id" (the ID of a customer support agent), seeking to the next row whose key contains this agent id and skipping all the rows whose keys do not. The server-side coprocessors run the supplied functions to perform the transformations, truncations and aggregations until only a few thousand aggregated rows remain. The Phoenix query server finally does a merge sort and returns the page of the top 1000 records to the JDBC client.

Figure 12: API parallel query execution lifecycle on Phoenix HBase architecture

In our next post (part 2 of this series) we will look at Phorm, or Phoenix ORM (a custom ORM library written in Java for Phoenix JDBC), and Sunbird (our HTTP API server written in Clojure), and describe the effort and challenges that went into designing a custom ORM library and an API server that serves massive analytics datasets.

Conclusion

The key learning from this journey was that, when designing a data-intensive application, one of the most critical decisions is picking the right database system and programming paradigm. A Phoenix HBase architecture and a programming model primarily based on SQL served us well. It is also very important to design systems for scale from the outset rather than as an afterthought.

In future posts, we plan to write more about our experience of running Phoenix and HBase in production, lessons learnt, and setting up HA and load balancing for the PQS cluster.
