How to index hundreds of millions of series in a time-series database

Evgeniy Zuykin
Agoda Engineering & Design
7 min read · Dec 9, 2022

In WhiteFalcon, Agoda’s in-house time-series database inspired by Apache Druid, we have a RealTime component that ingests new measurements and keeps the last few hours of data in memory. We do it this way because most requests (from alerting systems and dashboards) use only the most recent data. Today we will discuss the index structure we use to work with in-memory data efficiently.

Let’s start with some trivia first. Our measurement has the following format:

{
  "metric": "service.a.request.duration",
  "value": 50,
  "timestamp": 1661067352,
  "tags": {
    "server": "A1",
    "cluster": "C1",
    "status": "200",
    ...
  }
}

Each measurement contains a metric name, a timestamp, a long or double value, and a set of tags (Prometheus calls them labels). Each unique combination of metric and tags defines a new series, which is assigned an integer id.

Series into unique identifier

Instead of storing original values, WhiteFalcon aggregates them into data points (sum/count, quantiles, HyperLogLog, etc.), depending on granularity. Hence each series has its own set of data points.

Series Id into aggregated data

WhiteFalcon provides a query interface that filters series by including (or excluding) specific tag values, for example:

{
  "include_tags": {
    "server": [ "A1", "A2", "B.*" ],
    "cluster": [ "A", "B" ]
  },
  "exclude_tags": {
    "status": [ "503", "4.*" ]
  },
  "metric": "service.a.request.duration",
  "granularity": 60,
  "start": "2022-08-21T08:10:00",
  "end": "2022-12-21T09:11:00"
}

This query means: give me data for the metric, with a step of 60 seconds, between the specified dates, only for series that have one of the specified values for server AND one of the specified values for cluster, and that don’t have any of the specified values for status. To serve this request, we first need to select the series that match the query and then reaggregate the data of those series together.

So our index must support the following operations:

  • Find specific series to aggregate new measurements into during the ingestion (or create a new one)
  • Find all series matching the query

Our core index has the following structure:

Core index structure

Each metric has its own map from tag set to series id. This lets us find the series to aggregate a new measurement into with a single key-value lookup.
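
To make the lookup concrete, here is a minimal Python sketch of such a core index. The class name `CoreIndex`, the method `get_or_create`, and the use of a frozen set of tag pairs as the key are assumptions made for illustration, not WhiteFalcon’s actual code.

```python
from typing import Dict, FrozenSet, Tuple

# A series key: the set of (tag, value) pairs, independent of tag order.
TagSet = FrozenSet[Tuple[str, str]]


class CoreIndex:
    """Hypothetical core index: metric -> (tag set -> series id)."""

    def __init__(self) -> None:
        self.series_by_metric: Dict[str, Dict[TagSet, int]] = {}
        self.next_id = 0  # ids are allocated contiguously across the whole index

    def get_or_create(self, metric: str, tags: Dict[str, str]) -> int:
        key = frozenset(tags.items())
        series = self.series_by_metric.setdefault(metric, {})
        series_id = series.get(key)  # the single key-value lookup
        if series_id is None:  # first time we see this metric + tags combination
            series_id = self.next_id
            self.next_id += 1
            series[key] = series_id
        return series_id


index = CoreIndex()
metric = "service.a.request.duration"
first = index.get_or_create(metric, {"server": "A1", "cluster": "C1", "status": "200"})
again = index.get_or_create(metric, {"status": "200", "cluster": "C1", "server": "A1"})
other = index.get_or_create(metric, {"server": "A2", "cluster": "C1", "status": "200"})
```

The second call passes the same tags in a different order and gets the same id back; only the third call creates a new series.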

Initially, we took a simple brute-force approach to filtering series for a query: iterate through all series of the queried metric and keep those that match the query. This approach is not ideal because the cost grows as O(N), where N is the number of series for the metric, so it does not work well for metrics with many series.

Index sequence scan
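
The sequential scan can be sketched as follows. This is an illustration only: the function names are hypothetical, every query value is treated as a regular expression that must match the whole tag value, and a series that lacks an included tag is assumed not to match.

```python
import re
from typing import Dict, List


def matches(tags: Dict[str, str],
            include: Dict[str, List[str]],
            exclude: Dict[str, List[str]]) -> bool:
    # Every included tag must match at least one of its patterns.
    for tag, patterns in include.items():
        value = tags.get(tag)
        if value is None or not any(re.fullmatch(p, value) for p in patterns):
            return False
    # No excluded tag may match any of its patterns.
    for tag, patterns in exclude.items():
        value = tags.get(tag)
        if value is not None and any(re.fullmatch(p, value) for p in patterns):
            return False
    return True


def scan(series: Dict[int, Dict[str, str]],
         include: Dict[str, List[str]],
         exclude: Dict[str, List[str]]) -> List[int]:
    # O(N): every series of the metric is visited on every query.
    return [sid for sid, tags in series.items() if matches(tags, include, exclude)]


series = {
    0: {"server": "A1", "cluster": "A", "status": "200"},
    1: {"server": "A2", "cluster": "B", "status": "503"},
    2: {"server": "B7", "cluster": "A", "status": "404"},
    3: {"server": "C1", "cluster": "A", "status": "200"},
    4: {"server": "B2", "cluster": "C", "status": "200"},
}
include = {"server": ["A1", "A2", "B.*"], "cluster": ["A", "B"]}
exclude = {"status": ["503", "4.*"]}
matched = scan(series, include, exclude)  # only series 0 survives
```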

While the idea of using such an approach in production sounds a bit silly, it has its benefits:

  • Simple: it is just a sequential scan
  • Flexible: it is easy to add new features to a sequential scan
  • Low ingestion performance impact, since it is just one key-value lookup (or an insert in the case of a new series)
  • Memory overhead is minimal, again because the index is just one map

So how can we organize our data to make our search more efficient?

For each tag value, let’s store a set of series where it was seen.

Series set for each tag value
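
A minimal sketch of how these per-value sets could be accumulated during ingestion. The nested-dictionary layout and the `add_series` helper are assumptions for illustration.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, Set

# tag name -> tag value -> set of series ids where that value was seen
TagValueIndex = DefaultDict[str, DefaultDict[str, Set[int]]]


def new_index() -> TagValueIndex:
    return defaultdict(lambda: defaultdict(set))


def add_series(index: TagValueIndex, series_id: int, tags: Dict[str, str]) -> None:
    # Called once when a new series is created; this is the extra ingestion cost.
    for tag, value in tags.items():
        index[tag][value].add(series_id)


tag_index = new_index()
add_series(tag_index, 0, {"server": "A1", "cluster": "A"})
add_series(tag_index, 1, {"server": "A1", "cluster": "B"})
add_series(tag_index, 2, {"server": "B1", "cluster": "A"})
```

After these three calls, the set for server A1 holds series 0 and 1, and the set for cluster B holds only series 1.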

Now we can quickly find the required series by combining the sets of each tag value from the query.
Let’s have a look at an example request:

"include_tags": {
  "server": [ "A1", "B1" ]
},
"exclude_tags": {
  "cluster": [ "B" ]
}

To find the series that match the query, we take the series set for server value A1, union it with the set for B1, and then remove all series for cluster B.

Union of values for each tag and then intersection
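
With plain Python sets, this example reduces to one union and one difference. The series ids below are made up for illustration.

```python
# Hypothetical series sets for the tag values used in the query.
server_a1 = {0, 1}
server_b1 = {2, 3}
cluster_b = {1, 3}

# include server in (A1, B1), exclude cluster B
simple_result = (server_a1 | server_b1) - cluster_b
```

Here series 1 and 3 are dropped because they belong to cluster B, leaving series 0 and 2.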

Okay, but this query was simple, and our dataset wasn’t that complicated. Let’s see how our algorithm behaves on something more complex. Let’s throw one more tag into our series.

Adding status field to series sets aggregation

Let’s make our query more complicated as well:

"include_tags": {
  "server": [ "A.*" ],
  "status": [ "200", "500" ]
},
"exclude_tags": {
  "server": [ "A2" ]
}

Here is how we execute this query step by step:

  • Find all values of the tag server that match the wildcard A.*
  • Get the series sets where these tag values are used and union them
  • Get the series sets for the status values 200 and 500 and union them
  • Intersect the series set from tag server with the series set from tag status
  • Get the series set where server A2 is used and remove it from the set obtained in the previous steps

∪ is a set union and ∩ is a set intersection

We must first compute the union of all series sets for one tag before we move on to the next tag; otherwise, the results will be incorrect.
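
The steps can be sketched in Python as follows. This is a sketch under assumptions: the function names and sample ids are hypothetical, query values are treated as regular expressions matched against the whole tag value, and include_tags is assumed to be non-empty.

```python
import re
from typing import Dict, List, Set

Index = Dict[str, Dict[str, Set[int]]]  # tag -> value -> series ids


def union_for_tag(values: Dict[str, Set[int]], patterns: List[str]) -> Set[int]:
    # Union the series sets of every tag value that matches one of the patterns.
    result: Set[int] = set()
    for value, series in values.items():
        if any(re.fullmatch(p, value) for p in patterns):
            result |= series
    return result


def evaluate(index: Index,
             include: Dict[str, List[str]],
             exclude: Dict[str, List[str]]) -> Set[int]:
    # Union within each tag first, then intersect across tags.
    per_tag = [union_for_tag(index.get(tag, {}), pats) for tag, pats in include.items()]
    result = set.intersection(*per_tag)  # returns a new set; requires non-empty include
    # Finally, remove everything matched by the excluded values.
    for tag, pats in exclude.items():
        result -= union_for_tag(index.get(tag, {}), pats)
    return result


sample_index: Index = {
    "server": {"A1": {0, 1}, "A2": {2, 3}, "B1": {4, 5}},
    "status": {"200": {0, 2, 4}, "500": {1, 5}, "404": {3}},
}
query_result = evaluate(
    sample_index,
    include={"server": ["A.*"], "status": ["200", "500"]},
    exclude={"server": ["A2"]},
)
```

In this sample, the server union is {0, 1, 2, 3}, the status union is {0, 1, 2, 4, 5}, their intersection is {0, 1, 2}, and removing the A2 series leaves {0, 1}.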

There is another case we need to consider. So far we always had a non-empty include_tags section, but if it is empty, we need to return data for all series. For that purpose, we also maintain a set of all series that belong to the metric. Consider such a query:

"include_tags": {},
"exclude_tags": {
  "status": [ "400", "500" ]
}

Discard excluded tags from all series

First, we union the series sets of the excluded status values. Then we remove the result from the set of all series for the metric.
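
A short sketch of this fallback, assuming the per-tag unions have already been computed. The function name `filter_series` and the sample ids are hypothetical.

```python
from typing import Iterable, List, Set


def filter_series(all_series: Set[int],
                  include_unions: List[Set[int]],
                  exclude_unions: Iterable[Set[int]]) -> Set[int]:
    # With no include filters, start from every series of the metric.
    if include_unions:
        result = set.intersection(*include_unions)
    else:
        result = set(all_series)  # copy, so the index itself is never mutated
    for excluded in exclude_unions:
        result -= excluded
    return result


all_series = {0, 1, 2, 3, 4, 5}
status_400 = {1}
status_500 = {2, 5}
remaining = filter_series(all_series, [], [status_400 | status_500])
```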

So, did the time complexity change compared to the brute-force approach? With this approach, we no longer iterate over all series of the metric on each request; instead, we only union and intersect the series sets for the tags specified in the query, so this algorithm should be much faster!

We have a working data structure that would work well for our case, but is it optimal? Let’s have a look at how we store series.

If we store them as simple hash sets, adding new values will be fast, but the main operations (union, intersection, and difference, i.e. AND NOT) will be slow because they have O(N) complexity. Not to mention that we would need to create a fresh set instance on every union or intersection, which hurts performance and puts pressure on the GC.

Are there any better alternatives to HashSet? Our series ids are always non-negative integers, allocated contiguously, starting from 0. It means we can use the BitSet data structure.

HashSet vs BitSet structure

BitSet is a collection of 64-bit integers (Longs) laid out back to back, forming one contiguous long array. Each Long has 64 bits, meaning it can hold up to 64 boolean values. It means BitSet is a natural replacement for Array<Boolean> and HashSet<Integer>.

A hash set with 64 integers requires 64 * 4 bytes (plus some bytes for the internal HashSet buckets), whereas a bitset needs just 8 bytes for a single long (plus several bytes for the long array). Also, since it is a sequence of Longs, it is heavily optimized for bitwise operations like AND, OR, XOR, etc. This is precisely what we need!

If we use bitsets instead of hash sets for storing our series sets, we improve not only the memory footprint but also the performance of all required operations. For example, the time complexity of the intersection operation drops from O(N), where N is the number of elements, to O(K), where K is the number of Longs required to store N elements. Sounds great!
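
To illustrate the O(K) claim, here is a toy bitset built from a list of 64-bit words. It is a Python sketch of the idea rather than Java’s BitSet, and the helper names are made up.

```python
from typing import Iterable, List

WORD_BITS = 64


def to_words(ids: Iterable[int]) -> List[int]:
    # One 64-bit word per 64 consecutive ids; bit i of word w stands for id w * 64 + i.
    ids = list(ids)
    words = [0] * (max(ids) // WORD_BITS + 1) if ids else []
    for i in ids:
        words[i // WORD_BITS] |= 1 << (i % WORD_BITS)
    return words


def intersect(a: List[int], b: List[int]) -> List[int]:
    # One AND per word: K operations, no matter how many ids each word holds.
    return [x & y for x, y in zip(a, b)]


def to_ids(words: List[int]) -> List[int]:
    return [w * WORD_BITS + bit
            for w, word in enumerate(words)
            for bit in range(WORD_BITS)
            if (word >> bit) & 1]


left = to_words([1, 5, 64, 130])   # 3 words
right = to_words([5, 64, 65])      # 2 words
common = to_ids(intersect(left, right))
```

The intersection touches only two words here, and a fully populated word would cost the same single AND as a nearly empty one.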

But BitSet’s biggest advantage might also become its weakness.

Sparse series ids

Previously we assumed all series ids for the same metric are contiguous. However, this holds only across the index as a whole; for a specific metric, the ids may be sparse.

In the case of the last series, 100260, the bitset needs an internal array of ~1567 (100260 / 64) longs, which is about 12.5 KB (roughly 100 kilobits), just to store a single value! Since we are talking about hundreds of millions of series, this flaw will hit us even harder.
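
A quick back-of-the-envelope check of how large a plain bitset gets when it has to cover one high id. The helper name is hypothetical, and object headers are ignored.

```python
def plain_bitset_bytes(max_id: int) -> int:
    # One 8-byte long per 64 ids, covering ids 0..max_id inclusive.
    words = max_id // 64 + 1
    return words * 8


words_needed = 100260 // 64 + 1               # 1567 longs
bytes_needed = plain_bitset_bytes(100260)     # 12536 bytes, about 12.5 KB
at_hundred_million = plain_bitset_bytes(100_000_000)  # about 12.5 MB per set
```

At an id around one hundred million, a single such set already costs about 12.5 MB, and the index holds one set per tag value.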

So what does it mean? Can’t we use bitsets for our purpose? Luckily, people have already figured out the solution to this problem: compressed bitsets. There are multiple implementations; we use Roaring bitmaps.

Roaring bitmaps are compressed bitmaps which tend to outperform conventional compressed bitmaps such as WAH, EWAH or Concise. In some instances, they can be hundreds of times faster and they often offer significantly better compression.

Although the series ids of a single metric are sparse, series for the same metric are often allocated together, so they still benefit considerably from compressed bitsets, and we can safely use RoaringBitmap to store our series sets.
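
To give a rough feel for why chunked compression helps with sparse ids, here is a heavily simplified Python model of the Roaring layout. It is not the RoaringBitmap library API: it only mimics the split of each id into a 16-bit chunk key and a 16-bit low part, stores small chunks as arrays of 2-byte values, and counts a chunk as an 8 KB bitmap once it holds more than 4096 values. Run containers and per-container headers are ignored, and the function names are made up.

```python
from collections import defaultdict
from typing import DefaultDict, Iterable, Set

ARRAY_LIMIT = 4096         # above this many values a chunk becomes a bitmap container
BITMAP_BYTES = 65536 // 8  # 2^16 bits = 8192 bytes


def chunk(ids: Iterable[int]) -> DefaultDict[int, Set[int]]:
    # High 16 bits select the chunk, low 16 bits are stored inside it.
    chunks: DefaultDict[int, Set[int]] = defaultdict(set)
    for i in ids:
        chunks[i >> 16].add(i & 0xFFFF)
    return chunks


def estimated_bytes(ids: Iterable[int]) -> int:
    total = 0
    for lows in chunk(ids).values():
        if len(lows) <= ARRAY_LIMIT:
            total += 2 * len(lows)   # array container: 2 bytes per value
        else:
            total += BITMAP_BYTES    # bitmap container: fixed 8 KB
    return total


sparse = estimated_bytes([3, 4, 100260])  # two tiny array containers
dense = estimated_bytes(range(65536))     # one full bitmap container
```

In this model, the three sparse ids cost a handful of bytes instead of the thousands a plain bitset would need, while a dense run of ids still falls back to a compact bitmap.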

Conclusion

Here are the results of adopting this approach in production:

ms per minute required for series filtering
  1. On average, the time required for series filtering dropped 4–6 times, from 20 ms per minute to 3–5 ms per minute.
  2. Spikes caused by queries for metrics with a large number of series became smaller, and the curve flattened.
  3. The memory required for the index increased by 20%, which is not bad, considering the initial index was just a plain hash map.
  4. Ingestion speed dropped by 2–3% due to the accumulation of series sets for each tag.
