PinalyticsDB: A Time Series Database on top of HBase

Pinterest Engineering
Pinterest Engineering Blog
Sep 26, 2019

Parag Kesar & Alejandro Velez
Software Engineers, Data Engineering

PinalyticsDB is Pinterest’s proprietary time series database. At Pinterest, we rely on PinalyticsDB as a backend for storing and visualizing thousands of time series reports such as the sample case below, segmented by country.

PinalyticsDB was built several years ago on top of HBase. It uses a real-time map-reduce architecture for aggregations, implemented with HBase coprocessors. However, as Pinterest grew, the number of reports grew as well, along with their usage and the volume of data they contain, and this growth exposed several scalability challenges.

Over the past several months, we’ve rebuilt PinalyticsDB to make it a more performant and reliable service. Read on to learn the performance and scalability challenges we faced and how we redesigned the service to build a better PinalyticsDB.

HBase Region Server Hotspotting

Hotspotting increasingly became an issue with PinalyticsDB region servers as usage of our platform grew within Pinterest. In the previous design, a new HBase table was created for every report.

Old Schema Design

The previous row-key design was:

Previous Row-Key = prefix | date | suffix
prefix = the metric name as a string
date = YYYY-MM-DD format, again as a string
suffix = composed of segment numbers

The row-key is composed of ASCII characters, with “|” used as a delimiter.
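
As a rough illustration (the segment-number encoding used here is an assumption), an old-style key was just delimited ASCII:

// Illustrative only: how an old-style row-key was assembled. The segment-number
// encoding ("1:2") is an assumption for this example.
public class OldRowKeyExample {
  public static void main(String[] args) {
    String metric = "resurrection";  // prefix: metric name
    String date = "2019-09-26";      // date as YYYY-MM-DD
    String segments = "1:2";         // suffix: segment numbers
    System.out.println(String.join("|", metric, date, segments));
    // prints: resurrection|2019-09-26|1:2
  }
}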

There are a few problems with this approach:

  • A new table is created for every report, and some reports are much more popular than others, so the region servers hosting the popular reports receive more traffic.
  • We have thousands of reports, so it wasn’t practical for Hbase admins to monitor the report tables and split the tables based on observed hotspotting.
  • Within a report, some metrics are more popular. Since the metric-name is the first part of the row-key, this results in more hotspotting.
  • More recent data tends to be accessed more frequently, and since the date comes right after the metric in the row-key, this results in even more hotspotting.

This hotspotting applied to reads, but we observed similar hotspotting for writes: the tables for the heavier reports received more writes, and data for a metric was always written to the latest date, concentrating writes on the region(s) hosting the latest date for every metric.

New Schema Design

We tackled this problem by making improvements to our row-key schema and HBase table design. We created a single table for all reports, with the following row-key design.

New Row-Key = SALT | <report-id> | <metric-id> | <date-time> | <segments>

The row-key is represented as a byte array (byte[]), not as a string.

  • Every section of the key has a fixed length, which gives the row-key a fixed structure and also supports fuzzy row filters.
  • Because of the fixed structure, we can do away with the “|” separators, which also saves space (see the sketch below).
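
The sketch below shows one possible layout in code; the field widths, the number of salts, and the salting hash are illustrative assumptions rather than our exact values.

import java.nio.ByteBuffer;
import java.util.Arrays;

// Minimal sketch of a fixed-width, salted row-key. Field widths, the number of
// salts and the salting hash are illustrative assumptions, not our exact values.
public final class PinalyticsRowKey {
  static final int NUM_SALTS = 32;

  static byte[] build(short reportId, int metricId, int dateTime, byte[] segments) {
    // Fixed-width body: every field sits at a known offset, so no "|" delimiters
    // are needed and FuzzyRowFilter masks can target individual fields.
    ByteBuffer body = ByteBuffer.allocate(2 + 4 + 4 + segments.length);
    body.putShort(reportId);  // <report-id>  (2 bytes)
    body.putInt(metricId);    // <metric-id>  (4 bytes)
    body.putInt(dateTime);    // <date-time>  (4 bytes)
    body.put(segments);       // <segments>   (fixed-width segment ids)

    // A single leading salt byte, derived from a hash of the body, spreads the
    // rows of a hot report/metric across many regions.
    byte salt = (byte) Math.floorMod(Arrays.hashCode(body.array()), NUM_SALTS);
    return ByteBuffer.allocate(1 + body.capacity()).put(salt).put(body.array()).array();
  }
}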

Impact on reads and writes

PinalyticsDB V2 Read Architecture

As you can see, the salting logic distributes reads well across the cluster. The schema design distributes writes equally well.

Improving Coprocessor Performance

We also optimized the performance of the PinalyticsDB coprocessor by modifying its request structure and scanning behavior. Our optimizations resulted in improvements in Region Server CPU utilization, RPC latency, and JVM blocked threads.

Our original design created an HBase scan for each (metric, segment) request sent to Pinalytics. PinalyticsDB receives many such requests, which resulted in a very large number of scans. We reduced the number of HBase scans by coalescing the aggregation requests for the same report and metric into single scans containing all of the FuzzyRowFilters for the requested segments.
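
Here's a simplified sketch of that coalescing step; the SegmentRequest type and the (row-key template, mask) pairs it carries are illustrative stand-ins for our actual Thrift request objects.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.util.Pair;

// Sketch of the coalescing step on the Thrift server. SegmentRequest is a
// hypothetical stand-in for our actual Thrift request objects; each one carries
// the (row-key template, mask) pair that its FuzzyRowFilter will be built from.
public final class RequestCoalescer {

  static final class SegmentRequest {
    final int reportId;
    final int metricId;
    final Pair<byte[], byte[]> fuzzyPair;  // (row-key template, fuzzy mask)

    SegmentRequest(int reportId, int metricId, Pair<byte[], byte[]> fuzzyPair) {
      this.reportId = reportId;
      this.metricId = metricId;
      this.fuzzyPair = fuzzyPair;
    }
  }

  // Group every requested segment combination by (report, metric) so each group
  // becomes a single aggregation request carrying all of its fuzzy-key pairs.
  static Map<String, List<Pair<byte[], byte[]>>> coalesce(List<SegmentRequest> requests) {
    Map<String, List<Pair<byte[], byte[]>>> byMetric = new HashMap<>();
    for (SegmentRequest req : requests) {
      String key = req.reportId + ":" + req.metricId;
      byMetric.computeIfAbsent(key, k -> new ArrayList<>()).add(req.fuzzyPair);
    }
    return byMetric;
  }
}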

When using Pinalytics, it is common for users to make large batches of requests, each covering a few metrics across many different segments. An example is illustrated in the chart below, which requests a sample metric across multiple US state segments.

This is a very common use case, since many users have dashboards containing several of these charts.

This use case inspired our “multi-segment optimization” where the coprocessor performs a single scan (per region salt) for all segments in a PinalyticsRequest associated with the same metric.

PinalyticsDB V2 Coprocessor Design

  • The Pinalytics Thrift Server groups all requests it receives from Pinalytics by metric. Then, for each metric, the coprocessors receive a request containing all the FuzzyRowFilters associated with the segment combinations requested for that metric.
  • For each salt in the coprocessor's region, the coprocessor creates a scan whose filter is a MUST_PASS_ONE FilterList containing all of the FuzzyRowFilters in the aggregation request.
  • The coprocessor then aggregates the results of all scans by date and FuzzyRowFilter and sends that response back to the Thrift Server.

Only a single aggregation request is needed for the same metric, regardless of the number of different segment combinations requested for that metric.
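
Conceptually, the per-salt scan looks something like the sketch below; the start/stop keys (which bound the scan to one salt within the region) and the fuzzy (template, mask) pairs are supplied by assumed helpers.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
import org.apache.hadoop.hbase.util.Pair;

// Sketch of the per-salt scan built inside the coprocessor. The start/stop keys
// and the fuzzy (template, mask) pairs come from assumed helpers.
public final class SaltScans {

  static Scan scanForSalt(byte[] startKeyForSalt,
                          byte[] stopKeyForSalt,
                          List<Pair<byte[], byte[]>> fuzzyPairsForMetric) {
    // One FuzzyRowFilter per requested segment combination; MUST_PASS_ONE means
    // a row is returned if it matches any of them.
    List<Filter> filters = new ArrayList<>();
    for (Pair<byte[], byte[]> pair : fuzzyPairsForMetric) {
      filters.add(new FuzzyRowFilter(Collections.singletonList(pair)));
    }
    return new Scan()
        .withStartRow(startKeyForSalt)
        .withStopRow(stopKeyForSalt)
        .setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE, filters));
  }
}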

PinalyticsDB V2 Coprocessor Design: For every salt, a single scan is created for all segments corresponding to the same metric.

Our new coprocessor design resulted in significant improvements to Region Server CPU utilization, RPC latency, and JVM threads blocked.

Note: The below graphs were captured some hours after the deployment of our multi-segment optimization, so they do not accurately reflect the current performance of the system. Still, they help capture the impact of our improved coprocessor design.

Improvement in Region Server CPU utilization after new Coprocessor Deployment

Improvement in Region Server JVM threads blocked after new Coprocessor Deployment

Improvement in Region Server RPC latency after new Coprocessor Deployment

Huge Report Metadata and Thrift Server OOM

Our Thrift servers also faced frequent OOM crashes, which users noticed as webapp timeouts when attempting to load their charts. The timeouts happened because the JVM for the Thrift servers did not have -XX:+ExitOnOutOfMemoryError set, so a Thrift server that hit an OOM did not exit and all calls to it timed out. A quick fix was to add this flag so that the Thrift server was auto-restarted in production on OOM.

To debug this issue, we pointed JConsole at one of the production Thrift servers and captured a crash. Below are charts for the total heap, the old generation, and the new generation.

Total Heap Memory for a Thrift server with 8 GB of memory

Notice the sudden spike from under 4 GB to more than 8 GB, which results in the OOM.

CMS Old Generation for the Pinalytics DB Thrift Server

Again, the old generation spiked from a very low value to over 4 GB, exceeding the old-gen limit. The spike is so sudden that there is no time for the CMS collector, or even a full GC, to kick in.

Eden Space for the Pinalytics DB Thrift Server

We were able to recreate this issue in our development environment with load testing and identified the problem as being related to how we were reading and storing Report Metadata. For the majority of our reports, the metadata is just a few KB. However, for some reports the metadata is 60+ MB, even 120+ MB. The main reason for this is the large number of metrics a report can have.

Report Metadata Structure

This is the report metadata for a single report. Report metadata is stored in a special secondary index table.

#!/usr/bin/python
# -*- coding: utf-8 -*-
ReportInfo(
    tableName=u'growth_ResurrectionSegmentedReport',
    reportName=u'growth_ResurrectionSegmentedReport',
    segInfo={
        u'segKey2': u'gender',
        u'segKey1': u'country',
        u'segKeyNum': u'2',
    },
    metrics={
        u'resurrection': MetricMetadata(name=u'resurrection', valueNames=None),
        u'5min_resurrection': MetricMetadata(name=u'5min_resurrection', valueNames=None),
    },
    segKeys={
        1: {
            u'1': u'US',
            u'2': u'UK',
            u'3': u'CA',
            # ...
        },
    },
)

Optimizing storage & retrieval of Report Metadata

The report metadata was stored in an HBase secondary index table as a serialized blob. The root cause of the issue was the huge size of the report metadata, combined with the fact that we were loading the entire metadata rather than just the parts we needed. Under high load, the JVM heap can fill up so quickly that the JVM is unable to do even a stop-the-world full GC.

To resolve the root cause of this issue, we decided to distribute the contents of the report metadata into several column families and qualifiers under the row-key for the report.

Enhanced PinalyticsDB V2 Report Row-Key structure, which is distributed among multiple column families and qualifiers

In HBase, an update to a single row-key is atomic, so all column families are updated atomically with a single Put operation.
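
Below is a rough sketch of what that write could look like with the HBase client API; the column family and qualifier names ("seg", "segKeys", "metrics") and the serialization helper are illustrative assumptions, not our exact schema.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch of writing one report's metadata spread across several column families
// in a single Put. Family/qualifier names and the serialization helper are
// illustrative assumptions.
public final class ReportMetadataWriter {

  static void writeReportInfo(Table secondaryIndexTable) throws IOException {
    Put put = new Put(Bytes.toBytes("growth_ResurrectionSegmentedReport"));
    // Segment-info family: one small qualifier per segment key.
    put.addColumn(Bytes.toBytes("seg"), Bytes.toBytes("segKey1"), Bytes.toBytes("country"));
    put.addColumn(Bytes.toBytes("seg"), Bytes.toBytes("segKey2"), Bytes.toBytes("gender"));
    // Metrics family: one qualifier per metric, holding only that metric's metadata.
    put.addColumn(Bytes.toBytes("metrics"), Bytes.toBytes("resurrection"),
        serializeMetricMetadata("resurrection"));
    put.addColumn(Bytes.toBytes("metrics"), Bytes.toBytes("5min_resurrection"),
        serializeMetricMetadata("5min_resurrection"));
    // All families and qualifiers under this row-key are applied atomically.
    secondaryIndexTable.put(put);
  }

  // Hypothetical placeholder: serialize one metric's metadata.
  static byte[] serializeMetricMetadata(String metricName) {
    return Bytes.toBytes(metricName);
  }
}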

We created a new method to get the report-info.

getReportInfo(final String siTableName, final String reportName, List<String> metrics)

This returns all of the seg-info and seg-keys data, but only the relevant metrics data from the ‘metrics’ column family. Since most of a report's size comes from metric data, we now return a few KB of data instead of potentially 100+ MB.
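
Here's a minimal sketch of how such a selective read could be issued with an HBase Get; the family names match the illustrative ones above, and the method shape (returning a raw Result) is simplified compared to our actual implementation.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch of the selective metadata read: whole seg-info/seg-keys families, but
// only the requested qualifiers from the "metrics" family. Family names are the
// same illustrative assumptions as in the write sketch above.
public final class ReportMetadataReader {

  static Result getReportInfo(Table siTable, String reportName, List<String> metrics)
      throws IOException {
    Get get = new Get(Bytes.toBytes(reportName));
    get.addFamily(Bytes.toBytes("seg"));      // segment info: always just a few KB
    get.addFamily(Bytes.toBytes("segKeys"));  // segment key mappings
    for (String metric : metrics) {
      // Only the metrics the caller asked for, instead of the entire blob.
      get.addColumn(Bytes.toBytes("metrics"), Bytes.toBytes(metric));
    }
    return siTable.get(get);
  }
}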

Thrift Server Round Robin Pool

There was another change we made to the Thrift servers that helped with scalability. Every Thrift server has a single instance of an HBase org.apache.hadoop.hbase.client.Connection, configured with the following connection pool settings:

hbase.client.ipc.pool.size=5
hbase.client.ipc.pool.type=RoundRobinPool

The default is one connection per region server. This setting increases the number of concurrent connections, which helps us scale the number of requests per server.
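
As a sketch, the same settings can be applied programmatically when the shared Connection is created; the class and method names here are illustrative.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Sketch of how the round-robin RPC pool settings are applied to the single
// shared Connection each Thrift server holds. Values mirror the config above.
public final class HBaseConnectionHolder {

  static Connection create() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("hbase.client.ipc.pool.size", 5);
    conf.set("hbase.client.ipc.pool.type", "RoundRobinPool");
    // One Connection per Thrift server; it now keeps up to five sockets per
    // region server and hands them out round-robin.
    return ConnectionFactory.createConnection(conf);
  }
}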

Drawbacks and Limitations

Though this design is working out well for us, we do recognize that it has several limitations.

While the scale-out architecture results in evenly distributed reads and writes, it impacts availability: for example, any region server or ZooKeeper issue affects all read and write requests. We're setting up a backup cluster with two-way replication, and we will set up automatic failover for reads and writes in case of any issues with the primary cluster.

Since the segments are part of the row-key, a report with many segments consumes more disk space, and it is not possible to add or delete segments for a report after creation. Reports with very high cardinality and a lot of data can also be slow, despite FuzzyRowFilter fast-forwarding. This could be offset by adding parallelism within the coprocessors to execute the scans for each salt (or even scans partitioned by date) in parallel, as sketched below.
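
A rough sketch of that idea follows; PartialResult and runScanForSalt are hypothetical placeholders for the coprocessor's per-salt scan and its output.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Rough sketch of the proposed mitigation: run the per-salt scans concurrently
// inside the coprocessor instead of one after another. PartialResult and
// runScanForSalt are hypothetical placeholders.
public final class ParallelSaltScans {

  static List<PartialResult> scanAllSalts(int numSalts) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Math.min(numSalts, 8));
    try {
      List<Callable<PartialResult>> tasks = new ArrayList<>();
      for (int salt = 0; salt < numSalts; salt++) {
        final int s = salt;
        tasks.add(() -> runScanForSalt(s));
      }
      List<PartialResult> results = new ArrayList<>();
      for (Future<PartialResult> future : pool.invokeAll(tasks)) {
        results.add(future.get());  // a merge/aggregation step would follow
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }

  static final class PartialResult {}

  static PartialResult runScanForSalt(int salt) {
    return new PartialResult();  // placeholder for the actual per-salt scan
  }
}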

This architecture uses coprocessors for reads, and HBase does not support replicated reads for coprocessors. We may be able to partially offset this by building a roll-up caching layer in which results are stored in a high-availability table, and falling back to a replicated read (using a regular HBase scan rather than coprocessors) if getting data from the primary region fails.

We plan to tackle some of these limitations in our next iteration. We also plan to add support for Top-N, percentiles, group-by, and functions such as min and max.

Acknowledgments: Huge thanks to Rob Claire, Chunyan Wang, Justin Mejorada-Pier and Bryant Xiao who help support PinalyticsDB at Pinterest. Also huge thanks to the Analytics Platform and Storage & Caching teams who help support the Pinalytics webapp and HBase cluster respectively.

We’re building the world’s first visual discovery engine. More than 250 million people around the world use Pinterest to dream about, plan and prepare for things they want to do in life. Come join us!
