CubeDB: open source minimalist counter store with multidimensional keys
Hi Medium! My name is Demeter Stanko and I work on the Badoo BI team at the London office. The way things work at our company is that we try to measure user activity in as many ways as we can. There are lots of specialists for whom this is essential: developers testing that code works, colleagues on the product teams confirming how ingenious their ideas are, admins confirming that, at least tonight, entropy is not going to triumph, and colleagues from the anti-spam department confirming that, in the eternal and epic struggle, good shall overcome evil.
Mobile analytics
Several years ago, Badoo needed to analyse the actions of users in a mobile app. Each time someone pressed a button, loaded a screen, opened the app or wrote a message, we received an event notification in our system, and this brought a little bit of joy to the hearts of our product managers. This was happening a couple of hundred million times per day.
Soon, the source of joy was found in more specific cases, such as if a given user scrolled down to the bottom of the screen, viewed another user’s profile or sent a gift. But even this wasn’t enough for our colleagues on the product team. So Hotpanel made its appearance — a system for tracking mobile apps, and today we receive about 320 types of notification which add up to 12 billion per day. We will definitely write more about this; it would just be wrong to keep this to ourselves.
One of the main components of the Hotpanel interface is the set of hourly, daily and half-daily charts, broken down in various ways. As they say, a picture is worth a thousand words, so here you go:
These can be filtered based on any combination of fields:
Background history
Four years ago, at the very beginning of this project, when I was the only developer and there was only a small number of messages, I solved the display problem very quickly, cheaply and cheerfully, but, I am sorry to say, not very effectively. The front end was dc.js, and behind the scenes it was all backed by Redis, where each message type and each unit of time had its own HASHMAP, with fields and their values acting as keys, and each value being the number of times we had received a message with that particular combination of fields.
Example:
● Name HASHMAP: hourly:view_screen:2016–09–17
● Internally, keys looked like this*:
screen_name=view_photo,previous_screen=welcome,platform=android,app_version=1.2.3,gender=male
● Final value: 1 500 000
What all this meant was that on 17 September 2016 a male user on his Android device opened our app version 1.2.3 and went to view an image.
If, on that same day, another male user of an Android device opened version 1.2.3 of our application and likewise went to view an image, a HINCRBY would be triggered and the value would become 1 500 001, to the delight of our product manager responsible for images on Android version 1.2.3.
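Purely for illustration, here is roughly what each such update boiled down to. This is a minimal sketch assuming the Jedis client; the hash and field names are the ones from the example above, and the class itself is hypothetical.

import redis.clients.jedis.Jedis;

// Hypothetical sketch of the old Redis scheme: one HASHMAP per message type
// and time bucket, one field per combination of dimension values.
public class OldRedisCounterSketch {
    public static void main(String[] args) {
        String hash = "hourly:view_screen:2016-09-17";
        String field = "screen_name=view_photo,previous_screen=welcome,"
                + "platform=android,app_version=1.2.3,gender=male";
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // HINCRBY creates the field if it is missing and increments it atomically.
            long newValue = jedis.hincrBy(hash, field, 1);
            System.out.println(hash + " -> " + newValue);
        }
    }
}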
Also, next to Redis, there was a service in Python and Flask which connected to Redis, HGETALLed all the dictionaries from hourly:view_screen:2016–07–17 to hourly:view_screen:2016–10–17, put together a marvellous JSON structure and fed it all to the dc.js client. There were, of course, lots of optimisations, but I won't talk about them, because, as they say, that's all ancient history**.
Everything was just marvellous, as long as there was a small number of combinations. The speed was impressive (Crossfilter, on which dc.js is based, was written by the author of the d3 package and has a response time of less than 30 ms). Basically, it was a success. For a short while.
It was this success that killed us in the end. As the number of message types increased, new fields and values appeared, and the number of combinations grew not by the day, but by the hour. Using the interface became torturous. We ran into exotic limitations, such as the maximum size of a JSON object that can be received. However, we ignored this ‘sign from above’ and came up with a clever (or so we thought) solution: breaking the JSON into parts and then ‘gluing it back together’ on the client. Particularly painful were the questions from our colleagues in the web development department: “Your page took two minutes to devour 1 ½ gigabytes of RAM and to take down Chrome. Well done! How did you do it?”
Organic particles of a certain origin started hitting the fan every night. Memory use kept growing, and soon even 192 GB of RAM didn’t seem like that much. It wasn’t nice getting phone calls from the monitoring staff at three o’clock in the morning either (even my 18-month-old son has never done that!).
Basically, we had reached the point where “the lower classes don’t want to live in the old way and the upper classes cannot”. The time had come to take action.
* Actually, they didn’t. We saved everything in JSON, which is just terribly ineffective.
** Actually, it wasn’t. I just had a look. We forgot to remove it; we just stopped using it. I just switched it off and got a call from the monitoring department straightaway. Well done, you guys!
System requirements
We had to find or come up with something which would be hidden away at the back-end and which could do the following:
1. Store the data from counters for a period of 120 days (this represents about 100 million different combinations; in uncompressed form this would be about 27 GB of data).
2. Filter based on any combination of fields and by date interval. The format is along the lines of field1 in (‘val11’, ‘val12’ … ) AND field2 in (‘val21’, ‘val22’, …) …. AND dt between x and y, which makes it clear that, unfortunately, indices won’t be of any use.
3. Present the results in the form of facets. If the message has eight fields, then eight dictionaries should be produced — one for each field. Each dictionary has to contain counters for each of the field values. To be very pedantic, in SQL it should look like this:
select
    'G1' as name,
    G1,
    SUM(M)
from T
where
    D2 in (DQ2) and D3 in (DQ3) ...   -- skip all filters related to G1
    and p between PFrom and PTo
group by name, G1

UNION ALL

select
    'G2' as name,
    G2,
    SUM(M)
from T
where
    D1 in (DQ1) and D3 in (DQ3) ...   -- skip all filters related to G2
    and p between PFrom and PTo
group by name, G2

UNION ALL
...
UNION ALL

select
    'GN' as name,
    GN,
    SUM(M)
from T
where
    D1 in (DQ1) ... and D(N-1) in (DQ(N-1))   -- skip all filters related to GN
    and p between PFrom and PTo
group by name, GN

UNION ALL

select
    'p' as name,
    p,
    SUM(M)
from T
where
    D1 in (DQ1) ... and Dn in (DQn)
group by name, p
4. Not get worked up if new fields and field values are added.
5. Not get worked up if new message types are added.
6. Give out the result almost instantaneously, i.e. on average within 100 ms, including the network, and, at worst, within 2 s (we attached a spinning wheel to our page for this eventuality).
7. Be able to enter 3 million new combinations within a maximum of a minute.
8. Be able to quickly delete data relating to past days.
9. All this must be able to work using the existing infrastructure, i.e. either on a single machine (192 GB of memory, 48 cores), on a Hadoop cluster or on an Exasol cluster which, it so happens, we have available.
10. All this must be simple to support, must allow itself to be monitored, not be a pain in the neck and not be mysterious about problems which occur.
Mitigating circumstances:
1. We didn’t need true persistence, i.e. saving data right after each change. New aggregates were added once per hour, so it was enough to save everything to disk once after each load. Nevertheless, saving data shouldn’t block anything.
2. Cardinality of each field — not more than 1000.
3. Number of fields — not more than 100 (all of String type).
4. No ACL (yet).
5. No transactions etc. Of course, counters must update atomically.
Preliminary conclusions:
1. It is only possible to process 27 GB within 100 ms if you compress the data cleverly and use indexing, or if you use all the CPUs.
2. Key-value stores are of no use to us. Lua scripting in Redis or Tarantool might have helped, but they are still single-threaded and are unlikely to be able to process that much data within the time frame.
3. Likewise, relational databases aren’t up to the task in the light of requirements 4 and 5 above.
4. Presto, Impala and the like are all great, of course, but are unable to do anything in 100 ms. And for them 100 million entries is like using a sledgehammer to crack a nut. I am not even going to mention Hadoop and MapReduce here.
5. There are cunning and interesting things like Druid and InfluxDB which could, probably, resolve this issue, but they are too complicated. There wasn’t the option of setting up a separate cluster for all that.
6. You have probably noticed that this all seems to point to time series. Practically speaking you are right, but not quite in the way you might imagine. In actual fact, each chart of ours is not a single time series but the sum of millions of combinations. So we had to discount a time-series store as an option as well.
For reasons ranging from my own innate stubbornness to a lack of time for testing all the options, I naively enjoyed setting candidates at interviews the task of choosing a tool for this problem. But even good candidates were unable to tell me anything to get my hopes up. And so my final hope, Elasticsearch, which seemed ideally suited to faceted search, fell by the wayside too: it was too slow for our ‘atomic icebreaker’.
By this time constant nudges from my colleagues, a lack of proper sleep due to night-time antics by Redis, fading hope of finding something ready-to-use and free-of-charge and, it would be wrong to hide it from you, a purely geeky interest in finding a solution to a gripping problem took their toll: my eyes turned blood red, I lost all reason and decided to create a solution myself.
I wrote at home and covered my tracks thanks to my aforementioned 1½-year-old son. We agreed that while he was quietly asleep after 7 pm I would sit in his bedroom and, under the pretence of being a caring father looking after his son, write code. I will remember those two weeks as the happiest weeks of being a father.
A couple of years ago, when we were choosing a new analytical database, I was amazed by the speed of solutions which operate solely in memory. Perhaps the idea of a data structure which yields a result in O(N) time doesn’t sound great at an interview, but in practice it isn’t that bad (as long as all N elements happily reside in memory).
A light goes on inside
There were basically two problems to solve: how to enter data and how to retrieve it.
Retrieving counters
I came up with an elegant structure for storing the data. All the aggregates for a given type of message would be stored in a separate object (let’s call it a cube). At the very top level we would have a map of these cubes, keyed by cube name.
Each cube is partitioned by time (in our case, by day and by hour). Each partition stores its aggregates in columns. Adding a new field is simply a case of adding a new entry to the map of fields.
Since we know that the maximum number of possible values for each field is small, we can encode each value as a number; in our case 10 bits would be sufficient (but to keep things simple and leave some headroom, I made it 16 bits). This not only saves memory, but also makes the search faster, since you now only need to compare 16 bits rather than a whole string. In our case there are two types of column: columns of values to search on, which are 16-bit, and the counters themselves, which are 64-bit.
In this way, the key we already know, screen_name=view_photo,prev_screen=welcome,platform=android,app_version=1.2.3,gender=male, with a value of 1 500 000, is transformed into the following data: 1:1:1:1:1 1500000. And all this now takes up 5 × 2 + 8 = 18 bytes. The savings are plain to see.
The search itself can be handled by a simple full scan. For each row, the counter for the relevant field value is increased by the value from the counter column.
The speed isn’t too bad either. The current Java implementation can easily scan through 20 million values a second on a single MacBook Pro CPU core. Since we need a faceted search, the values of all the fields have to be read, not just those of the fields in the filter, so with five fields that works out at 4 million rows per second.
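To make that concrete, here is a minimal sketch of what one such partition and its facet scan could look like. The class and method names are hypothetical, not CubeDB’s actual code; the filter on a facet’s own field is skipped, as in the SQL above.

// Hypothetical sketch of one partition of a cube: one short[] column per
// field (16-bit value ids) plus a long[] column of counters.
public class PartitionSketch {

    private final short[][] columns; // columns[f][row] = encoded value of field f
    private final long[] counts;     // counts[row]     = aggregated counter
    private final int maxCardinality;

    public PartitionSketch(short[][] columns, long[] counts, int maxCardinality) {
        this.columns = columns;
        this.counts = counts;
        this.maxCardinality = maxCardinality;
    }

    // Facet scan over all rows. filter[f] lists the allowed value ids for
    // field f (null = no filter on that field). A row that fails only the
    // filter on field f still counts towards f's own facet.
    public long[][] facetScan(short[][] filter) {
        int nFields = columns.length;
        long[][] facets = new long[nFields][maxCardinality];
        for (int row = 0; row < counts.length; row++) {
            int failingField = -1;
            int failures = 0;
            for (int f = 0; f < nFields && failures < 2; f++) {
                if (filter[f] != null && !contains(filter[f], columns[f][row])) {
                    failingField = f;
                    failures++;
                }
            }
            if (failures == 0) {
                // The row passes every filter: it contributes to all facets.
                for (int f = 0; f < nFields; f++) {
                    facets[f][columns[f][row]] += counts[row];
                }
            } else if (failures == 1) {
                // The row fails exactly one filter: it contributes only to
                // the facet of the field whose filter it fails.
                facets[failingField][columns[failingField][row]] += counts[row];
            }
        }
        return facets;
    }

    private static boolean contains(short[] allowed, short value) {
        for (short a : allowed) {
            if (a == value) return true;
        }
        return false;
    }
}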
Data entry
Unfortunately, this elegant data structure cannot cope with inserts at all. If we need to go through the whole of memory (which takes, let’s say, 100 ms) to insert each element, then it would take about 83 hours to make 3 million inserts (which is indeed the number of combinations that accumulate every hour). That is just excessive and, frankly, a waste of CPU cycles.
On the other hand, we know that data is only inserted for the past few hours or days. And we also know that, when we make an insert, we have all the fields available. So nothing stops us from building a reverse index where the numeric representation of the field values (1:1:1:1:1) is the key and the number of the row holding that combination is the value. This structure can be built in a fraction of a second, and a key lookup takes fractions of a millisecond.
In our case this gave us a speed of the order of 150 000 inserts per second. That is, of course, for an internal call, not counting the time for deserialising the REST request and the network. The reverse index lives in a cache and is created per partition. If it is not accessed for several days, it is deleted and the memory is freed.
Most of our data is entered on the same (or previous) day it was sent, so really only a few partitions actually have existing reverse indices.
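A minimal sketch of that insert path, using a plain HashMap keyed by the encoded combination; the names are illustrative and the real implementation may well differ.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the insert path: a reverse index from the encoded
// combination of field values to the number of the row holding its counter.
public class InsertIndexSketch {

    private final List<short[]> rows = new ArrayList<>();      // encoded values per row
    private final List<Long> counters = new ArrayList<>();     // counter per row
    private final Map<String, Integer> reverseIndex = new HashMap<>();

    // Adds 'delta' to the counter of this combination, appending a new row
    // if the combination has not been seen in this partition yet.
    public void increment(short[] encodedValues, long delta) {
        String key = encode(encodedValues);                    // e.g. "1:1:1:1:1"
        Integer row = reverseIndex.get(key);
        if (row == null) {
            rows.add(encodedValues);
            counters.add(delta);
            reverseIndex.put(key, rows.size() - 1);
        } else {
            counters.set(row, counters.get(row) + delta);
        }
    }

    private static String encode(short[] values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) sb.append(':');
            sb.append(values[i]);
        }
        return sb.toString();
    }
}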
As I have already said, the search itself is trivial, so it is also trivial to parallelise: partitions can be scanned in parallel, on separate threads. Given eight fields on average and 48 processors on the server, we can achieve a scanning speed of 120 million rows per second. This means that, as long as the number of different combinations does not exceed 12 million, we still fit into the target of 100 ms. This is, of course, in an ideal world, but we are almost there.
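The parallelisation itself can be as simple as handing each partition to its own thread and then merging the per-partition facet arrays. A rough sketch, reusing the hypothetical PartitionSketch from above:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: scan every partition on its own thread, then merge
// the per-partition facet counts into a single result.
public class ParallelScanSketch {

    public static long[][] scanAll(List<PartitionSketch> partitions, short[][] filter,
                                   int nFields, int maxCardinality)
            throws InterruptedException, ExecutionException {
        ExecutorService pool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            List<Future<long[][]>> futures = new ArrayList<>();
            for (PartitionSketch partition : partitions) {
                futures.add(pool.submit(() -> partition.facetScan(filter)));
            }
            long[][] merged = new long[nFields][maxCardinality];
            for (Future<long[][]> future : futures) {
                long[][] partial = future.get();
                for (int f = 0; f < nFields; f++) {
                    for (int v = 0; v < maxCardinality; v++) {
                        merged[f][v] += partial[f][v];
                    }
                }
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }
}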
Saving to disk
It makes sense to save data to disk straight in a compact format: the dictionaries are written at the start of the file, followed by the columns of data with numbers instead of strings, and it can all be compressed. Oddly enough, in the Java case the slowest component turned out to be the CPU (gzip compression). Moving to Snappy reduced the time needed for a save from 60 s to 8 s.
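Roughly, the save path looks like this; a minimal sketch assuming the snappy-java library (org.xerial.snappy), with an illustrative rather than actual file layout.

import org.xerial.snappy.SnappyOutputStream;

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Hypothetical sketch of the on-disk format: dictionaries first, then the
// numeric columns, everything streamed through Snappy compression.
public class SaveSketch {

    public static void save(String path,
                            String[][] dictionaries,  // dictionaries[f][id] = string value
                            short[][] columns,        // columns[f][row]     = value id
                            long[] counters) throws IOException {
        try (DataOutputStream out = new DataOutputStream(
                new SnappyOutputStream(new FileOutputStream(path)))) {
            // 1. Dictionaries: for each field, the list of known string values.
            out.writeInt(dictionaries.length);
            for (String[] dict : dictionaries) {
                out.writeInt(dict.length);
                for (String value : dict) {
                    out.writeUTF(value);
                }
            }
            // 2. Columns of 16-bit value ids, one per field.
            out.writeInt(counters.length);
            for (short[] column : columns) {
                for (short id : column) {
                    out.writeShort(id);
                }
            }
            // 3. The 64-bit counters themselves.
            for (long c : counters) {
                out.writeLong(c);
            }
        }
    }
}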
Just in case, there is also an option to save in JSON format, so that the data can be gone through manually should compatibility with other systems ever be lost.
Interaction with the outside world
The whole interface has been done via REST. This is perhaps the most boring part of the whole software, so much so that I am not even going to write about it. Entry via PUT, requests via GET, deletion using DELETE etc.
Java
As I have already mentioned, I wrote the whole thing in Java. Of all the so-called ‘fast’ languages, it is the only one I know how to program in. But I have a very strong suspicion that it would run even faster in C; I imagine that goodies like SIMD would speed the system up a lot. My life’s dream would be to rewrite the whole thing in Rust. But since the current performance is good enough for us, and my son is older now and no longer keen on going to bed at seven pm, that will have to wait.***
I have to say that Java was a source of both joy and disappointment. In terms of full-scan performance it was very good; I didn’t even expect that. The disappointment was the garbage collector, which constantly panics if a lot of stuff accumulates and nothing gets freed. So I had to build all these data structures off-heap, using allocateDirect and Unsafe. That is all well and good, but you end up feeling like you are coding in C rather than Java. I am not sure that, when the universe created the Java language, this is the kind of scenario it had in mind.
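For a flavour of what that ‘coding in C rather than Java’ feeling means, here is a minimal sketch of an off-heap 16-bit column built on allocateDirect; a simplified illustration, not CubeDB’s actual code.

import java.nio.ByteBuffer;

// Hypothetical sketch of an off-heap column of 16-bit value ids: the data
// lives in a direct ByteBuffer, outside the reach of the garbage collector.
public class OffHeapShortColumn {

    private final ByteBuffer buffer;

    public OffHeapShortColumn(int rows) {
        // 2 bytes per row, allocated outside the Java heap.
        this.buffer = ByteBuffer.allocateDirect(rows * 2);
    }

    public void set(int row, short valueId) {
        buffer.putShort(row * 2, valueId);
    }

    public short get(int row) {
        return buffer.getShort(row * 2);
    }
}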
An even greater disappointment was the garbage collector’s behaviour when creating those self-same facet counters. When 48 processors are simultaneously building HashMaps with a couple of thousand elements each, the garbage collector panics as if it had never seen anything like it. As a result, the full scan of a couple of million rows takes less time than converting the results from numbers back to strings and consolidating the data from all the partitions.
*** Why don’t you give it a try?
The present
At present, our single instance contains about 600 cubes made up of something of the order of 500 million entries. This all takes up about 80 GB of resident memory, and the compressed backup (Snappy) takes up about 5 GB on disk. Saving to disk takes something of the order of 30 s.
The system works very stably and, after a fix for a very exotic problem with hashes equal to MIN_VALUE, it hasn’t crashed once.
The future
I enjoy speculating about the future, because you can say what you want, and you don’t have to produce any figures.
So, here are some options for improving the world:
1. First, as I have already said, rewrite the solution in a language better suited to such things as the fast and monotonous processing of data, where the garbage collector won’t bother everyone or itself.
2. Secondly, it would be great to teach CubeDB to interact with similar entities and to grow into a whole cluster.
3. Thirdly, since everything is so fast and in memory, it makes sense to move some of the clever algorithms closer to the data: for example, something along the lines of anomaly detection, as Facebook did with Gorilla.
Since I’ve clearly got more ideas than time, we open-sourced our solution. And since you have read this far, I assume you are interested. Come and have a look. Play around with it. Use it. I think the idea is very simple, but appealing, and there is much more that can be improved. Go for it!
P.S. Data without visualisation is like a night club without music, so at the same time we have released a set of front-end components for working with the CubeDB API, and we have also put together a simple page to demonstrate the possibilities. I would, however, like to warn you that the demo runs on a single-core machine, so it is hard to judge the real processing speed from it: on our internal system, on real hardware with 48 processors, the difference in speed is marked.