Roll up to speed up: Improving OpenTSDB query performance

This post describes how we improved the query performance for our OpenTSDB cluster and enabled queries that previously were impossible by reducing the resolution of historic data.

At Skyscanner, our focus is to drive every decision with complete, timely and accurate data. As part of this, we operate a large metrics and logging platform that enables all engineers in Skyscanner to monitor their services 24 hours a day. We provide application logs and any metrics that our engineers would like to record, for instance business and operational metrics for their services. We store and serve the data that powers Grafana dashboards and our alerting infrastructure, which comprises Bosun and VictorOps. This means that we’re dealing with a tremendous number of small data points which not only need to be ingested with minimal delay, but also need to be returned again efficiently.

Background

Load tests have shown that we can easily ingest more than 1.5 million data points per second, roughly 50% more than our normal daily volume. This includes operational metrics collected by various agents installed on machines, custom metrics emitted by services, metrics produced by stream processing jobs, and more.

We currently retain raw data points for 6 months and make all of them available through OpenTSDB.

Current infrastructure

Skyscanner’s OpenTSDB infrastructure runs entirely in AWS. We’re using Cloudera to create the backend clusters, which comprise Zookeeper, HDFS, and HBase. The OpenTSDB nodes themselves are behind a load balancer and split into query and ingestion nodes (read-only and write-only, respectively).

We additionally have Cloudera create a YARN cluster that we use to run MapReduce jobs which upload the daily snapshots of our tables to S3. We’re using these snapshots as backups in case something goes wrong.

For redundancy, we have the entire OpenTSDB infrastructure, as indicated by the green rectangle in the diagram, in two different AWS regions. Both of these setups are completely independent from one another; the only thing they share is the Kafka topic they’re both independently consuming. Since they’re reading the same messages from Kafka, both clusters will eventually have the same data points. Only one of them is serving queries at any time while the other one acts as a standby cluster that we can quickly switch query traffic to when we need to do maintenance work on the currently active cluster.

While this setup works well for resiliency, it complicates the generation of rolled-up data slightly due to the optimisation OpenTSDB’s data model uses. We’ll see details later.

That’s a lot of data

In the past, we’ve had problems with noisy neighbours: expensive queries have repeatedly caused service instability. The main problems were not in OpenTSDB itself, but in the way HBase handles RPC requests. Details can be found here, but a gross oversimplification is that all RPCs to HBase end up in the same queues with the same priority; if HBase needs to scan a lot of rows, RPCs remain in the queues for longer, and at some point the queues are full. When congested like this, HBase rejects subsequent requests and OpenTSDB returns an error to the client. This got bad enough that we introduced a query proxy which estimates the cost of incoming queries based on the cardinality of the queried metric and the time range. If a query is deemed too expensive, we reject it to protect the service.

We’re also seeing frequent timeouts on queries that ask for long ranges of data. Some metric paths can be queried over months, while for others with higher volume even 7 days’ worth of data is too much to return before hitting the 10-second timeout.

This situation isn’t ideal, and it led to legitimate but more expensive user queries being blocked. While we have seen that more than 86% of user queries can be answered with data from just the last 24 hours, and more than 98% with data from the last week, there is a general interest in longer-term trends, and those queries would often get blocked by the query proxy. Some engineers had even written their own scripts to extract data from OpenTSDB into a separate store to circumvent the limitations we impose.

The figures above also suggest that keeping raw data for as long as we currently do, 6 months, isn’t really worth the extra effort required. Users already need to use an aggregator for the time-axis, a downsampler, in their queries because the number of raw data points returned for queries over a couple of days or weeks would quickly exceed the horizontal pixels available on the screen.

What do others do?

Cloudera’s popular cluster management software only retains raw metrics for a couple of hours before downsampling them into lower-resolution points and dropping the raw data.

OpenTSDB has support for rollups (and pre-aggregates)

The main problem is that OpenTSDB doesn’t generate rollups for us; this has to be done separately. We’re looking to roll raw data points up into hourly points, so that all queries using a downsampling interval of one hour (or an integer multiple thereof, such as 2 hours or one day) are automatically served from rolled-up data points. For a metric that’s emitted once every minute and downsampled to one-day points, each output point then only needs to aggregate 24 hourly points instead of the 1,440 raw points written over a day. This gives a huge performance boost on the query side.
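For illustration, a query like the following POST body to OpenTSDB’s /api/query endpoint uses a one-day downsampling interval and could therefore be answered entirely from hourly rollup points (the metric name and tags are made up):

```json
{
  "start": "8w-ago",
  "queries": [
    {
      "metric": "service.request.latency",
      "aggregator": "sum",
      "downsample": "1d-avg",
      "tags": { "region": "eu-west-1" }
    }
  ]
}
```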

Stream or batch

Our solution refrains from using a stream processing job for a number of reasons. Since we’re looking to generate hourly data points, a stream processing job would need to hold an entire hour of state, somewhere, for every time series it encounters. We’re running other stream processing jobs that consume much lower-throughput topics and hold their state in RocksDB. We’re not confident that this would scale to a topic with significantly higher throughput, like our OpenTSDB topic, especially if we want to be ready for the next 10x.

Our ingestion volume of 1.5 million raw data points per second also requires the topic to be very wide. This means that for a stream processing job with multiple containers, we’d need a repartitioning job first to make sure that a single container of the stream processing job sees all of the data points for a given time series to generate accurate results.

The main problem, however, was late-arriving data. Every data point we ingest into OpenTSDB comes with a timestamp, and that timestamp isn’t the same as the time the point is consumed from the Kafka topic. The two may differ by a fraction of a second, or by several minutes or hours, depending on the health of the entire platform and of the stream processing job. A stream processor would have a hard time figuring out when to make the cut-off for each hour and emit the final rolled-up data point.

Overall, we weren’t confident enough that a stream processing solution would give us accurate data while being low in maintenance at the same time. For these reasons, we opted for a batch processing job that can churn through data at rest and then produce all the rolled-up points after we’re expecting no more writes to a given period of time.

New infrastructure

HBase is backed by HDFS, so there are plenty of open source tools from the Hadoop ecosystem available. MapReduce jobs are the obvious choice there, and AWS provides a service called Elastic MapReduce (EMR) which makes it easy to spin up a cluster and run an Apache Spark batch job on it. Even more crucially, EMR clusters can use EMRFS, an implementation of the Hadoop file system backed by S3. This is ideal for accessing the backups we already have in S3. All we need to do is trigger a batch job every time a backup completes. AWS’ serverless Lambda functions can respond to the upload of a file to S3, so they’re a natural fit to kick off our job. That job then aggregates the data and emits the rolled-up data points back to Kafka, where they are consumed again by an ingestion node, just like raw data points.
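As a rough sketch of that trigger (the bucket layout, the completion marker key and the pipeline ID lookup are illustrative assumptions, not our production code; the classes are from the v1 AWS SDK for Java), the Lambda function could look like this:

```java
import com.amazonaws.services.datapipeline.DataPipeline;
import com.amazonaws.services.datapipeline.DataPipelineClientBuilder;
import com.amazonaws.services.datapipeline.model.ActivatePipelineRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;

/** Activates the rollup data pipeline whenever a completed backup snapshot lands in S3. */
public class RollupTrigger implements RequestHandler<S3Event, String> {

    // Hypothetical pipeline ID, e.g. injected via an environment variable.
    private static final String PIPELINE_ID = System.getenv("ROLLUP_PIPELINE_ID");

    private final DataPipeline dataPipeline = DataPipelineClientBuilder.defaultClient();

    @Override
    public String handleRequest(S3Event event, Context context) {
        // Only react to the marker object that signals a completed backup upload.
        boolean backupComplete = event.getRecords().stream()
                .anyMatch(r -> r.getS3().getObject().getKey().endsWith("_SUCCESS"));

        if (backupComplete) {
            dataPipeline.activatePipeline(new ActivatePipelineRequest().withPipelineId(PIPELINE_ID));
            return "activated " + PIPELINE_ID;
        }
        return "ignored";
    }
}
```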

The following diagram shows the new architecture.

The job

  1. Filter out all data with a timestamp that’s irrelevant for the period of time we want to roll up.
  2. Look at every time series, that is, every unique combination of metric name and tag key/value pairs, and collect all of its raw data points for one hour.
  3. For that hour, calculate the sum, the minimum and the maximum of those points, and also count them. Emit these four new data points with a new timestamp aligned to the full hour (see the sketch after this list).
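To make step 3 concrete, here’s a minimal sketch in plain Java (class and field names are ours for illustration, not the actual job’s code):

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;

class HourlyRollup {

    /** One rolled-up data point: which aggregation it represents, the hour it covers and the value. */
    static final class RollupPoint {
        final String aggregation;    // "sum", "min", "max" or "count"
        final long hourStartSeconds; // timestamp aligned to the full hour
        final double value;

        RollupPoint(String aggregation, long hourStartSeconds, double value) {
            this.aggregation = aggregation;
            this.hourStartSeconds = hourStartSeconds;
            this.value = value;
        }
    }

    /** Aggregates one hour of raw values for a single time series into the four rollup points. */
    static List<RollupPoint> rollUp(long hourStartSeconds, double[] rawValues) {
        DoubleSummaryStatistics stats = Arrays.stream(rawValues).summaryStatistics();
        return Arrays.asList(
                new RollupPoint("sum", hourStartSeconds, stats.getSum()),
                new RollupPoint("min", hourStartSeconds, stats.getMin()),
                new RollupPoint("max", hourStartSeconds, stats.getMax()),
                new RollupPoint("count", hourStartSeconds, stats.getCount()));
    }
}
```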

Skyscanner uses Spark for most of its batch processing jobs, so it was our first choice. The fact that we could write the job in Java and run it in the Hadoop ecosystem helped greatly, since that’s already home to the rest of our metrics platform. The following sections only describe the job we ended up with, not how we got there; with many of us new to Spark, we didn’t find the best approach straight away. There’s a “what didn’t work” section further down that describes the learnings and failed approaches from early iterations of this job.

Leverage OpenTSDB’s data model

Oversimplified, a JSON representation of the data model might look like this:
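(The sketch below is an illustrative reconstruction: the UIDs and values are made up, and the real storage format is binary rather than JSON.)

```json
{
  "row_key": {
    "metric_uid": "000001",
    "base_timestamp": "2018-06-01T14:00:00Z",
    "tags": {
      "tagk_uid 000002": "tagv_uid 000003"
    }
  },
  "columns": {
    "+0s": 0.64,
    "+60s": 0.71,
    "+120s": 0.58,
    "...": "one column per raw data point, keyed by its offset within the hour"
  }
}
```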

This is ideal for us and means we can process each row (key) individually because by design every key contains all the information we need to generate the new hourly rollup points for one time series and hour. This also makes sure we can take full advantage of Spark’s parallelisation because all rows can be worked on independently and simultaneously.

How can we run this?

In the above diagram, the completion of a backup (upload of snapshots of all tables to an S3 bucket) triggers a Lambda function which activates the data pipeline. The pipeline creates the EMR cluster and brings up an EMRFS-backed HBase cluster, into which we’re restoring the freshly created snapshot of the UID table. This enables all Spark worker nodes to efficiently resolve UIDs to their string values. The snapshot of the raw table remains in the same S3 bucket and constitutes the input to the Spark job. Workers can read the HFiles directly from S3.

We’re using this CloudFormation script to create the data pipeline and all its dependencies.

Let’s finally see code

The main class is RollupJob, with its run() method as the entry point. From there, it uses the HBase API to read in the raw data table, tsdb, and wraps it in an RDD object. Resilient distributed datasets are Spark’s way of handling massive sets of data across many nodes. The entire raw table is larger than 2TB in size, LZO-compressed. Spark makes sure that every partition of that data is eventually assigned to one of the Spark worker nodes, which can then run the job on that small part of the data. Every worker processes every row in its partition. It extracts all the values from the row with the help of a DoubleSummaryStatistics and emits four new data points: the sum, the minimum and the maximum of all the points, and the total number of points aggregated from that row. These new points are then sent to Kafka with the timestamp of the original row, where they will be ingested again by a consumer.
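Sketched out, the per-row processing looks roughly like this; it is not the actual RollupJob code, and the decoding of OpenTSDB’s binary rows is hidden behind a placeholder since that’s not the interesting part here:

```java
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.JavaPairRDD;

import java.util.DoubleSummaryStatistics;
import java.util.Properties;

class RollupStage {

    /** Turns every HBase row (one time series, one hour) into four rollup points and sends them to Kafka. */
    static void rollUpAndEmit(JavaPairRDD<ImmutableBytesWritable, Result> rawRows,
                              Properties kafkaProps, String topic) {
        rawRows.foreachPartition(rows -> {
            // One producer per partition; one per row would be far too expensive.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
                rows.forEachRemaining(tuple -> {
                    Result row = tuple._2();
                    // Placeholders for the real decoding, which uses OpenTSDB internals and the UID table.
                    String series = RowDecoder.metricAndTags(row.getRow());
                    long hourStart = RowDecoder.baseTimestamp(row.getRow());

                    DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
                    for (Cell cell : row.listCells()) {
                        stats.accept(RowDecoder.value(cell));
                    }

                    producer.send(record(topic, series, "sum", hourStart, stats.getSum()));
                    producer.send(record(topic, series, "min", hourStart, stats.getMin()));
                    producer.send(record(topic, series, "max", hourStart, stats.getMax()));
                    producer.send(record(topic, series, "count", hourStart, stats.getCount()));
                });
            }
        });
    }

    private static ProducerRecord<String, String> record(String topic, String series, String aggregation,
                                                         long timestamp, double value) {
        // Simplified wire format; the real messages follow our existing ingestion format.
        return new ProducerRecord<>(topic, series, aggregation + " " + timestamp + " " + value);
    }

    /** Stand-in for the real decoding logic; illustrative only. */
    static final class RowDecoder {
        static String metricAndTags(byte[] rowKey) { throw new UnsupportedOperationException("sketch only"); }
        static long baseTimestamp(byte[] rowKey) { throw new UnsupportedOperationException("sketch only"); }
        static double value(Cell cell) { throw new UnsupportedOperationException("sketch only"); }
    }
}
```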

Note that this job works directly on the HFiles at rest and doesn’t restore the snapshots into a full HBase cluster; see below for details.

The UID problem

The problem with our setup is that we’re running two completely separate clusters with their own HBase backends. Completely separate? Almost. They’re both reading from the same Kafka cluster, because operating multiple replicated large multi-tenant Kafka clusters is hard. And expensive. It’s much easier to have the ingestion nodes in both clusters independently consume the same topic and go their separate merry ways from there. That means, however, that we cannot just emit the raw UIDs (the 3-byte representation) to Kafka to be ingested again, since the same UID represents a different string in each cluster. We therefore need to look up the original string values of the UIDs we encounter and emit those, so that the ingesting nodes can store them again with the correct UID in their own database.

That’s where the UidResolver comes into play. As mentioned earlier, OpenTSDB is already routinely doing these lookups in either direction, so it already has the code in place to do that efficiently. Luckily, our job is written in Java so we can make use of OpenTSDB's code by just importing its JAR. We can instantiate a new TSDB object and reuse its lookup and caching functionality.

We need every Spark worker to have its own local cache, so we’re broadcasting the UID resolver to all workers. The TSDB field is marked as transient because it’s not serialisable, so it gets instantiated freshly on every worker.
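A minimal sketch of such a resolver, assuming the ZooKeeper quorum of the EMR-hosted HBase cluster is all the configuration we need (the real class has more plumbing):

```java
import net.opentsdb.core.TSDB;
import net.opentsdb.uid.UniqueId.UniqueIdType;
import net.opentsdb.utils.Config;

import java.io.Serializable;

/**
 * Resolves OpenTSDB UIDs back to their string names, reusing OpenTSDB's own lookup and caching.
 * The TSDB handle is transient, so each worker builds (and caches into) its own instance
 * after the resolver has been broadcast.
 */
class UidResolver implements Serializable {

    private final String quorum;  // ZooKeeper quorum of the EMR-hosted HBase cluster
    private transient TSDB tsdb;  // not serialisable; created lazily on every worker

    UidResolver(String zookeeperQuorum) {
        this.quorum = zookeeperQuorum;
    }

    private TSDB tsdb() throws Exception {
        if (tsdb == null) {
            Config config = new Config(false); // don't read a config file
            config.overrideConfig("tsd.storage.hbase.zk_quorum", quorum);
            tsdb = new TSDB(config);
        }
        return tsdb;
    }

    /** Looks up the string name for a metric/tagk/tagv UID; results are cached inside TSDB. */
    String resolve(UniqueIdType type, byte[] uid) throws Exception {
        return tsdb().getUidName(type, uid).join();
    }
}
```

The resolver can then be broadcast once with JavaSparkContext.broadcast(new UidResolver(quorum)) and used from every task.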

Handling delayed rollups

Because the rollup job only runs after a backup completes, rolled-up data points lag behind the raw data we ingest continuously. We configure this expectation, our SLA for the rollups table, in OpenTSDB, which then takes care of splitting queries between rolled-up and raw data and merging the results for us. That way, we get the greatest benefit from the lower-resolution data while the split stays abstracted away from our users: they don’t need to change anything in their queries, all their dashboards and alerts stay the same, and they automatically benefit from the improved query performance.

What didn’t work (#failing-forward)

We hit a few bugs in our own aggregation logic along the way, but the biggest lessons came from the approaches described below.

We initially thought we had to restore both snapshots, for the raw data table and the UID table, into our HBase cluster on EMR. Using a live HBase cluster as the input for the data is supported by Spark, but gave us terrible performance. We later realised that we didn’t need to restore the raw data table at all, since every row can be processed independently. This means we can use the TableSnapshotInputFormat to read HFiles directly from S3 and distribute chunks of the HBase regions to the Spark workers. This gave a gigantic performance boost and made the job feasible in the first place. We eventually ended up slightly modifying the TableSnapshotInputFormat because we wanted to avoid, at all costs, the EMR cluster writing data to the directory of the snapshot (which we also use as a backup).
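For reference, reading a snapshot this way looks roughly like the following (bucket names and paths are made up, and this uses the stock TableSnapshotInputFormat rather than our modified copy):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

class SnapshotInput {

    /** Builds an RDD over the HFiles of an HBase snapshot without restoring it into a live cluster. */
    static JavaPairRDD<ImmutableBytesWritable, Result> readSnapshot(JavaSparkContext sc,
                                                                    String snapshotName) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Root dir points at the bucket that holds the exported snapshot (our backup).
        conf.set("hbase.rootdir", "s3://our-backup-bucket/hbase");

        Job job = Job.getInstance(conf);
        // The restore dir receives temporary region references; we keep it away from the backup.
        TableSnapshotInputFormat.setInput(job, snapshotName,
                new Path("s3://our-scratch-bucket/snapshot-restore"));

        return sc.newAPIHadoopRDD(job.getConfiguration(),
                TableSnapshotInputFormat.class,
                ImmutableBytesWritable.class,
                Result.class);
    }
}
```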

Given our limited experience with Spark and our requirement to resolve UIDs to their string values (see the UID problem described above), we initially tried to join the required UIDs onto the input rows. It seemed to us like the canonical way to solve this in Spark, but it exploded in our faces. We were never patient enough to let it finish the joining stage, let alone move on to actually processing the data. The performance was clearly terrible, and we spent too much time trying to improve it before we moved on to the caching solution described above. HBase is good at looking things up, so we’re using it for resolving UIDs.

We also briefly thought about getting around the UID problem entirely by working with raw UIDs. For our setup with an active and a standby cluster, this would have required running one job (and taking one backup) per cluster, doubling the costs for S3 storage and the EMR cluster, and would have created another high-throughput Kafka topic. For single-cluster setups, however, working with raw UIDs gives the job an extra performance boost, because the lookups (and thus the network requests to HBase) can be omitted, the UID table doesn’t need to be restored at all, and HBase isn’t even needed in the EMR cluster anymore.

Results

Queries that are now served from rolled-up data complete significantly faster than before. We’re regularly seeing improvements of 60x compared to the same query being served based on raw data. Queries over a long period of time that previously timed out are now running smoothly.

Going forward

Acknowledgements

Author

We are the engineers at Skyscanner, the company changing how the world travels. Visit skyscanner.net to see how we walk the talk!