This post describes how we improved the query performance for our OpenTSDB cluster and enabled queries that previously were impossible by reducing the resolution of historic data.
Skyscanner’s focus is to drive every decision in Skyscanner by complete, timely and accurate data. As part of this, we’re operating a large metrics and logging platform that enables all engineers in Skyscanner to monitor their service 24 hours a day. We provide application logs and any metrics that our engineers would like to record; for instance, business and operational metrics for their services. We store and serve the data that powers Grafana dashboards and our alerting infrastructure which comprises Bosun and VictorOps. This means that we’re dealing with a tremendous number of small data points which not only need to be ingested with minimal delay, but also need to be returned again efficiently.
Skyscanner’s metrics platform is part of the wider data platform and crucial to the monitoring of services and the business. We’re storing all our business and operational metrics as time series data in OpenTSDB, a well-established project in the open source world. Using Hadoop and HBase as its backend store, OpenTSDB is a Java application providing ingestion and query functionality for time series data. It allowed Skyscanner’s data platform engineers to 10x the ingestion and query volumes multiple times over the past years.
Load tests have shown that we can easily ingest more than 1.5 million data points per second, 50% more than our normal daily volume. This includes operational metrics collected by various agents installed on machines, custom metrics emitted by services, metrics as products from stream processing jobs and more.
We currently retain raw data points for 6 months and make all of them available through OpenTSDB.
All metrics are published to a topic in our company-wide multi-tenant Kafka cluster. The ingestion nodes consume the topic using the handy Kafka RPC plugin provided by the authors of OpenTSDB and insert the data into HBase.
Skyscanner’s OpenTSDB infrastructure runs entirely in AWS. We’re using Cloudera to create the backend clusters, which comprise Zookeeper, HDFS, and HBase. The OpenTSDB nodes themselves are behind a load balancer and split into query and ingestion nodes (read-only and write-only, respectively).
We additionally have Cloudera create a YARN cluster that we use to run MapReduce jobs which upload the daily snapshots of our tables to S3. We’re using these snapshots as backups in case something goes wrong.
For redundancy, we have the entire OpenTSDB infrastructure, as indicated by the green rectangle in the diagram, in two different AWS regions. Both of these setups are completely independent from one another; the only thing they share is the Kafka topic they’re both independently consuming. Since they’re reading the same messages from Kafka, both clusters will eventually have the same data points. Only one of them is serving queries at any time while the other one acts as a standby cluster that we can quickly switch query traffic to when we need to do maintenance work on the currently active cluster.
While this setup works well for resiliency, it complicates the generation of rolled-up data slightly due to the optimisation OpenTSDB’s data model uses. We’ll see details later.
That’s a lot of data
At the moment, we’re adding more than 2 terabytes’ worth of data into our OpenTSDB cluster every day. Ingestion of these volumes often implies a write-optimised setup and ours is no different. However, the query side of things is just as important. We don’t want our Grafana dashboards to show those little red error triangles or on-call engineers across the business incorrectly getting paged at night because our automated alerting solution couldn’t query OpenTSDB.
In the past, we’ve had problems with noisy neighbours: queries that are expensive to run have repeatedly caused service instability. The main problems were not in OpenTSDB itself, but in HBase’s way of handling RPC requests. Details can be found here, but a gross oversimplification is that all RPCs to HBase end up in the same queues with the same priority; if HBase needs to scan a lot of rows, RPCs remain in the queue for longer, and at some point the queues are full. When congested like this, HBase rejects subsequent requests and OpenTSDB returns an error to the client. This happened often enough that we introduced a query proxy that estimates the cost of incoming queries based on the cardinality of the metric queried and the time range. If a query is deemed too expensive, we reject it to protect the service.
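To make the idea of a cost estimate concrete, here is a minimal sketch of such a gate. It is not the proxy's actual logic: the class name, the budget and the formula (series cardinality multiplied by the number of hours queried, since OpenTSDB keeps one row per time series per hour) are assumptions chosen purely for illustration.

```java
import java.time.Duration;

/** Hypothetical cost gate: rejects queries whose estimated row count exceeds a budget. */
final class QueryCostEstimator {
    private final long maxRowsPerQuery; // assumed budget, not a real setting of our proxy

    QueryCostEstimator(long maxRowsPerQuery) {
        this.maxRowsPerQuery = maxRowsPerQuery;
    }

    /** One row per time series per hour, so rows scanned is roughly series x hours. */
    long estimateRows(long seriesCardinality, Duration range) {
        long hours = Math.max(1, (range.getSeconds() + 3599) / 3600); // round up to full hours
        return Math.multiplyExact(seriesCardinality, hours);
    }

    boolean isAllowed(long seriesCardinality, Duration range) {
        return estimateRows(seriesCardinality, range) <= maxRowsPerQuery;
    }

    public static void main(String[] args) {
        QueryCostEstimator estimator = new QueryCostEstimator(1_000_000);
        // 500 series x 720 hours = 360,000 rows: within budget
        System.out.println(estimator.isAllowed(500, Duration.ofDays(30)));
        // 50,000 series x 168 hours = 8,400,000 rows: over budget
        System.out.println(estimator.isAllowed(50_000, Duration.ofDays(7)));
    }
}
```

A gate like this explains why a low-cardinality metric can be queried over months while a high-cardinality one is blocked after a few days.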
We’re also seeing frequent timeouts on queries that ask for a long range of data. Some metric paths can be queried over months, while 7 days’ worth of data can be too much for some others with higher volume, so that they cannot complete before hitting the 10 second timeout.
This situation isn’t ideal and it led to the blocking of legitimate user queries which were more expensive to execute. While we have seen that more than 86% of user queries can be answered with data from just the last 24 hours and more than 98% with data from the last week, there is a general interest in seeing longer-term trends, and these queries would often get blocked by the query proxy. Engineers had written their own scripts to extract data from OpenTSDB to a separate store to circumvent the limitations we impose.
The figures above also suggest that keeping raw data for as long as we currently do, 6 months, isn’t really worth the extra effort required. Users already need to use an aggregator for the time-axis, a downsampler, in their queries because the number of raw data points returned for queries over a couple of days or weeks would quickly exceed the horizontal pixels available on the screen.
What do others do?
Looking at what other time series databases do to deal with these volumes of data and incoming queries, we’re seeing the same patterns throughout. For instance, Prometheus addresses the problem of scale by sharding. Each instance is only responsible for a very small metric space so that the overall complexity remains relatively low. When your metric volume grows too much, you scale out and end up with multiple federated Promethei which each scrape their own disjoint sets of services. Prometheus is only able to efficiently retain and serve a couple of days’ worth of metrics. To overcome its limitations, Thanos can scrape multiple Promethei, downsample its data and store them in an object store like Amazon’s S3. Queries are then sent to Thanos, but every Prometheus instance still needs a Thanos sidecar that scrapes it.
Cloudera’s popular cluster management software only retains raw metrics for a couple of hours before downsampling them into lower-resolution points and dropping the raw data.
OpenTSDB has support for rollups (and pre-aggregates)
We concluded from our investigations that we’re likely to be able to significantly reduce the cost of serving queries over an extended range of time by pre-downsampling older data points. OpenTSDB introduced this functionality with its newest release, 2.4.0. It brings support for storing and querying data in multiple different resolutions. The aggregation along the x-axis, time, is of particular interest for our problem that queries over a long time range frequently time out. This is covered by the new rollups feature. Pre-aggregates, described in the same document, would help to reduce the cardinality of a metric path.
The main problem is that OpenTSDB doesn’t generate rollups for us; this has to be done separately. We’re looking to roll up raw data points into hourly points, so that all queries that use a downsampling interval of one hour (or integer multiples thereof, such as 2 hours or one day) would automatically be served from rolled-up data points. This means, in the example of a metric that’s emitted once every minute and needs to be downsampled to one-day points, every output data point would only need to aggregate 24 hourly points, rather than the 1,440 raw points that we get when we write a point every minute. This gives a huge performance boost on the query side.
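The arithmetic behind this can be sketched in a few lines. The class and method names below are hypothetical and the check only mirrors the rule described above (an interval qualifies if it is a whole multiple of one hour); it is not OpenTSDB's own selection code.

```java
import java.time.Duration;

/** Illustrative arithmetic for hourly rollups; names are made up for this sketch. */
final class RollupMath {
    static final Duration ROLLUP_INTERVAL = Duration.ofHours(1);

    /** A downsampling interval can be served from hourly rollups if it is a whole multiple of one hour. */
    static boolean canUseHourlyRollup(Duration downsampleInterval) {
        long seconds = downsampleInterval.getSeconds();
        return seconds > 0 && seconds % ROLLUP_INTERVAL.getSeconds() == 0;
    }

    /** Number of stored points that feed one output point of the given width. */
    static long pointsPerBucket(Duration bucket, Duration storedResolution) {
        return bucket.getSeconds() / storedResolution.getSeconds();
    }

    public static void main(String[] args) {
        System.out.println(canUseHourlyRollup(Duration.ofHours(2)));    // whole multiple of one hour
        System.out.println(canUseHourlyRollup(Duration.ofMinutes(30))); // finer than the rollup interval
        System.out.println(pointsPerBucket(Duration.ofDays(1), Duration.ofMinutes(1))); // raw points per day
        System.out.println(pointsPerBucket(Duration.ofDays(1), Duration.ofHours(1)));   // hourly points per day
    }
}
```

For a metric written once a minute, a one-day bucket shrinks from 1,440 stored points to 24, a 60-fold reduction in the points read per output value.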
Stream or batch
There are two main types of approaches that can be considered when trying to aggregate a large amount of data that’s constantly growing: stream and batch processing. The OpenTSDB documentation outlines the main differences, advantages and drawbacks of a batch and a streaming approach for this particular use case. A stream processing job seems to be the intuitive solution, especially with our chosen ingestion method through Kafka. It would be easy to have a job consume the input topic and do some maths on the data. Chris Larsen also hinted in a blog post for Yahoo! at a potential stream processing solution with Apache Storm.
Our solution refrains from using a stream processing job for a number of reasons. Since we’re looking to generate hourly data points, our stream processing jobs would need to hold the state of an entire hour for all time series that it encounters somewhere. We’re running other stream processing jobs consuming much lower-throughput topics, holding the state in RocksDB. We’re not confident that this would scale to a topic with a significantly higher throughput, like our OpenTSDB topic; especially if we want to be ready for the next 10x.
Our ingestion volume of 1.5 million raw data points per second also requires the topic to be very wide. This means that for a stream processing job with multiple containers, we’d need a repartitioning job first to make sure that a single container of the stream processing job sees all of the data points for a given time series to generate accurate results.
The main problem, however, was late arriving data. Every data point we ingest into OpenTSDB comes with a timestamp. This timestamp isn’t the same as when it is consumed from the Kafka topic. They may vary by a fraction of a second, or by several minutes or hours, depending on the health of the entire platform and the stream processing job. A stream processor would have a hard time figuring out when to make the cut-off for every hour and emit the final rolled-up data point.
Overall, we weren’t confident enough that a stream processing solution would give us accurate data while being low in maintenance at the same time. For these reasons, we opted for a batch processing job that can churn through data at rest and then produce all the rolled-up points after we’re expecting no more writes to a given period of time.
To recap our infrastructure: We’re running a daily backup job every night that snapshots the current tables in our HBase cluster and uploads them to S3, making use of HBase’s ExportSnapshot tool. The data is at rest and we get an updated view of the entire database every 24 hours. For us, this is an ideal time to start a daily batch job and roll up the data from the previous day.
HBase is backed by HDFS, so there are plenty of open source tools for the Hadoop ecosystem. MapReduce-style jobs are the obvious choice there and AWS provides a service called Elastic MapReduce (EMR) which makes it easy to spin up a cluster and run an Apache Spark batch job on it. Even more crucially, EMR clusters can be backed by EMRFS, an implementation of HDFS backed by S3. This is ideal to access the backup that we already have in S3. All we need to do is trigger a batch job every time a backup completes. AWS’ serverless Lambda functions can respond to the upload of a file to S3, so they’re a natural fit to kick off our job. That job then aggregates the data and emits the rolled-up data points back to Kafka where they are consumed again by an ingestion node, just like raw data points.
The following diagram shows the new architecture.
The idea for a batch job itself is fairly simple.
- Filter out all data with a timestamp that’s irrelevant for the period of time we want to roll up.
- Look at all time series, that is, every unique combination of metric name and tag key/value pairs, and get all the raw data points for one hour.
- For that hour, calculate the sum, the minimum and the maximum of the points and also count them. Emit these four new data points with a new timestamp that is aligned to the full hour.
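The three steps above can be sketched with plain Java collections for a single time series. This is a simplified illustration rather than the Spark job itself: the class name, the method signature and the representation of points as a map from epoch seconds to values are assumptions.

```java
import java.util.DoubleSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Sketch of the hourly rollup for one time series; not the production Spark job. */
final class HourlyRollup {
    static final long SECONDS_PER_HOUR = 3600;

    /**
     * Keeps only points inside [fromInclusive, toExclusive), groups them by the hour
     * they fall into and summarises each hour (sum, min, max and count in one pass).
     * The keys of the result are timestamps aligned to the full hour.
     */
    static Map<Long, DoubleSummaryStatistics> rollUp(
            Map<Long, Double> points, long fromInclusive, long toExclusive) {
        return points.entrySet().stream()
                .filter(e -> e.getKey() >= fromInclusive && e.getKey() < toExclusive)
                .collect(Collectors.groupingBy(
                        e -> e.getKey() - Math.floorMod(e.getKey(), SECONDS_PER_HOUR),
                        TreeMap::new,
                        Collectors.summarizingDouble(Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        Map<Long, Double> points = Map.of(
                100L, 5.0,     // outside the requested period, filtered out
                3600L, 1.0,
                3660L, 3.0,
                7199L, 2.0,
                7200L, 10.0);
        rollUp(points, 3600, 10800).forEach((hour, stats) ->
                System.out.println(hour + " -> " + stats));
    }
}
```

Each entry of the result corresponds to the four data points that would be emitted for that hour.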
Skyscanner uses Spark for most of their batch processing jobs, so this was our first choice. The fact that we could write the job in Java and run it in the Hadoop ecosystem greatly helped since it’s the home of the remainder of our metrics platform already. The following sections only describe the job we ended up with, not how we got there. Many of us being new to Spark, we didn’t find the best approach straight away. There’s a “what didn’t work” section further down that describes the learnings and failed approaches from early iterations of this job.
Leverage OpenTSDB’s data model
HBase is a columnar database. Almost all time series databases use a columnar store of some sort. OpenTSDB’s data model is designed brilliantly, makes excellent use of HBase’s properties (think of a table as a huge, distributed, multidimensional, sorted map) and comes in handy when trying to batch-process the data. To summarise, OpenTSDB creates a new row in HBase for every time series for every hour of raw data. Every row is identified by a unique key which contains an identifier for the time series (TSUID) and the timestamp, normalised to the full hour. Inside each row, it stores each point for that time series, keyed by its offset from the hour in the row key, together with its value.
Oversimplified, a JSON representation of the data model might look like this:
This is ideal for us and means we can process each row (key) individually because by design every key contains all the information we need to generate the new hourly rollup points for one time series and hour. This also makes sure we can take full advantage of Spark’s parallelisation because all rows can be worked on independently and simultaneously.
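The row layout can be modelled with nested sorted maps. The sketch below is deliberately simplified and hypothetical: real OpenTSDB row keys are byte arrays built from UIDs, whereas here the key is a readable string made of a made-up series identifier and the base hour.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of the "one row per series per hour" layout; keys are strings for readability. */
final class RowModel {
    static final long SECONDS_PER_HOUR = 3600L;

    // row key -> (offset in seconds from the base hour -> value), both sorted
    private final Map<String, Map<Integer, Double>> rows = new TreeMap<>();

    /** The row key combines the series identifier with the timestamp truncated to the hour. */
    static String rowKey(String seriesId, long epochSeconds) {
        long baseHour = epochSeconds - Math.floorMod(epochSeconds, SECONDS_PER_HOUR);
        return seriesId + ":" + baseHour;
    }

    void put(String seriesId, long epochSeconds, double value) {
        int offset = (int) Math.floorMod(epochSeconds, SECONDS_PER_HOUR);
        rows.computeIfAbsent(rowKey(seriesId, epochSeconds), k -> new TreeMap<>())
            .put(offset, value);
    }

    Map<String, Map<Integer, Double>> rows() {
        return rows;
    }

    public static void main(String[] args) {
        RowModel model = new RowModel();
        model.put("cpu.host1", 1546300861L, 0.5); // 61 seconds into the hour
        model.put("cpu.host1", 1546304399L, 0.7); // last second of the same hour
        model.put("cpu.host1", 1546304400L, 0.9); // first second of the next hour, new row
        model.rows().forEach((key, cells) -> System.out.println(key + " -> " + cells));
    }
}
```

Because every row already holds exactly one series and one hour, a rollup job never needs to look at more than one row at a time.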
How can we run this?
The first thing that needs to happen when we complete a new daily backup is spinning up an EMR cluster that has access to the new snapshot in S3 and can run our job. AWS provides the Data Pipeline service to set up complex pipelines that can do more sophisticated things than we need, but it provides us with an easy way of bootstrapping and configuring our cluster, parametrising our job, running it, and tearing everything down again as soon as it’s done.
In the above diagram, the completion of a backup (upload of snapshots of all tables to an S3 bucket) triggers a Lambda function which activates the data pipeline. The pipeline creates the EMR cluster and brings up an EMRFS-backed HBase cluster, into which we’re restoring the freshly created snapshot of the UID table. This enables all Spark worker nodes to efficiently resolve UIDs to their string values. The snapshot of the raw table remains in the same S3 bucket and constitutes the input to the Spark job. Workers can read the HFiles directly from S3.
We’re using this CloudFormation script to create the data pipeline and all its dependencies.
Let’s finally see code
We published the code of the Spark job and the CloudFormation template here: https://github.com/Skyscanner/OpenTSDB-rollup
The main class is the RollupJob with its run() method being the entrypoint. From there, it uses the HBase API to read in the raw data table, tsdb, and wraps it in an RDD object. Resilient distributed datasets are Spark's way of handling massive sets of data across many nodes. The entire raw table is larger than 2TB in size, LZO-compressed. Spark makes sure that every partition of that data is eventually assigned to one of the Spark worker nodes which can then run the job on that small part of the data. Every worker processes every row in the partition. It extracts all the values from the row with the help of a DoubleSummaryStatistics and emits four new data points: the sum, the minimum, and the maximum of all the points and the total number of points aggregated from that row. These new points are then sent to Kafka with the timestamp of the original row, where the points will be ingested again by a consumer.
Note that this job works directly on the HFiles at rest and doesn’t restore the snapshots into a full HBase cluster; see below for details.
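Storing the sum and the count rather than an average keeps the rolled-up points composable when a query downsamples further, for example to one day. The sketch below illustrates this with DoubleSummaryStatistics only; the class and helper names are hypothetical and it makes no claim about how OpenTSDB combines rollups internally.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;

/** Shows why sum and count compose across hours while a plain average does not. */
final class RollupCombiner {
    static DoubleSummaryStatistics summarise(double... values) {
        DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
        for (double value : values) {
            stats.accept(value);
        }
        return stats;
    }

    /** Sum of sums, sum of counts, min of mins and max of maxes. */
    static DoubleSummaryStatistics combine(List<DoubleSummaryStatistics> hourly) {
        DoubleSummaryStatistics combined = new DoubleSummaryStatistics();
        hourly.forEach(combined::combine);
        return combined;
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics hourOne = summarise(1, 2, 3); // average 2.0
        DoubleSummaryStatistics hourTwo = summarise(10, 20);  // average 15.0
        DoubleSummaryStatistics both = combine(List.of(hourOne, hourTwo));
        // The true average over all five points is sum / count = 36 / 5,
        // which differs from the average of the two hourly averages.
        System.out.println(both.getAverage());
        System.out.println((hourOne.getAverage() + hourTwo.getAverage()) / 2);
    }
}
```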
The UID problem
When designing the solution, we ran into a problem that is unique to our setup. As described earlier, we’re running two separate OpenTSDB clusters with separate backends which both ingest from the same Kafka cluster. OpenTSDB’s row keys do contain the metric name and all tags, but they don’t contain the raw string values since that would be repetitive and wasteful. Instead, OpenTSDB stores a UID in place of every string, typically 3–4 bytes in size. The mappings (remember, HBase is a map, it’s good at looking up things) are stored in a separate UID table to efficiently find the string for a given UID and vice versa. Whenever a data point is written, OpenTSDB looks up all its strings, assigns a new UID to every previously unseen string and uses those UIDs to store the data. That’s a compact, economical and efficient way of storing massive amounts of data.
The problem with our setup is that we’re running two completely separate clusters with their own HBase backends. Completely separate? Almost. They’re both reading from the same Kafka cluster, because operating multiple replicated large multi-tenant Kafka clusters is hard. And expensive. It’s much easier to have the ingestion nodes in both clusters independently consume the same topic and go their separate merry ways from there. That means, however, we cannot just emit the raw UIDs (the 3-byte-representation) to Kafka for them to ingest again, since the same UID represents a different string in each cluster. We therefore need to look up the original string values of the UIDs we encounter and emit them, so that the ingesting nodes can store them again with the correct UID in their own database.
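A toy example shows how two independent clusters end up with diverging mappings. It assumes UIDs are handed out incrementally in order of first appearance and uses a single table for all strings; real OpenTSDB keeps separate UID spaces for metrics, tag keys and tag values and can be configured differently, so treat the class below as hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy UID table: every previously unseen string gets the next free integer. */
final class ToyUidTable {
    private final Map<String, Integer> uidByName = new HashMap<>();
    private final Map<Integer, String> nameByUid = new HashMap<>();

    int getOrAssign(String name) {
        return uidByName.computeIfAbsent(name, n -> {
            int uid = nameByUid.size() + 1; // next free UID in this cluster
            nameByUid.put(uid, n);
            return uid;
        });
    }

    String resolve(int uid) {
        return nameByUid.get(uid);
    }

    public static void main(String[] args) {
        ToyUidTable clusterA = new ToyUidTable();
        ToyUidTable clusterB = new ToyUidTable();

        // Both clusters consume the same messages, but not necessarily in the same order.
        clusterA.getOrAssign("cpu.user");
        clusterA.getOrAssign("mem.free");
        clusterB.getOrAssign("mem.free");
        clusterB.getOrAssign("cpu.user");

        // The same UID now means different things in each cluster.
        System.out.println(clusterA.resolve(1));
        System.out.println(clusterB.resolve(1));
    }
}
```

Emitting strings instead of UIDs lets each cluster translate them back into its own UID space on ingestion.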
That’s where the UidResolver comes into play. As mentioned earlier, OpenTSDB is already routinely doing these lookups in either direction, so it already has the code in place to do that efficiently. Luckily, our job is written in Java so we can make use of OpenTSDB's code by just importing its JAR. We can instantiate a new TSDB object and reuse its lookup and caching functionality.
We need every Spark worker to have their own local cache, so we’re broadcasting the UID resolver to all workers. The TSDB field is marked as transient because it's not serialisable, so that it gets instantiated newly on every worker.
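The transient pattern can be shown without Spark or OpenTSDB on the classpath. In the sketch below a plain map stands in for the non-serialisable TSDB object and a serialisable function stands in for the lookup against the UID table; the class, the Lookup interface and the roundTrip helper are all hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Serialisable resolver whose cache is rebuilt lazily after every deserialisation. */
final class CachingResolver implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Stand-in for the real lookup; serialisable so it can travel with the resolver. */
    interface Lookup extends Function<Integer, String>, Serializable {}

    private final Lookup lookup;
    // transient: skipped during serialisation, so each copy starts with its own empty cache
    private transient Map<Integer, String> cache;

    CachingResolver(Lookup lookup) {
        this.lookup = lookup;
    }

    private synchronized Map<Integer, String> cache() {
        if (cache == null) {
            cache = new ConcurrentHashMap<>();
        }
        return cache;
    }

    String resolve(int uid) {
        return cache().computeIfAbsent(uid, lookup);
    }

    int cachedEntries() {
        return cache().size();
    }

    /** Simulates shipping the resolver to another worker via Java serialisation. */
    static CachingResolver roundTrip(CachingResolver resolver) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(resolver);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CachingResolver) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CachingResolver driverSide = new CachingResolver(uid -> "name-" + uid);
        driverSide.resolve(7);
        CachingResolver workerSide = roundTrip(driverSide);
        System.out.println(driverSide.cachedEntries()); // cache filled on the original
        System.out.println(workerSide.cachedEntries()); // copy starts empty
        System.out.println(workerSide.resolve(7));
    }
}
```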
Handling delayed rollups
Rolling up yesterday’s data today works for the setup described above, but there’s one major drawback. The data is only available more than 24 hours late and OpenTSDB still tries to fetch the data from the rollup table. If the query is looking at a period until now, the most recent data will be missing. However, the raw table does have that data available, albeit in a higher resolution. If we knew exactly how long the data in the rollup table can be delayed for at most, we could split the query into two and merge the results. We’d calculate the last timestamp that we’re guaranteed to have rolled-up data for and fetch everything from the start of the query until that timestamp from the rollup table and similarly everything from that timestamp until the end of the query from the raw table. That’s exactly what this PR to OpenTSDB implements.
We need to configure our SLA for the rollups table and OpenTSDB takes care of the splitting and merging for us. That way, we can take the greatest advantage of the performance improvements from using lower-resolution data as well as abstracting the split from our users. They don’t need to change anything in their queries and all their dashboards and alerts can stay the same. They’ll also automatically take advantage of the improved query performance.
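The splitting logic can be sketched as follows. This illustrates the idea only and is not the code from the PR: the class names, the half-open ranges in epoch seconds and the choice to align the boundary down to a full hour are assumptions.

```java
import java.time.Duration;

/** Splits a query range into a part served from rollups and a part served from raw data. */
final class QuerySplitter {
    static final long SECONDS_PER_HOUR = 3600;

    /** Two half-open ranges [start, end) in epoch seconds; a range is empty if start == end. */
    static final class Split {
        final long rollupStart;
        final long rollupEnd;
        final long rawStart;
        final long rawEnd;

        Split(long rollupStart, long rollupEnd, long rawStart, long rawEnd) {
            this.rollupStart = rollupStart;
            this.rollupEnd = rollupEnd;
            this.rawStart = rawStart;
            this.rawEnd = rawEnd;
        }
    }

    /** sla is the maximum delay after which rolled-up data is guaranteed to exist. */
    static Split split(long start, long end, long now, Duration sla) {
        long guaranteed = now - sla.getSeconds();
        // Align down to a full hour so no hourly point is split across both tables.
        long boundary = guaranteed - Math.floorMod(guaranteed, SECONDS_PER_HOUR);
        // Clamp the boundary into the query range.
        long splitAt = Math.max(start, Math.min(end, boundary));
        return new Split(start, splitAt, splitAt, end);
    }

    public static void main(String[] args) {
        long now = 1546388434L;
        Split lastWeek = split(now - Duration.ofDays(7).getSeconds(), now, now, Duration.ofHours(36));
        System.out.println("rollups: " + lastWeek.rollupStart + " to " + lastWeek.rollupEnd);
        System.out.println("raw:     " + lastWeek.rawStart + " to " + lastWeek.rawEnd);
    }
}
```

A query that lies entirely within the SLA window yields an empty rollup range and is served from raw data alone.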
Bugs in aggregation logic
We came across another bug in OpenTSDB’s rollups support that was reported on GitHub where downsample and series had to be the same; otherwise you’d encounter an error. There was a further bug in the aggregation logic that caused OpenTSDB to return incorrect results when aggregating over rolled-up data. We submitted a fix for that which was merged. There has been no subsequent release of OpenTSDB since, however, so you would need to recompile OpenTSDB from the latest master to get this fix.
We didn’t arrive at the solution we went with immediately and there have been some failed attempts to implement the rollup job differently.
We initially thought we had to restore both snapshots, for the raw data table and the UID table, into our HBase cluster on EMR. Using a live HBase cluster as input for the data is supported by Spark, but gave us terrible performance. We later realised that we didn’t need to restore the raw data table since every row can be processed independently. This means we can use the TableSnapshotInputFormat to read HFiles directly from S3 and distribute chunks of the HBase regions to the Spark workers. This gave a gigantic performance boost and made the job feasible in the first place. We eventually ended up slightly modifying the TableSnapshotInputFormat because we wanted to avoid the EMR cluster writing data to the directory of the snapshot (which we also use as a backup) at all costs.
Given our limited experience with Spark and our requirement to resolve UIDs to their string values (see the UID problem described above), we initially tried to join the UIDs required onto the input rows. It seemed to us like the canonical way to solve this in Spark, but it exploded in our faces. We were never patient enough to let the joining stage finish, let alone see the job move on to actually processing the data. The performance was clearly terrible and we spent too much time trying to make it performant before we moved on to the caching solution described above. HBase is good at looking up things, so we’re using it for resolving UIDs.
We also briefly thought about getting around the UID problem by working with raw UIDs. For our setup with an active and a standby cluster, this would have required running one job (and taking one backup) per cluster, doubling the costs for S3 storage and the EMR cluster, and it would have created another high-throughput Kafka topic. For single cluster setups, however, working with raw UIDs will give an extra performance boost for the job because the lookups (and thus network requests to HBase) can be omitted, the UID table doesn’t need to be restored at all and HBase isn’t even needed in the EMR cluster anymore.
We’re able to run a rollup job for a day’s worth of data (we’re ingesting 1.5 million raw data points per second) in 4 hours. The EMR cluster uses one m5.xlarge node for the HBase master, 10 m5.xlarge nodes for the core and 20 r4.2xlarge nodes for the Spark workers. The Spark job would easily scale much wider and could complete even faster, but we deliberately throttle its output to the Kafka cluster in order not to hit it too hard.
Queries that are now served from rolled-up data complete significantly faster than before. We’re regularly seeing improvements of 60x compared to the same query being served based on raw data. Queries over a long period of time that previously timed out are now running smoothly.
For the moment, we’re still keeping the 6 months of raw data until we’re happy that rollups are the way to go forward and we’re not missing anything. We can then shed a lot of data from the HBase cluster by setting the TTL on the raw table to something like one month. Then merging adjacent (and now small) regions to reduce the total number will allow us to scale in our HBase cluster significantly, saving the cost for the boxes and the headache of having to operate a massive cluster.
We’d like to thank all engineers in Skyscanner outside our team who helped us design, develop and test this solution. We learnt a lot as we went along and your input and guidance are much appreciated. Special thanks to the team spending most of their days working on this. 🙌 This is your work of merit.
Björn Marschollek is a senior software engineer in Skyscanner’s Data Platform Tribe which operates the real-time metrics and logging platform and enables monitoring, alerting, and insights for all Skyscanner engineers and their services. He is based in Edinburgh.