Using Riak as Events Storage — Part 1
Booking.com constantly monitors, inspects, and analyzes our systems in order to make decisions. We capture and channel events from our various subsystems, then perform real-time, medium and long-term computation and analysis.
By Damien Krotkine, Ivan Paponov
This is a critical operational process, since our daily work always gives precedence to data. Relying on data removes the guesswork from making sound decisions.
In this series of blog posts, we will outline details of our data pipeline, and take a closer look at the short and medium-term storage layer that was implemented using Riak.
Introduction to Events Storage
Booking.com receives, creates, and sends an enormous amount of data. Usual business-related data is handled by traditional databases, caching systems, etc. We define events as data that is generated by all the subsystems on Booking.com.
In essence, events are free-form documents that contain a variety of metrics. The generated data does not contain any direct operational information. Instead, it is used to report status, states, secondary information, logs, messages, errors and warnings, health, and so on. The data flow represents a detailed status of the platform and contains crucial information that will be harvested and used further down the stream.
To put this in numerical terms: we handle billions of events per day, streaming in at more than 100 MB per second and adding up to more than 6 TB per day.
Here are some examples of how we use the events stream:
- Visualisation: Wherever possible, we use graphs to express data. To create them, we use a heavily-modified version of Graphite.
- Looking for anomalies: When something goes wrong, we need to be notified. We use threshold-based notification systems (like seyren) as well as custom anomaly detection software, which computes statistical metrics (e.g. change in standard deviation) and alerts us if those metrics look suspicious.
- Gathering errors: We use our data pipeline to pass stack traces from all our production servers into ElasticSearch. Doing it this way (as opposed to straight from the web application log files) allows us to correlate errors with the wealth of information we store in the events.
These typical use cases are served less than one minute after the related event has been generated.
High-level overview
This is a very simplified diagram of the data flow:
Events can be generated by literally any piece of code running on our servers: we pass a HashMap to a function, which packages the provided document into a UDP packet and sends it to a collection layer. This layer aggregates all the events together into "blobs", which are split by second (also called epoch) and other variables. These event blobs are then sent to the storage layer running Riak. Finally, Riak sends them on to Hadoop.

The Riak cluster is meant to safely store around ten days of data. It is used for near real-time analysis (something that happened seconds or minutes ago) and for medium-term analysis of relatively small amounts of data. We use Hadoop for older data analysis or analysis of a larger volume of data.
The above diagram is a simplified version of our data flow. In practice, it's spread across multiple datacenters (DCs) and includes an additional aggregation layer.
Individual Events
An event is a small schema-less [1] piece of data sent by our systems. That means the data can have any structure and any level of depth, as long as the top level is a HashTable. This is crucial to Booking.com: the goal is to give as much flexibility as possible to the sender, so that it's easy to add or modify the structure, or the type and number of events.
Events are also tagged in four different ways:
- the epoch at which they were created
- the DC where they originated
- the type of event
- the subtype
Some common types are:
- WEB events (events produced by code running under a web server)
- CRON events (output of cron jobs)
- LB events (load balancer events)
The subtypes are there for further specification and can answer questions like: "Which one of our web server systems are we talking about?".
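As an illustration, here is what an event and its tags could look like in Perl. The field names and values below are purely hypothetical; they are not taken from our actual systems.

    # A hypothetical event: a free-form nested HashMap.
    my %event = (
        action   => 'search',
        duration => 0.042,
        warnings => [ 'slow_backend' ],
    );

    # The four tags attached to it on its way through the pipeline
    # (values are again illustrative only).
    my %tags = (
        epoch   => time(),  # the second at which it was created
        dc      => 1,       # the DC where it originated
        type    => 'WEB',   # the type of event
        subtype => 'app',   # the subtype
    );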
Events are compressed Sereal blobs. Sereal is possibly the best schema-less serialisation format currently available. It was also written at Booking.com.
An individual event is not very big, but a huge number of them are sent every second.
We use UDP as the transport because it provides a fast and simple way to send data. It carries a small risk of data loss, but sending never blocks or slows down the code emitting the events. We are experimenting with a UDP-to-TCP relay that will be local to the senders.
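As a rough sketch, the sending side can be as simple as serialising the HashMap and firing off a UDP datagram. The collector address, port, and helper name below are assumptions for illustration, not our actual implementation.

    use strict;
    use warnings;
    use IO::Socket::INET;
    use Sereal::Encoder;

    # Hypothetical collector address and port.
    my $sock = IO::Socket::INET->new(
        Proto    => 'udp',
        PeerAddr => 'collector.example.com',
        PeerPort => 6660,
    ) or die "cannot create UDP socket: $!";

    my $encoder = Sereal::Encoder->new();

    sub send_event {
        my ($event) = @_;    # a hashref of free-form data
        # Serialise and send; UDP never blocks the sender.
        $sock->send( $encoder->encode($event) );
    }

    send_event({ cpu => 5, host => 'web-123' });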
Aggregated Events
Every second, the aggregation layer merges all the events sharing the same second (called the epoch), DC number, type, and subtype into an Array of events. At this point, it's important to get the smallest size possible, so the events of a given epoch are re-serialized as a Sereal blob, using these options:
    compress       => Sereal::Encoder::SRL_ZLIB,
    dedupe_strings => 1,

The dedupe_strings option increases the serialisation time slightly, but it removes duplicated strings, which occur a lot since events are usually quite similar to one another. The compress => SRL_ZLIB option adds gzip compression on top.
We also append the checksum of the blob as a postfix, to be able to verify data integrity later on. The following diagram shows what an aggregated blob of events looks like for a given epoch, DC, type, and subtype. You can get more information about the Sereal encoding in the Sereal Specification.
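Here is a minimal sketch of that re-serialization step. The checksum algorithm is not specified in this post, so MD5 is used below purely as an illustration.

    use strict;
    use warnings;
    use Sereal::Encoder;
    use Digest::MD5 qw(md5);

    # One second worth of events for a given DC, type, and subtype.
    my @events = ( { cpu => 5 }, { cpu => 99 } );

    my $encoder = Sereal::Encoder->new({
        compress       => Sereal::Encoder::SRL_ZLIB,  # gzip-compress the payload
        dedupe_strings => 1,                          # share repeated strings
    });

    my $blob = $encoder->encode( \@events );

    # Append a checksum so that data integrity can be verified downstream
    # (MD5 here is an assumption, not necessarily what we use).
    my $aggregated = $blob . md5($blob);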
This is the general structure of an events blob:
The compressed payload contains the events themselves: an Array of HashMaps, serialized in a Sereal structure and gzip-compressed. Here is an example of a trivial payload of two events:
    [
        { cpu => 5 },
        { cpu => 99 }
    ]
And the gzipped payload would be the compressed version of this binary string:
It can be hard to follow these hex digits [2], yet they nicely illustrate why the Sereal format helps us reduce the size of serialised data: the second array element is encoded in far fewer bytes than the first one, since its key has already been seen. The resulting binary is then compressed. The Sereal implementation offers multiple compression algorithms, including Snappy and gzip.
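On the consumer side, Sereal::Decoder handles the decompression transparently. A minimal sketch, assuming the blob has already been read from storage and its checksum postfix stripped:

    use strict;
    use warnings;
    use Sereal::Decoder;

    # Read an aggregated events blob from a file (checksum already stripped).
    my $file = shift @ARGV or die "usage: $0 blob_file\n";
    open my $fh, '<:raw', $file or die "cannot open $file: $!";
    my $blob = do { local $/; <$fh> };
    close $fh;

    # decode() detects the gzip compression and decompresses automatically.
    my $events = Sereal::Decoder->new->decode($blob);

    for my $event (@$events) {
        # Each event is a HashMap, e.g. { cpu => 5 }.
        print "$_ => $event->{$_}\n" for keys %$event;
    }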
A typical blob of events for one second/DC/type/subtype can weigh anywhere from several kilobytes to several megabytes, which translates into a (current) average of around 250 gigabytes per hour.
Side note: at this level of aggregation we don't always split by subtype, because we want good compression ratios in order to minimise the data we transmit over our network. We therefore split types into subtypes only when the resulting blobs are big enough. The downside of this approach is that consumers have to fetch the data for a whole type and then filter out only the subtypes they want. We're looking at ways to strike a better balance here.
Data flow size and properties
Data flow properties are important, since they're used to decide how data should be stored:
- The data is timed and all the events blobs are associated with an epoch. It’s important to bear in mind that events are schema-less, so the data is not a traditional time series.
- Data can be considered read-only; the aggregated events blobs are written every second and almost never modified (history rewriting happens very rarely).
- Once sent to the storage, the data must be available as soon as possible.
Data is used in different ways on the client side. A lot of consumers are actually daemons that will consume the fresh data as soon as possible - usually seconds after an event was emitted. A large number of clients read the last few hours of data in a chronological sequence. On rare occasions, consumers access random data that is over a few days old. Finally, consumers that want to work on larger amounts of older data would have to create Hadoop jobs.
There is a large volume of data to be moved and stored. In numerical terms:
- Once serialized and compressed into blobs, the stream usually exceeds 50 MB/s
- That's around 250 GB per hour and more than 6 TB per day
- There is a daily peak hour, but the variance of the data size is not huge: there are no quiet periods
- Yearly peak season stresses all our systems, including events transportation and storage, so we need to provision capacity for that
Why Riak
In order to find the best storage solution for our needs, we tested and benchmarked several different products and solutions.
The solutions had to reach the right balance of multiple features:
- Read performance had to be high as a lot of external processes will use the data.
- Write security was important, as we had to ensure that the continuous flow of data could be stored. Write performance should not be impacted by reads.
- Horizontal scalability was of utmost importance, as our business and traffic continuously grows.
- Data resilience was key: we didn't want to lose portions of our data because of a hardware problem.
- The storage had to be simple enough for a small team to administer and evolve.
- The storage shouldn't require the data to have a specific schema or structure.
- If possible, it should be able to bring code to the data, performing computation on the storage itself instead of having to pull data out of the storage.
After exploring a number of distributed file systems and databases, we chose Riak from among the distributed key-value stores. Riak had good performance and predictable behavior when nodes fail and when scaling up. It also had the advantage of being easy to grasp and implement within a small team. Extending it was very easy (as we'll see in the next part of this series), and we found the system very robust: we never had to face dramatic issues or data loss.
Disclaimer: This is not an endorsement of Riak. We compared it carefully to other solutions over a long period of time, and it seemed to be the best product to suit our needs. As an example, we thoroughly tested Cassandra as an alternative: it had a larger community and similar performance, but it was less robust and predictable, and it lacked some advanced features. The choice is ultimately a question of priorities. The fact that our events are schema-less made it almost impossible for us to use solutions that require knowledge of the data structures. We also needed a small team to be able to operate the storage, and a way to process data on the cluster itself, using MapReduce or similar mechanisms.
Riak 101
The Riak cluster is a collection of nodes (in our case, physical servers), each of which owns a share of the keys. Depending on the chosen replication factor, each key may be owned by multiple nodes. You can ask any node for a key, and your request will be redirected to one of the owners. The same goes for writes.
On closer inspection of Riak, we see that keys are grouped into virtual nodes. Each physical node can own multiple virtual nodes. This simplifies data rebalancing when growing a cluster. Riak does not need to recalculate the owner for each individual key; it will only do it per virtual node.
We won't cover the Riak architecture in great detail in this post, but we recommend reading the following article for further information.
Riak clusters configuration
The primary goal of this storage is to keep the data safe. We went with the standard replication factor of three: even if two nodes owning the same data go down, we won't lose it.
Riak offers multiple back-ends for the actual data storage. The main three are Memory, LevelDB, and Bitcask. We chose Bitcask, since it suits our particular needs. Bitcask uses log-structured hash tables that provide very fast access. As data gets written to the storage, Bitcask simply appends it to a number of open files. Even if a key is modified or deleted, the new information is written at the end of these storage files. An in-memory HashTable maps each key to the position of its (latest) value in the files. That way, at most one seek is needed to fetch data from the file system.
Data files are then periodically compacted, and Bitcask provides very good expiration flexibility. Since Riak is a temporary storage solution for us, we set it up with automatic expiration. The expiration period varies, depending on the current cluster shape, but usually falls between 8 and 11 days.
Bitcask keeps all of a node's keys in memory, so storing huge numbers of individual events as key-value pairs isn't trivial. We sidestep the issue by storing aggregations of events (blobs), which drastically reduces the number of keys needed.
More information about Bitcask can be found here.
For our conflict resolution strategy, we use Last Write Wins. The nature of our data (which is immutable as we described before) allows us to avoid the need for conflict resolution.
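As a sketch of how these choices map onto Riak bucket properties, they can be set once through Riak's standard HTTP interface. This is an illustration, not a copy of our actual configuration: the node address is hypothetical (8098 is Riak's default HTTP port).

    use strict;
    use warnings;
    use HTTP::Tiny;

    # Hypothetical Riak node; any node of the cluster would do.
    my $riak = 'http://riak-node.example.com:8098';

    # Replication factor of three, no siblings, last write wins.
    my $props = '{"props":{"n_val":3,"allow_mult":false,"last_write_wins":true}}';

    my $res = HTTP::Tiny->new->put(
        "$riak/buckets/events/props",
        {
            headers => { 'content-type' => 'application/json' },
            content => $props,
        },
    );
    die "failed to set bucket properties: $res->{status}\n" unless $res->{success};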
The last important part of our setup is load balancing. It is crucial in an environment with a high level of reads and only a 1-gigabit network. We use our own solution for this, based on ZooKeeper: Zooanimal daemons run on the Riak nodes and collect information about system health. This information is then aggregated into simple text files containing an ordered list of the IP addresses of the Riak nodes that are up, running, and available to connect to. All our Riak clients simply choose a random node to send their requests to.
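A minimal sketch of that client-side node selection, assuming a hypothetical /etc/riak_nodes file maintained by the Zooanimal daemons:

    use strict;
    use warnings;

    # Hypothetical path; in reality the file is kept up to date via ZooKeeper.
    my $nodes_file = '/etc/riak_nodes';

    open my $fh, '<', $nodes_file or die "cannot open $nodes_file: $!";
    chomp( my @nodes = <$fh> );
    close $fh;

    # Each client simply picks a random healthy node for its requests.
    my $node = $nodes[ rand @nodes ];
    print "sending requests to $node\n";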
We currently have two Riak clusters in different geographical locations, each with more than 30 nodes. More nodes mean more storage space, CPU power, RAM, and network bandwidth.
Data Design
Riak is primarily a key-value store. Although it provides advanced features (secondary indexes, MapReduce, CRDTs), the simplest and most efficient way to store and retrieve data is to use the key-value model.
Riak has three core concepts: a bucket is a namespace, in which a key is unique; a key is the identifier of the data and has to be stored in a bucket; a value is the data itself, and has an associated MIME type, which can make Riak aware of its type.
Riak doesn’t provide efficient ways to retrieve the list of buckets or the list of keys by default [3]. When using Riak, it’s important to know the bucket and key to access. This is usually resolved by using self-explanatory identifiers.
In our case, events are stored as Sereal-encoded blobs, and for each blob we know the datacenter, type, subtype, and of course the time at which it was created.
When we need to retrieve data, we always know the time we want. We are also confident in the list of our datacenters: it doesn't change unexpectedly, so we can make it static for our applications. We are not always sure, however, about which types or subtypes will appear in a given epoch for a given datacenter; during some seconds, events of certain types may simply not arrive.
We came up with this simple data design:
- events blobs are stored in the events bucket, with keys of the form <epoch>:<dc>:<type>:<subtype>:<chunk>
- metadata is stored in the epochs bucket, with keys of the form <epoch>:<dc> and values being the list of events keys for that epoch and DC combination
The chunk value is an integer, starting at zero, used to split big events blobs into pieces smaller than 500 kilobytes each, so that Riak can function more efficiently.
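As a sketch, a consumer that knows the epoch and DC can first read the metadata and then fetch the corresponding blobs over Riak's HTTP interface. The node address is hypothetical, and the exact encoding of the key list in the epochs value is not described in this post, so one key per line is assumed here.

    use strict;
    use warnings;
    use HTTP::Tiny;

    my $riak = 'http://riak-node.example.com:8098';   # hypothetical node
    my $http = HTTP::Tiny->new;

    my ( $epoch, $dc ) = ( 1451606400, 1 );

    # 1. Fetch the metadata: the list of events keys for this epoch and DC.
    my $meta = $http->get("$riak/buckets/epochs/keys/$epoch:$dc");
    die "no metadata for $epoch:$dc\n" unless $meta->{success};

    # Assumption: one events key per line, e.g. "1451606400:1:WEB:app:0".
    my @event_keys = split /\n/, $meta->{content};

    # 2. Fetch each events blob.
    for my $key (@event_keys) {
        my $res = $http->get("$riak/buckets/events/keys/$key");
        next unless $res->{success};
        my $blob = $res->{content};   # Sereal blob with its checksum postfix
        # ... strip the checksum and decode with Sereal::Decoder as shown earlier ...
    }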
We’ll see this data design in action when pushing data to Riak, in the next blog post of this series.
Next post: data processing outside of Riak
The next part of this blog post series will explain how we push data to and fetch data from Riak, in order to do real-time processing and batch processing.
Read the next part now! Using Riak as Events Storage — Part 2
Notes
[1] It is not strictly true that our events are schema-less. They follow the structure that their producers found the most useful and natural. But there are so many producers, each sending events with a different schema, that it's almost equivalent to considering them schema-less. Our events can be seen as structured, yet with so many schemas that they can't be traced. Producers also have complete technical freedom to change the structure of an event whenever they find it useful.
[2] After spending some time looking at and decoding Sereal blobs, the human eye easily recognizes common data structures like small HashMaps, small Arrays, small Integers and VarInts, and of course, Strings, since their content is untouched. That makes Sereal an almost human readable serialisation format, especially after a hexdump.
[3] This can be worked around by using secondary indexes (2i, when the backend is eLevelDB) or Riak Search to create additional indexes on keys, thus enabling listing them in various ways.
Would you like to be a Developer at Booking.com? Work with us!