Using Riak as Events Storage — Part 2

In this post, we’ll see how to push data to Riak, how to read it back later on, and how to perform data processing outside of Riak. As a reminder, we saw in the previous part that the events are schema-less HashTable data structures, grouped by epochs, data centers, types, and subtypes, then serialised using Sereal and highly compressed.

dams · Nov 24, 2015

If you missed part 1

We strongly recommend that you read part one of this blog series first. It explains how Booking.com collects and stores events from its backend into central storage, and why Riak was chosen to do so.

Pushing to Riak

Pushing data to Riak is done by a number of relocators: daemons running on the aggregation layer that push the event blobs to Riak.

PUT to Riak

Writing data to Riak is rather simple. For a given epoch, we have a list of event blobs, each of them with a different DC/type/subtype combination (remember, DC is short for Data Center). For example:

curl -d <data> -XPUT "https://node:8098/buckets/events/keys/1413813813:1:type1:subtype1:0"
# ...
curl -d <data> -XPUT "https://node:8098/buckets/events/keys/1413813813:2:type3::0"
curl -d "type1:subtype1:0|type1:subtype1:1|type3::0" -XPUT "https://riak_host:8098/buckets/epochs/keys/1413813813-2"

PUT options

When pushing data to the Riak cluster, we can use different attributes to change the way data is written, either by specifying them per request when using the PBC API, or by setting them as bucket defaults. The events bucket uses the following properties:

"n_val" : 3,

"allow_mult" : false,
"last_write_wins" : true,

"w" : 3,
"dw" : 0,
"pw" : 0
  • n_val:3 means that the data is replicated three times, on three different nodes
  • allow_mult and last_write_wins prevent sibling values; conflicts are resolved right away by keeping the last value written
  • w:3 means that when writing data to a node, we get a success response only once the data has been written to all three replica nodes
  • dw:0 instructs Riak to wait only for the data to have reached the nodes, not the storage backend on the nodes, before returning success
  • pw:0 specifies that it's OK if the nodes that store the replicas are not the primary nodes (i.e. the ones that are supposed to hold the data) but fallback nodes, in case the primary ones are unavailable
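These defaults are set once on the bucket. As a minimal sketch of doing so through Riak's HTTP properties endpoint (riak_host is a placeholder node name, as in the curl examples above):

# Sketch: set the write-related defaults on the "events" bucket
# via Riak's HTTP API ("riak_host" is a placeholder).
import requests

props = {
    "props": {
        "n_val": 3,
        "allow_mult": False,
        "last_write_wins": True,
        "w": 3,
        "dw": 0,
        "pw": 0,
    }
}

resp = requests.put("https://riak_host:8098/buckets/events/props", json=props)
resp.raise_for_status()  # Riak answers 204 No Content on success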

Reading from Riak

This is how the data and metadata for a given epoch are laid out in Riak:

bucket: epochs
key: 1428415043-1
value: 1:cell0:WEB:app:chunk0|1:cell0:EMK::chunk0

bucket: events
key: 1428415043:1:cell0:WEB:app:chunk0
value: <binary sereal blob>

bucket: events
key: 1428415043:1:cell0:EMK::chunk0
value: <binary sereal blob>
Fetching the events of a given epoch is then straightforward:
  • Parse the metadata value: split it on the pipe character to get the data keys, and prepend the epoch to them
  • Reject the data keys that we are not interested in, by filtering on type/subtype
  • Fetch the remaining data keys in parallel
  • Deserialise the data
  • The data is now ready for processing (see the sketch after the epoch list below)
A consumer that wants a given time range simply repeats these steps for every epoch in the range. For instance, reading ten minutes of data starting at epoch 1435748400 means going through these epochs:

1435748400
1435748401
1435748402
...
1435749000
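Here is a minimal sketch of such a fetch in Python, using the HTTP API shown earlier. riak_host is a placeholder, and the final Sereal deserialisation is left out, since it depends on the decoder at hand:

# Sketch: read all the event blobs of one epoch for one DC over the
# HTTP API. "riak_host" is a placeholder; deserialisation of the
# returned Sereal blobs is not shown.
from concurrent.futures import ThreadPoolExecutor
import requests

RIAK = "https://riak_host:8098"

def fetch_blob(key):
    return requests.get("%s/buckets/events/keys/%s" % (RIAK, key)).content

def read_epoch(epoch, dc, wanted_types=None):
    # metadata value, e.g. "1:cell0:WEB:app:chunk0|1:cell0:EMK::chunk0"
    meta = requests.get("%s/buckets/epochs/keys/%s-%s" % (RIAK, epoch, dc)).text
    # split on the pipe character and prepend the epoch to get the data keys
    keys = ["%s:%s" % (epoch, part) for part in meta.split("|")]
    # filter on the type field (index 3 in epoch:dc:cell:type:subtype:chunk)
    if wanted_types is not None:
        keys = [k for k in keys if k.split(":")[3] in wanted_types]
    # fetch the data keys in parallel
    with ThreadPoolExecutor(max_workers=10) as pool:
        blobs = list(pool.map(fetch_blob, keys))
    # each blob would now be deserialised with a Sereal decoder
    return blobs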

GET options

The events bucket (where the event data is stored) has the following properties:

"r"            : 1,
"pr" : 0,
"rw" : "quorum",
"basic_quorum" : true,
"notfound_ok" : true,
  • r:1 means that a read is considered successful as soon as one of the replicas has returned a value
  • pr:0 removes the requirement that the data comes from a primary node
  • notfound_ok:true makes it so that as soon as one node can't find a key, Riak considers that the key doesn't exist
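These properties can also be overridden for a single request; the HTTP API accepts them as query parameters. A minimal sketch (riak_host is again a placeholder):

# Sketch: per-request read options passed as query parameters.
import requests

resp = requests.get(
    "https://riak_host:8098/buckets/events/keys/1428415043:1:cell0:WEB:app:chunk0",
    params={"r": 1, "pr": 0, "notfound_ok": "true"},
)
if resp.status_code == 404:
    blob = None  # a replica answered "not found": the key is considered absent
else:
    resp.raise_for_status()
    blob = resp.content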

Real time data processing outside of Riak

After the events are properly stored in Riak, it’s time to use them. The first usage is quite simple: extract data out of them and process it on dedicated machines, usually grouped in clusters of machines that perform the same kind of analysis. These machines are called consumers, and they usually run daemons that fetch data from Riak, either continuously or on demand. Most of the continuous consumers are actually small clusters of machines spreading the load of fetching data[1].
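A continuous consumer can be sketched by reusing read_epoch() from above, trailing real time by a safety margin so that each epoch is fully written before being read. The one-minute margin and the process callback are illustrative assumptions:

# Sketch of a continuous consumer: walk the epochs one by one, staying
# behind real time. The 60-second margin and the process callback are
# assumptions for illustration.
import time

def consume_forever(dc, wanted_types, start_epoch, process):
    epoch = start_epoch
    while True:
        # wait until the epoch is old enough to be fully written
        while epoch > time.time() - 60:
            time.sleep(1)
        for blob in read_epoch(epoch, dc, wanted_types):
            process(blob)
        epoch += 1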

Limitations of data processing outside of Riak

Fetching data out of the Riak cluster raises issues that are difficult to work around without changing the processing mechanism: every consumer cluster fetches the full stream, which consumes a lot of network bandwidth, and deserialising and processing the events requires a lot of CPU time on the consumers.

Next post: data filtering and processing inside Riak

What if we could remove the CPU limitations by doing the processing on the Riak cluster itself? What if we could work around the network bandwidth issue by generating sub-streams on the fly, in real time, on the Riak cluster?

Notes

[1] Some optimisation has been done; the main action was to implement a module that splits a Sereal blob without deserialising it, which speeds up the process greatly. This module can be found here: Sereal::Splitter. Most of the time spent splitting Sereal blobs is now spent decompressing them. The next optimisation step would be to use a compression that decompresses faster than the currently used gzip, for instance LZ4_HC.

