GokuL: Extending time series data storage to serve beyond one day

Pinterest Engineering Blog | Dec 12, 2019

Rui Zhang, Hao Jiang, Monil Mukesh Sanghavi, Jian Guo | Real Time Analytics Team

At Pinterest, developers rely heavily on Statsboard to monitor their systems and get alerted when issues happen. Last year, we introduced our time series database (aka Goku), which stores and serves the most recent day's data. Although this covers more than 90% of queries, there are still situations where developers want to query data beyond one day, such as week-over-week comparisons, yearly capacity planning, or investigating issues from days or weeks ago.

In response, we built GokuL, a disk-based version of Goku for long-term data. GokuL is written in C++ and uses RocksDB as its storage engine to serve queries beyond one day efficiently.

New Features in GokuL

Data Roll Up

Roll up is the process of transforming multiple data points within a time interval into a single data point using an aggregator. For example, say we have 10 data points (0, 1), (60, 2), (120, 3), (180, 4), (240, 5), (300, 6), (360, 7), (420, 8), (480, 9), (540, 10) and we roll them up with a five-minute interval and the SUM aggregator. The data points after rollup become (0, 15) and (300, 40).
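To make the mechanics concrete, here is a minimal sketch of a SUM roll-up; the function name and types are our own for illustration, not taken from GokuL's code.

```cpp
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Bucket each (timestamp, value) point into fixed-size intervals and
// sum the values per interval. Timestamps are in seconds.
std::vector<std::pair<int64_t, double>> RollupSum(
    const std::vector<std::pair<int64_t, double>>& points,
    int64_t interval_sec) {
  std::map<int64_t, double> buckets;  // interval start -> running sum
  for (const auto& [ts, value] : points) {
    buckets[(ts / interval_sec) * interval_sec] += value;
  }
  return {buckets.begin(), buckets.end()};
}

// RollupSum on the ten points above with interval_sec = 300 yields
// {(0, 15), (300, 40)}, matching the example.
```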

While raw data is the most accurate, it's costly in resources and storage and slow to query. Over a longer timespan, coarser time granularity is acceptable and enables faster queries at lower cost. Our solution is built-in roll up support in GokuL: the older the data, the coarser its time granularity, and the granularity settings are configurable. Below are the settings we use at Pinterest:

We still keep 24 days of raw data for situations where developers prefer it, such as week-over-week comparisons.

When querying GokuL, clients need to set rollupAggregator and rollupInterval in the query to get the corresponding rollup data.

Architecture

The overall Goku ecosystem looks like the figure below:

Tiered Data

Data in Goku is divided into buckets, where each bucket is two hours long. Which buckets to read depends on the query's time range. In GokuL, in addition to data bucketing, we introduce another concept: the tier. Within a tier, data is divided into buckets, but different tiers can have different bucket sizes. With tiered data, we introduce a new process called compaction, which merges multiple buckets from a lower tier into one bucket in a higher tier. Roll up happens during compaction too.

The available tier settings are listed below; all of them can be tuned (a configuration sketch follows the list).

  1. Bucket size in seconds
  2. Roll up interval in seconds
  3. Whether to keep raw data or not in this tier
  4. Number of buckets required to merge into next tier
  5. Number of buckets to keep in this tier
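As a sketch only, the five settings above might be modeled like this; the struct, field names, and example values are all assumptions, since the production settings table isn't reproduced here.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical per-tier configuration mirroring the five settings above.
struct TierConfig {
  int64_t bucket_size_sec;      // 1. bucket length in this tier
  int64_t rollup_interval_sec;  // 2. roll up granularity (0 = raw only)
  bool keep_raw_data;           // 3. whether raw points survive compaction
  int num_buckets_to_merge;     // 4. buckets merged into one next-tier bucket
  int num_buckets_to_keep;      // 5. retention for this tier, in buckets
};

// Example values only, not Pinterest's production settings.
const std::vector<TierConfig> kTiers = {
    {2 * 3600, 0, true, 12, 12},        // tier 0: 2-hour buckets, raw
    {24 * 3600, 300, true, 7, 24},      // tier 1: 1-day buckets, 5-min roll up
    {7 * 24 * 3600, 3600, false, 4, 8}  // tier 2: 1-week buckets, 1-hour roll up
};
```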

Goku Compactor Service

To do compaction, we developed another service called Goku Compactor. For tier 0's compaction, the service downloads short-term Goku's data from AWS EFS, merges it into a tier 1 bucket, and uploads it to S3. For the other tiers, the service downloads from S3 and uploads back to S3 after compaction. By separating data compaction from online serving, compactions can fully utilize resources without impacting online serving performance.

Compaction uses a lot of CPU and memory. To protect Goku Compactor, we use two thread pools: one with more threads for lightweight compactions, and another with fewer threads for heavyweight compactions.
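A simplified sketch of that dispatch policy is below, assuming a generic thread pool; Goku Compactor's actual implementation isn't public, so the pool sizes and the weight cutoff are invented.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// A minimal fixed-size thread pool, for illustration only.
class ThreadPool {
 public:
  explicit ThreadPool(size_t num_threads) {
    for (size_t i = 0; i < num_threads; ++i) {
      workers_.emplace_back([this] {
        while (true) {
          std::function<void()> task;
          {
            std::unique_lock<std::mutex> lock(mu_);
            cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
            if (stop_ && tasks_.empty()) return;
            task = std::move(tasks_.front());
            tasks_.pop();
          }
          task();
        }
      });
    }
  }
  ~ThreadPool() {
    { std::lock_guard<std::mutex> lock(mu_); stop_ = true; }
    cv_.notify_all();
    for (auto& w : workers_) w.join();
  }
  void Submit(std::function<void()> task) {
    { std::lock_guard<std::mutex> lock(mu_); tasks_.push(std::move(task)); }
    cv_.notify_one();
  }
 private:
  std::vector<std::thread> workers_;
  std::queue<std::function<void()>> tasks_;
  std::mutex mu_;
  std::condition_variable cv_;
  bool stop_ = false;
};

// Many threads for light compactions, few for heavy ones, so heavy
// jobs cannot monopolize the host. Sizes and cutoff are invented.
ThreadPool light_pool(8);
ThreadPool heavy_pool(2);

void ScheduleCompaction(std::function<void()> job, size_t input_bytes) {
  constexpr size_t kHeavyThreshold = 1ULL << 30;  // assumed 1 GiB cutoff
  (input_bytes >= kHeavyThreshold ? heavy_pool : light_pool)
      .Submit(std::move(job));
}
```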

Because compactions don't run all the time, we use auto scaling to minimize cost.

During the compaction process, Goku Compactor generates SST files, which are then consumed by GokuL.

RocksDB Key & Value Design

There are three types of data in the generated SST files: the dictionary, time series data, and inverted indexes. We generate this data for every shard and bucket.

For every metric name, tag key, and tag value, we assign a unique 4-byte id. For example, if we have the following time series:

tc.proc.stat.cpu.total{host=host1,host_type=infra-goku-a-prod}
tc.proc.stat.cpu.total{host=host2,host_type=infra-goku-d-prod}
ostrich.metrics.goku.ms_short_query_root.p90{host=host1,host_type=infra-goku-a-prod}

The generated dictionary will be:

tc.proc.stat.cpu.total => 1
ostrich.metrics.goku.ms_short_query_root.p90 => 2
host => 3
host1 => 4
host_type => 5
infra-goku-a-prod => 6
host2 => 7
infra-goku-d-prod => 8

We store both the word-to-id mapping and the reverse id-to-word mapping in RocksDB, for different purposes.

Just as with Goku, we encode time series data with Gorilla encoding. The keys are in the format [roll up aggregator type][metric name dictionary id][time series id]. The roll up aggregator type is a 1-byte aggregator enum; for raw data, the aggregator is set to None. The time series id is a 4-byte integer that ranges from 0 to the cardinality of the metric name.

Inverted index keys are in the format [metric name dictionary id][tag key dictionary id][tag value dictionary id (optional)]. The values are encoded time series ids, which are used in data keys with the same metric name dictionary id. We use the Group Varint Encoding algorithm mentioned in this keynote. When only the tag key id is present, the index key is used to efficiently apply wildcard filters (e.g., host=*).
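For reference, here is a sketch of group varint encoding for one group of four ids; it illustrates the algorithm itself, not GokuL's exact value layout, since the post doesn't describe how groups and remainders are arranged on disk.

```cpp
#include <cstdint>
#include <string>

// Group varint: one tag byte holds the byte length (1-4, stored as
// length-1 in two bits) of each of the next four little-endian values,
// so four small ids can cost as little as five bytes total.
void AppendGroupVarint(const uint32_t vals[4], std::string* out) {
  const size_t tag_pos = out->size();
  out->push_back(0);  // tag placeholder, patched below
  uint8_t tag = 0;
  for (int i = 0; i < 4; ++i) {
    const uint32_t v = vals[i];
    const int len = v >= (1u << 24) ? 4
                  : v >= (1u << 16) ? 3
                  : v >= (1u << 8)  ? 2 : 1;
    tag |= static_cast<uint8_t>((len - 1) << (2 * i));
    for (int b = 0; b < len; ++b) {
      out->push_back(static_cast<char>((v >> (8 * b)) & 0xff));
    }
  }
  (*out)[tag_pos] = static_cast<char>(tag);
}
```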

Because our data is tiered and bucketed, when generating these keys, we prepend [magic(1 byte)][tier(1 byte)][bucket(4 bytes)] to every RocksDB key. The magic number is a byte that identifies the type of key.
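Putting the pieces together, a raw-data key could be assembled like the sketch below. The field widths follow the description above; the big-endian byte order and the helper itself are assumptions, since the post doesn't specify endianness.

```cpp
#include <cstdint>
#include <string>

// Append `width` bytes of `v` in big-endian order, so lexicographic
// RocksDB key order matches numeric order (an assumption here).
static void AppendBE(uint64_t v, int width, std::string* out) {
  for (int i = width - 1; i >= 0; --i) {
    out->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
  }
}

// [magic(1)][tier(1)][bucket(4)][aggregator(1)][metric id(4)][series id(4)]
std::string MakeDataKey(uint8_t magic, uint8_t tier, uint32_t bucket,
                        uint8_t aggregator,  // None (raw) or a roll up enum
                        uint32_t metric_id, uint32_t series_id) {
  std::string key;
  key.push_back(static_cast<char>(magic));
  key.push_back(static_cast<char>(tier));
  AppendBE(bucket, 4, &key);
  key.push_back(static_cast<char>(aggregator));
  AppendBE(metric_id, 4, &key);
  AppendBE(series_id, 4, &key);
  return key;  // 15 bytes total
}
```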

Data Retention

As mentioned earlier, we set a TTL on rollup data, with different TTLs for different tiers.

To achieve this, we implemented a RocksDB compaction filter. The filter checks whether the bucket in a key has expired, based on the number-of-buckets-to-keep parameter of each tier. If it has, the key is removed during the compaction process.
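RocksDB's CompactionFilter interface makes this straightforward; below is a sketch of such a filter. The key prefix layout follows the previous section, while the retention bookkeeping (tracking the newest bucket per tier) is an assumption about how expiry could be decided.

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/slice.h"

// Drops keys whose bucket falls outside the tier's retention window.
class BucketTtlFilter : public rocksdb::CompactionFilter {
 public:
  // newest_bucket[t]: most recent bucket id written in tier t;
  // buckets_to_keep[t]: the tier's "number of buckets to keep" setting.
  BucketTtlFilter(std::vector<uint32_t> newest_bucket,
                  std::vector<uint32_t> buckets_to_keep)
      : newest_bucket_(std::move(newest_bucket)),
        buckets_to_keep_(std::move(buckets_to_keep)) {}

  // Returning true tells RocksDB to remove the entry during compaction.
  bool Filter(int /*level*/, const rocksdb::Slice& key,
              const rocksdb::Slice& /*value*/, std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    if (key.size() < 6) return false;  // not a [magic][tier][bucket] key
    const uint8_t tier = static_cast<uint8_t>(key[1]);
    if (tier >= newest_bucket_.size()) return false;
    const uint32_t bucket = DecodeBE32(key.data() + 2);
    return bucket + buckets_to_keep_[tier] <= newest_bucket_[tier];
  }

  const char* Name() const override { return "gokul.BucketTtlFilter"; }

 private:
  static uint32_t DecodeBE32(const char* p) {
    uint32_t v = 0;
    for (int i = 0; i < 4; ++i) {
      v = (v << 8) | static_cast<uint8_t>(p[i]);
    }
    return v;
  }

  std::vector<uint32_t> newest_bucket_;
  std::vector<uint32_t> buckets_to_keep_;
};
```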

Cluster Management

GokuL uses the same two-layer sharding strategy as Goku to provide horizontal scalability and parallel aggregation and to limit query fanout.

Rocksplicator is used to manage shards in the cluster. It reduces our operational load with its automatic shard management and recovery abilities. (More details on Rocksplicator can be found in Open-sourcing Rocksplicator, a real-time RocksDB data replicator and Automated cluster management and recovery for Rocksplicator.)

Query Process

GokuL uses the same query engine as Goku; however, we reimplemented the routing logic in Goku Root (see the figure below). Goku Root processes a query in the following steps:

  1. Break the query into two queries if the time range touches both Goku and GokuL (see the sketch after this list).
  2. Send the queries to the corresponding Goku and GokuL leaves respectively, which aggregate locally.
  3. Aggregate the results from the leaves and merge them into one query result.
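Here is a sketch of step 1, under the assumption that the split point is the boundary between GokuL's long-term data and Goku's most recent day; the types and names are illustrative.

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

struct TimeRange {
  int64_t start_sec;
  int64_t end_sec;  // exclusive
};

// Split [start, end) at `boundary_sec` (e.g., now minus one day).
// Returns up to two (is_long_term, range) parts: the older part is
// routed to GokuL, the recent part to Goku.
std::vector<std::pair<bool, TimeRange>> SplitQuery(const TimeRange& q,
                                                   int64_t boundary_sec) {
  std::vector<std::pair<bool, TimeRange>> parts;
  if (q.start_sec < boundary_sec) {
    parts.push_back({true, {q.start_sec, std::min(q.end_sec, boundary_sec)}});
  }
  if (q.end_sec > boundary_sec) {
    parts.push_back({false, {std::max(q.start_sec, boundary_sec), q.end_sec}});
  }
  return parts;
}
```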

Internally, we run two Goku clusters and two GokuL clusters. If one leaf query fails, Goku Root fails over to the other cluster's leaf that holds the same shard.

Server Protection

Time series queries are sometimes very expensive, and they get even more expensive as the time range grows. They can consume too much memory and CPU, starve other queries, and crash the process. GokuL employs the following approaches to protect itself.

Pipelined query execution

We implemented pipelined execution, which reads, decodes, and aggregates data batch by batch. By doing so, memory can be released after each operator executes, which minimizes the risk of out-of-memory errors. It also uses CPU more efficiently by parallelizing data IO and computation within the same pipeline.
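Conceptually, the memory behavior looks like the sketch below, where decoding is faked as a copy so the example is self-contained; the real pipeline decodes Gorilla-encoded blocks and also overlaps IO with computation.

```cpp
#include <numeric>
#include <vector>

// Each Batch stands in for one block of decoded points.
using Batch = std::vector<double>;

// Aggregate batch by batch: only one decoded batch is alive at a time,
// so peak memory stays bounded regardless of the query's time range.
double SumPipelined(const std::vector<Batch>& encoded_batches) {
  double total = 0;
  for (const Batch& encoded : encoded_batches) {
    Batch decoded = encoded;  // stand-in for the Gorilla decode step
    total = std::accumulate(decoded.begin(), decoded.end(), total);
    // `decoded` goes out of scope here, releasing its memory before
    // the next batch is read.
  }
  return total;
}
```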

Early Termination

Even with pipelined execution, some queries can blow up CPU and memory usage, for example, queries that fetch every time series without aggregation. These queries share common characteristics: high cardinality or heavy interpolation. Currently, we terminate a query as soon as it reaches either of two thresholds: a cardinality limit or a memory usage limit.
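The guardrail amounts to a pair of checks like the following; the limit values are invented for illustration and are not GokuL's production settings.

```cpp
#include <cstdint>
#include <stdexcept>

// Placeholder thresholds, not GokuL's real limits.
constexpr uint64_t kMaxCardinality = 200000;     // a. cardinality limit
constexpr uint64_t kMaxMemoryBytes = 4ULL << 30; // b. memory usage limit

// Called periodically while a query executes; throwing aborts it early
// instead of letting it starve other queries or crash the process.
void CheckQueryLimits(uint64_t series_seen, uint64_t bytes_in_use) {
  if (series_seen > kMaxCardinality) {
    throw std::runtime_error("query terminated: cardinality limit reached");
  }
  if (bytes_in_use > kMaxMemoryBytes) {
    throw std::runtime_error("query terminated: memory limit reached");
  }
}
```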

Performance

We evaluated some common queries with different time ranges on both OpenTSDB and GokuL. Here are the settings used during benchmarking.

Hardware

OpenTSDB + HBase: c5.2xlarge + i3.2xlarge

GokuL: i3.2xlarge

Query

Queries with time ranges of 2 days, 4 days, 7 days, and 14 days.

Results

GokuL is 30x to 100x faster than OpenTSDB, depending on the query.

What’s Next

Pagination support

While our existing server protection approaches work fine in most situations, they leave us unable to handle some high-cardinality, expensive queries. Currently, we have to either ask users to rewrite their queries or adjust the thresholds.

To process these queries without crashing our system, we can paginate on the time scale. Given a long-range query, we return one sub-range's results along with a continuation token. The client then provides this continuation token to GokuL to get the next sub-range's results.

Pagination mitigates not only the server's load but also the client's: clients no longer need to wait for all the data to be ready before drawing a graph; instead, they can draw it part by part.
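A sketch of what that round trip could look like is below; the token format (just the next sub-range's start time here) and all names are assumptions, since this design is still future work.

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

struct PageRequest {
  int64_t start_sec;
  int64_t end_sec;
  std::string continuation_token;  // empty on the first call
};

struct PageResponse {
  std::vector<double> points;      // results for one sub-range
  std::string continuation_token;  // empty once the query is complete
};

// Stub standing in for an actual leaf query over [start, end).
static std::vector<double> QuerySubRange(int64_t, int64_t) { return {}; }

PageResponse ExecutePage(const PageRequest& req, int64_t page_len_sec) {
  const int64_t page_start = req.continuation_token.empty()
                                 ? req.start_sec
                                 : std::stoll(req.continuation_token);
  const int64_t page_end = std::min(page_start + page_len_sec, req.end_sec);

  PageResponse resp;
  resp.points = QuerySubRange(page_start, page_end);
  if (page_end < req.end_sec) {
    // More sub-ranges remain; the client sends this token back next time.
    resp.continuation_token = std::to_string(page_end);
  }
  return resp;
}
```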

Query Cache

Query execution is fast in GokuL, but it could be faster if hot query results were cached. GokuL will partition the cached data by time, just like raw data, which can make pagination queries faster as well. The goal is to make the cache layer abstract enough that we can easily adopt different caching systems, such as local memory or memcached.

Acknowledgments: Huge thanks to Brian Overstreet, Wei Zhu, Humsheen Geo, Dai Nguyen, and Nomy Abbas from the Visibility team for helping roll out GokuL on Statsboard, and to Bo Liu, Guodong Han, and Sihan Wang from the Serving Systems team for helping us understand and adopt Rocksplicator.

