Using Riak as Events Storage — Part 2

In this post, we’ll see how to push data to Riak, how to read it later on, and how to perform data processing out of Riak. As a reminder, we saw in the previous part that the events are schema-less HashTable data structures, grouped by epochs, data centers, types and subtypes, then serialised using Sereal and highly compressed.


By Damien Krotkine, Ivan Paponov


If you missed part 1

Pushing to Riak

Side note: it’s not recommended to store objects larger than 1–2MB in Riak (see this FAQ). Since our blobs can be 5–10MB in size, we shard them into chunks of 500KB each. Each chunk is a valid Sereal document, which means we do not have to stitch chunks back together to retrieve the data.

This means that we have quite a lot of blobs to send to Riak, so to maximise our usage of networking, I/O, and CPU, it’s best to send data in a massively parallel way. To do so, we maintain a number of forked processes (20 per host is a good start), each of which pushes data to Riak.

Pushing data to Riak can be done using either the HTTP API or the Protocol Buffers Client (PBC) API. PBC has slightly better performance.

Whichever protocol is used, it’s important to maximise I/O utilisation. One way is to use an HTTP library that parallelises requests at the I/O level (YAHC is an example). Another is to use an asynchronous Riak client like AnyEvent::Riak.

We use an in-house library to create and maintain a pool of forked workers, but there is more than one existing library on CPAN for this, such as Parallel::ForkManager.
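To make the structure concrete, here is a minimal sketch of such a pool of workers using Parallel::ForkManager and HTTP::Tiny; the function name, error handling, and URL are illustrative rather than our actual implementation:

use strict;
use warnings;
use Parallel::ForkManager;
use HTTP::Tiny;

# @blobs is assumed to hold [ $key, $serialised_chunk ] pairs produced
# earlier in the pipeline, e.g. [ "1413813813:1:type1:subtype1:0", $data ]
sub push_blobs_to_riak {
    my @blobs = @_;
    my $pm = Parallel::ForkManager->new(20);    # 20 workers per host

    for my $blob (@blobs) {
        $pm->start and next;                    # parent: spawn a worker, move on
        my ( $key, $data ) = @$blob;
        my $res = HTTP::Tiny->new->put(
            "https://riak_host:8098/buckets/events/keys/$key",
            { content => $data,
              headers => { 'Content-Type' => 'application/octet-stream' } },
        );
        warn "failed to store $key: $res->{status}\n" unless $res->{success};
        $pm->finish;                            # child exits
    }
    $pm->wait_all_children;
}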

PUT to Riak


The first task is to slice the blobs into 500KB chunks and append a chunk index number to their names. That gives keys like 1413813813:1:type1:subtype1:0 and 1413813813:1:type1:subtype1:1, where the last field is the chunk index.
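Here is a rough sketch of that slicing step; the helper name and key prefix are hypothetical, and each chunk is encoded as its own Sereal document so that it can be read independently:

use strict;
use warnings;
use Sereal::Encoder;

my $MAX_CHUNK_SIZE = 500 * 1024;    # aim for chunks of roughly 500KB

sub chunk_events {
    my ( $key_prefix, @events ) = @_;    # e.g. "1413813813:1:type1:subtype1"
    my $encoder = Sereal::Encoder->new;
    my ( @chunks, @batch );

    for my $event (@events) {
        push @batch, $event;
        # re-encoding the whole batch at every step is a simplification;
        # a real implementation would track the size incrementally
        if ( length( $encoder->encode( \@batch ) ) >= $MAX_CHUNK_SIZE ) {
            push @chunks, $encoder->encode( \@batch );
            @batch = ();
        }
    }
    push @chunks, $encoder->encode( \@batch ) if @batch;

    # name each chunk with a postfix index: <prefix>:<chunk_number>
    my $i = 0;
    return map { [ $key_prefix . ':' . $i++, $_ ] } @chunks;
}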

Next, we can store all the event blobs in Riak in the events bucket. We can simulate it with curl:

curl -d <data> -XPUT "https://node:8098/buckets/events/keys/1413813813:1:type1:subtype1:0"
# ...
curl -d <data> -XPUT "https://node:8098/buckets/events/keys/1413813813:2:type3::0"

Side note: we store all events in each of the available Riak clusters. In other words, all events from all DCs will be stored in the Riak cluster which is in DC 1, as well as in the Riak cluster which is in DC 2. We do not use cross DC replication to achieve that — instead we simply push data to all our clusters from the relocators.

Once all the event blobs are stored, we can store the metadata, which is the list of the event keys, in the epochs bucket. This metadata is stored in one key per epoch and DC, so for the current example we will have two keys: 1413813813-1 and 1413813813-2. We have chosen to store the list of event blob names as pipe-separated values. Here is a simulation with curl for DC 2:

curl -d "type1:subtype1:0|type1:subtype1:1|type3::0" -XPUT "https://riak_host:8098/buckets/epochs/keys/1413813813-2"

Because the epoch and DC are already in the key name, it’s not necessary to repeat them in the content. It’s important to push the metadata only after all the corresponding data blobs have been pushed: readers discover data through the metadata key, so publishing it last guarantees that every key it lists is already available.
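A sketch of that metadata step, mirroring the curl command above (the helper name is hypothetical):

use strict;
use warnings;
use HTTP::Tiny;

# to be called only once every blob of the given epoch/DC has been stored
sub store_epoch_metadata {
    my ( $epoch, $dc, @blob_names ) = @_;    # e.g. ( 1413813813, 2, "type1:subtype1:0", ... )
    my $value = join '|', @blob_names;       # pipe-separated list of blob names
    my $res = HTTP::Tiny->new->put(
        "https://riak_host:8098/buckets/epochs/keys/$epoch-$dc",
        { content => $value },
    );
    return $res->{success};
}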

PUT options

Riak’s documentation provides a comprehensive list of the parameters and their meaning. We have set these parameters as follows:

"n_val" : 3,

"allow_mult" : false,
"last_write_wins" : true,

"w" : 3,
"dw" : 0,
"pw" : 0

Here is a brief explanation of these parameters:

  • n_val:3 means that the data is replicated three times
  • allow_mult and last_write_wins prohibit sibling values; conflicts are resolved right away by keeping the last value written
  • w:3 means that when writing data to a node, we get a success response only once the data has been written to all three replica nodes
  • dw:0 instructs Riak to wait only for the data to have reached the nodes, not the backend on the nodes, before returning success
  • pw:0 specifies that it's OK if the nodes that store the replicas are not the primary nodes (i.e. the ones that are supposed to hold the data) but fallback nodes, in case the primary ones are unavailable

In a nutshell, we have a reasonably robust way of writing data. Because our data is immutable, we don’t want siblings or conflict resolution at the application level. Data loss could, in theory, happen if a major network issue occurred just after a write was acknowledged but before the data reached the backend. However, in the worst case we would lose a fraction of one second of events, which is acceptable for us.
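These are bucket properties, so they are applied once per bucket. A sketch of setting them on the events bucket through the HTTP API (PUT /buckets/<bucket>/props) could look like this:

use strict;
use warnings;
use HTTP::Tiny;
use JSON::PP qw(encode_json);

my $props = {
    n_val           => 3,
    allow_mult      => JSON::PP::false(),
    last_write_wins => JSON::PP::true(),
    w               => 3,
    dw              => 0,
    pw              => 0,
};

my $res = HTTP::Tiny->new->put(
    "https://riak_host:8098/buckets/events/props",
    { content => encode_json( { props => $props } ),
      headers => { 'Content-Type' => 'application/json' } },
);
die "could not set bucket properties: $res->{status}\n" unless $res->{success};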

Reading from Riak

Here is an example of the data layout for one epoch, in DC 1:

bucket: epochs
key: 1428415043-1
value: 1:cell0:WEB:app:chunk0|1:cell0:EMK::chunk0

bucket: events
key: 1428415043:1:cell0:WEB:app:chunk0
value: <binary sereal blob>

bucket: events
key: 1428415043:1:cell0:EMK::chunk0
value: <binary sereal blob>

Fetching one second of data from Riak is quite simple. Given a DC and an epoch, the process is as follows:

  • Read the metadata by fetching the key <epoch>-<dc> from the bucket "epochs"
  • Parse the metadata value, split on the pipe character to get data keys, and prepend the epoch to them
  • Reject data keys that we are not interested in by filtering on type/subtype
  • Fetch the data keys in parallel
  • Deserialise the data
  • Data is now ready for processing
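Here is an illustrative sketch of that procedure; the helper name and the filter regex are hypothetical, production code fetches the data keys in parallel, and each chunk is assumed to decode to an array reference of events:

use strict;
use warnings;
use HTTP::Tiny;
use Sereal::Decoder;

sub fetch_epoch {
    my ( $epoch, $dc, $filter ) = @_;    # $filter e.g. qr/:WEB:app:/
    my $http    = HTTP::Tiny->new;
    my $decoder = Sereal::Decoder->new;

    # 1. fetch the metadata key <epoch>-<dc> from the "epochs" bucket
    my $meta = $http->get("https://riak_host:8098/buckets/epochs/keys/$epoch-$dc");
    return [] unless $meta->{success};

    # 2. split on the pipe character and prepend the epoch to get the data keys
    my @keys = map {"$epoch:$_"} split /\|/, $meta->{content};

    # 3. reject the types/subtypes we are not interested in
    @keys = grep { $_ =~ $filter } @keys if $filter;

    # 4. and 5. fetch each blob and deserialise it
    my @events;
    for my $key (@keys) {
        my $blob = $http->get("https://riak_host:8098/buckets/events/keys/$key");
        next unless $blob->{success};
        push @events, @{ $decoder->decode( $blob->{content} ) };
    }

    # 6. the events are now ready for processing
    return \@events;
}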

Reading a time range of data is done the same way. Fetching ten minutes of data from Wed, 01 Jul 2015 11:00:00 GMT would be done by enumerating all the epochs, in this case:

1435748400
1435748401
1435748402
...
1435749000

Then, for each epoch, fetch the data as previously mentioned. It should be noted that Riak is specifically tailored for this kind of workload, where multiple parallel processes perform a huge number of small requests on different keys. This is where distributed systems shine.
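Continuing the sketch above, enumerating a range is then trivial (fetch_epoch() is the hypothetical helper from the previous example):

sub fetch_range {
    my ( $from_epoch, $to_epoch, $dc, $filter ) = @_;
    my @events;
    for my $epoch ( $from_epoch .. $to_epoch ) {
        push @events, @{ fetch_epoch( $epoch, $dc, $filter ) };
    }
    return \@events;
}

# ten minutes of WEB/app events from DC 1,
# starting at Wed, 01 Jul 2015 11:00:00 GMT
my $events = fetch_range( 1435748400, 1435749000, 1, qr/:WEB:app:/ );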

GET options

"r"            : 1,
"pr" : 0,
"rw" : "quorum",
"basic_quorum" : true,
"notfound_ok" : true,

Again, let’s look at these parameters in detail:

  • r:1 means that when fetching data, as soon as one replica node replies, Riak considers this a valid response and won't compare it with the other replicas
  • pr:0 removes the requirement that the data come from a primary node
  • notfound_ok:true means that as soon as one node can't find a key, Riak considers that the key doesn't exist

These parameter values allow fetches to be as fast as possible. In theory, such values don’t protect against conflicts or data corruption. However, in the “Aggregated Events” section (see the first post), we’ve seen that every event blob has a suffix checksum. When fetching them from Riak, this enables the consumer to verify that there is no data corruption. The fact that the events are never modified ensures that no version conflict can occur. This is why having such “careless” parameter values is not an issue for this use case.
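As a side note, when using the HTTP API these read values can also be overridden per request as query parameters; a minimal sketch, reusing a key from the example above:

use strict;
use warnings;
use HTTP::Tiny;

my $key = "1428415043:1:cell0:WEB:app:chunk0";
my $res = HTTP::Tiny->new->get(
      "https://riak_host:8098/buckets/events/keys/$key"
    . "?r=1&pr=0&basic_quorum=true&notfound_ok=true"
);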

Real time data processing outside of Riak


Some data processing is required in near real-time. This is the case for monitoring and for building graphs. Booking.com heavily uses graphs at every layer of its technical stack, and a big portion of these graphs is generated from Events. Data is fetched from the Riak storage every second, processed, and dedicated graphing data is sent to an in-house Graphite cluster.

Other forms of monitoring also consume the events stream: it is fetched continuously and aggregated into per-second, per-minute, and daily aggregations in external databases, which are then made available to multiple departments via internal tools.

These kinds of processes try to be as close to real time as possible. Currently there is 10 to 15 seconds of lag. This lag could be shorter: a portion of it is due to the collection part of the pipeline, and an even bigger part is due to the re-serialisation of the events as they are grouped together to reduce their size. A good deal of optimisation could be done there to bring the lag down to a couple of seconds [1]. However, there was no operational requirement for reducing it, and 15 seconds is small enough for our current needs.

Another way of using the data is to stay close to real time but accumulate seconds into longer periods. One example is our Anomaly Detector, which continuously fetches events from the Riak clusters. However, instead of using the data right away, it accumulates them over short moving windows of time (a few minutes) and applies statistical algorithms to them. The goal is to detect anomalous patterns in our data stream and provide the first alert that prompts further action. Needless to say, this client is critical.

A similar usage involves gathering data related to A/B testing. A large number of machines harvest data from the events flow before processing it and storing the results in dedicated databases for use in experimentation-related tooling.

There are a number of other usages of the data outside of Riak, including manually inspecting events to check the behaviour of new features or to analyse past issues and outages.

Limitations of data processing outside of Riak

First of all, there is a clear network bandwidth limitation to the design: the more consumer clusters there are, the more network bandwidth is used. Even with large clusters (more than 30 nodes), it’s relatively easy to exhaust the network capacity of all the nodes as more and more fetchers try to get data from them.

Secondly, each consumer cluster tends to use only a small part of the events flow. Even though consumers can filter out types, subtypes, and DCs, the resulting event blobs still contain a large quantity of data that is useless to the consumer. For storage efficiency, events need to be stored as large compressed serialised blobs, so splitting them further by allowing finer subtyping is not possible [2].

Additionally, statically splitting the events content is too rigid, since use of the data changes over time and we do not want to be a bottleneck to change for our downstream consumers. Part of an event from a given type that was critical two years ago might only be used for minor monitoring now. A subtype that was heavily used for six months may now be rarely used because of a technical change in the producers.

Finally, the amount of CPU time needed to uncompress, load, and filter the big event blobs is far from negligible: it usually takes around five seconds to fetch, uncompress, and filter one second’s worth of events. This means that any real-time data crunching requires multiple threads and likely multiple hosts, usually a small cluster. It would be much simpler if Riak could provide a real-time stream of data exactly tailored to each consumer’s needs.

Next post: data filtering and processing inside Riak

This is exactly what we implemented, using simple concepts and leveraging the ease of use and hackability of Riak. These concepts and their implementation will be described in the next part of this blog post series!

Read the next part now! Using Riak as Events Storage — Part 3

Notes

[2] At this point, the attentive reader may jump in the air and proclaim “LevelDB and Snappy compression!”. It is indeed possible to use LevelDB as Riak’s storage backend, which provides an option to use Snappy compression on the blocks of data stored. However, this compression algorithm is not good enough for our needs (using gzip reduced the size by a factor of almost two). Also, LevelDB (or at least the eleveldb implementation used in Riak) doesn’t provide automatic expiration, which is critical to us, and versions below 2.x had issues with reclaiming free space after key deletions.


Would you like to be a Developer at Booking.com? Work with us!
