Time Series Database Design

So, I’ve been thinking a lot about databases and time series stores recently. I’ve been a long-time user of systems like Ganglia and Collectd. These systems are all great, but unfortunately they tend to use RRD or Whisper under the hood. Those stores are hard to shard and distribute, which in turn makes them difficult to scale for speed and reliability. They also downsample data and are highly inflexible, which makes production debugging difficult.

Long-term Design

At the moment, systems metrics and application (business) metrics are handled by independent models and tools. One thing I find interesting is that nearly every organization’s #bigdata system never ingests systems metrics for analysis and graphing. There seems to be a divergence of tools and systems for little good reason. I feel that at some point there will be a natural adoption of traditional big data analytics systems for systems monitoring. The flexibility of current systems monitoring tools is highly limited. On top of that, today’s monitoring tools are designed for analysis of high-level sampled data, which makes ad-hoc analysis and debugging difficult.

I believe the answer to this is shifting systems monitoring in the direction of traditional OLAP practices. We need to stop thinking of metrics as individual key-value pairs; we should adopt the same model OLAP does, and use its tools. I want to start thinking about data points as OLAP hypercubes, where the dimensions are things like server ID, datacenter ID, and other relevant metadata. I feel like this will enable us to look at the right data, and enable a hybrid approach that combines ad-hoc data analysis with prematerialized views. There are promising candidates for high-speed, ad-hoc analysis of data, like Facebook’s Scuba. Other methods might be valuable to apply to ad-hoc analysis as well, like trading off accuracy for query time, a la BlinkDB.
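As a rough sketch of the model (Python; the field names just mirror the example cube used later in this post), a data point becomes a measured value tagged with the dimensions we can later slice, group, and roll up along:

    from dataclasses import dataclass, field

    @dataclass
    class Cube:
        """A single data point modeled as an OLAP hypercube cell.

        The dimensions dict (e.g. server_id, datacenter) is what queries
        slice, group, and roll up along; the value is the measurement.
        """
        metric: str
        timestamp: int
        value: float
        dimensions: dict = field(default_factory=dict)

    # Example: an idle-CPU sample from one server in one datacenter.
    point = Cube(
        metric="cpu_idle",
        timestamp=154,
        value=0.0,
        dimensions={"server_id": 1, "datacenter": 43, "collector": "ganglia"},
    )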

Aspects of monitoring

If I were to design a monitoring system, this is probably what I would care about.

  1. Degrade gracefully: I would rather a system be unable to update graphs, but still show historical data, than be completely unavailable.
  2. Horizontally scalable: I want to be able to add more nodes to a cluster to scale availability, the amount of data being graphed, and the number of data points per second being ingested.
  3. Write availability: I want a system that has predictable write-latency, and optimizes for ingesting data points. I never want to be in the position where my monitoring system hurts the availability of production.
  4. Ad-hoc analysis: I want to be able to dive into specific points in time and analyze the data without a loss of accuracy. I want this to be reasonably fast, given a trade-off between accuracy and query time.
  5. Complex queries: I want to be able to run complex queries, like estimating the cardinality over a specific dimension of the cubes (e.g. with HyperLogLog), in an ad-hoc fashion.
  6. Materialized views: I want to have views that are incrementally calculated, using complex queries that couldn’t easily be run in real time across all of the data. This includes doing roll-ups of the data (see the sketch after this list).
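To make the materialized-view point concrete, here’s a minimal sketch in Python (all names are made up) of an incrementally calculated roll-up: each incoming data point folds into a per-minute aggregate for its dimension vector, so reads never have to scan the raw points:

    from collections import defaultdict

    # view[(metric, dimension_vector, minute)] -> (sum, count)
    view = defaultdict(lambda: (0.0, 0))

    def ingest(metric, timestamp, value, dimensions):
        """Incrementally fold one data point into a per-minute roll-up."""
        minute = timestamp - (timestamp % 60)
        dv = tuple(sorted(dimensions.items()))  # hashable dimension vector
        s, n = view[(metric, dv, minute)]
        view[(metric, dv, minute)] = (s + value, n + 1)

    def average(metric, dimensions, minute):
        """Read the materialized view instead of scanning raw points."""
        dv = tuple(sorted(dimensions.items()))
        s, n = view.get((metric, dv, minute), (0.0, 0))
        return s / n if n else None

    ingest("cpu_idle", 154, 0.0, {"server_id": 1, "datacenter": 43})
    ingest("cpu_idle", 155, 10.0, {"server_id": 1, "datacenter": 43})
    print(average("cpu_idle", {"server_id": 1, "datacenter": 43}, 120))  # 5.0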

One of the prime aspects of a monitoring system is that it must be available. A system without any metrics or monitoring is a system you cannot measure, and without measurement a system may as well be unavailable.

How do we do this?

Dynamo

So, in order to stay available, we need a system that prioritizes write availability over read availability and naturally lends itself to horizontal scalability. To maintain these guarantees, an AP system is preferable. An excellent framework, and paper of reference, for building such systems is Dynamo. It organizes nodes into a ring, consistently hashes each object, and then places it on a set of live nodes found by walking the ring. This keeps the system highly available for writes, even when a majority of nodes are down.
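Here’s a minimal sketch of that placement scheme in Python (node names, vnode count, and replication factor are all made up): keys are consistently hashed onto a ring, and each key is stored on the next n distinct nodes found walking clockwise from its position, so writes still land somewhere even when several nodes are down:

    import hashlib
    from bisect import bisect

    class Ring:
        """Toy consistent-hash ring with Dynamo-style preference lists."""

        def __init__(self, nodes, vnodes=64):
            self.ring = sorted(
                (self._hash(f"{node}-{v}"), node)
                for node in nodes
                for v in range(vnodes)
            )

        @staticmethod
        def _hash(key):
            return int(hashlib.md5(key.encode()).hexdigest(), 16)

        def preference_list(self, key, n=3):
            """Walk clockwise from the key's position, collecting n distinct nodes."""
            idx = bisect(self.ring, (self._hash(key),))
            nodes = []
            for i in range(len(self.ring)):
                node = self.ring[(idx + i) % len(self.ring)][1]
                if node not in nodes:
                    nodes.append(node)
                if len(nodes) == n:
                    break
            return nodes

    ring = Ring(["node-a", "node-b", "node-c", "node-d"])
    print(ring.preference_list("cpu_idle|server_id=1", n=2))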

Denormalization

In order for the system to work, we must denormalize the hypercube by taking the power set of its queryable dimensions. Since the power set grows exponentially with the number of dimensions, we must keep the number of queryable dimensions on an individual hypercube relatively low, otherwise the write amplification will rapidly consume the cluster’s storage and network capacity. So, take the following example hypercube:

original_cube: {timestamp: 154, metric: cpu_idle, server_id: 1, datacenter: 43, collector: ganglia, value: 0}

The Ring

We would primarily be interested in rolling up and querying by server_id and datacenter. We then take the power set along these dimensions to generate the following sets: [{“datacenter”: 43, “server_id”: 1}, {“server_id”: 1}, {“datacenter”: 43}, {}]
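Generating that power set is mechanical; a quick sketch in Python, restricted to the queryable dimensions:

    from itertools import combinations

    def dimension_powerset(dimensions, queryable):
        """All subsets of the queryable dimensions, from the full vector down to {}."""
        keys = [k for k in queryable if k in dimensions]
        subsets = []
        for r in range(len(keys), -1, -1):
            for combo in combinations(keys, r):
                subsets.append({k: dimensions[k] for k in combo})
        return subsets

    cube = {"server_id": 1, "datacenter": 43, "collector": "ganglia"}
    print(dimension_powerset(cube, queryable=["datacenter", "server_id"]))
    # [{'datacenter': 43, 'server_id': 1}, {'datacenter': 43}, {'server_id': 1}, {}]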

For each member of the power set, we write the key {metric_name, timestamp, dimensions} with a copy of the original cube. One of the nice properties of time-series data is that it’s typically inserted in a k-sorted manner, so it naturally lends itself to stores like LevelDB, whose compactions sort keys and require levels to be in order. To query the data with reasonable data locality, we hash it on {metric_name, dimensions} only. This lets us build roll-ups and run queries node-locally, and ensures that all points for a particular dimension vector live on the same set of nodes.

[Figure: writes for the given sets of dimensions, with n=2]

This enables us to deterministically route queries to specific nodes, given knowledge of the dimension vectors of interest. Discovering the dimension vectors themselves is left as an exercise for the system builder, but ideally they are stored somewhere a query builder can access, whether that be via a gossip protocol or a dedicated set kept on a set of vnodes.
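To make the key layout and routing concrete, here’s a sketch in Python (the encoding is made up; only the split matters): the partition hash covers {metric_name, dimensions}, so every point for a dimension vector lands on the same preference list, and a query builder that knows the dimension vector can recompute the hash to find the owning nodes. The timestamp only appears in the key suffix, which keeps inserts roughly ordered for a LevelDB-style store:

    import hashlib
    import json

    def partition_key(metric, dimensions):
        """Hash of {metric_name, dimensions}: decides which nodes own the series."""
        dv = json.dumps(sorted(dimensions.items()))
        return hashlib.md5(f"{metric}|{dv}".encode()).hexdigest()

    def storage_key(metric, timestamp, dimensions):
        """Full key: series identity plus a sortable timestamp suffix."""
        dv = json.dumps(sorted(dimensions.items()))
        return f"{metric}|{dv}|{timestamp:020d}"

    dims = {"server_id": 1, "datacenter": 43}
    print(partition_key("cpu_idle", dims))   # routes reads and writes to the ring
    print(storage_key("cpu_idle", 154, dims))  # sorts by time within the series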

Incremental calculation and materialized views

Unfortunately, a Dynamo system itself isn’t ideal for storing and manipulating roll-ups, given that roll-ups are typically built around incremental, co-recursive reduction functions. Incrementally updating keys while keeping track of already-inserted values, to avoid counting duplicates, is very difficult. Lastly, a single update can fan out side effects across multiple keys, and the resulting network chattiness and overhead can be problematic.

The first step of the solution relies on an ETL-like workflow. On insert, the vnode that receives a cube also writes it to an external store that’s well suited for roll-ups. That store can ensure the data is inserted iff the key doesn’t already exist, by atomically inserting the cube together with a copy of the key.
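A sketch of that hand-off, with SQLite standing in for whatever roll-up store you’d actually use (table and column names are made up): the key and the cube are written in one transaction, and a key that already exists turns the insert into a no-op, so replayed writes don’t double-count:

    import json
    import sqlite3

    sink = sqlite3.connect(":memory:")
    sink.execute("CREATE TABLE cubes (key TEXT PRIMARY KEY, cube TEXT)")

    def deliver(key, cube):
        """Insert the cube iff its key hasn't been seen; duplicates are dropped."""
        with sink:  # one transaction: key and cube land together or not at all
            sink.execute(
                "INSERT OR IGNORE INTO cubes (key, cube) VALUES (?, ?)",
                (key, json.dumps(cube)),
            )

    cube = {"metric": "cpu_idle", "timestamp": 154, "server_id": 1, "value": 0}
    deliver("cpu_idle|154|server_id=1", cube)
    deliver("cpu_idle|154|server_id=1", cube)  # replayed write, ignored
    print(sink.execute("SELECT COUNT(*) FROM cubes").fetchone()[0])  # 1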

Additionally, for the purposes of repair and incremental calculation, an anti-entropy system is needed: on insert, the node also adds the key to a Merkle tree. This enables incremental recalculation of inserts into the sink datastore during failures, and allows the system to recover from them easily.
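And a toy version of the Merkle-tree bookkeeping (a minimal sketch; a real anti-entropy tree buckets keys into ranges and descends only into mismatched subtrees instead of diffing whole key sets): comparing root hashes is a cheap way for two replicas, or the ring and the sink store, to detect that they’ve diverged and need repair:

    import hashlib

    def _h(data):
        return hashlib.sha256(data.encode()).hexdigest()

    def merkle_root(keys):
        """Fold a sorted list of keys into a single root hash, pairwise."""
        level = [_h(k) for k in sorted(keys)]
        if not level:
            return _h("")
        while len(level) > 1:
            if len(level) % 2:  # duplicate the last node on odd-sized levels
                level.append(level[-1])
            level = [_h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        return level[0]

    replica_a = {"cpu_idle|120|server_id=1", "cpu_idle|180|server_id=1"}
    replica_b = {"cpu_idle|120|server_id=1"}  # missed a write during a failure

    if merkle_root(replica_a) != merkle_root(replica_b):
        # Roots differ, so the replicas diverged; re-sync the missing keys.
        print("repair needed:", replica_a ^ replica_b)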
