Using Riak as Events Storage — Part 3

In this post, we’ll see how to apply transformations to the events data stored in Riak without the data leaving the cluster. The previous parts covered how to gather, aggregate and store events in Riak, and how to fetch them for external processing. Now we’ll see how to reduce bandwidth usage by transforming the data in place, without moving the events outside of the cluster.

dams · Mar 2, 2016

If you missed Part 2

We strongly recommend that you read part 2 of this blog series first. The previous parts explain how Booking.com collects and stores events from its backend into a central storage, and how we use them for events analysis.

The Theory

The reasoning is actually very simple. The final goal is to perform real-time data processing of the events blobs that are stored in Riak. Data processing usually produces a very small result, so it is a waste of network bandwidth to ship the full events out of Riak just to analyse them on consumer clusters.


A first attempt: MapReduce

MapReduce is a very well known (if somewhat outdated) way of bringing the code near the data and distributing data processing [1]. There are excellent papers explaining this approach for further background study. Applied to our events storage, a processing job breaks down into three phases:

  • A first Map phase reads the metadata values and returns a list of data keys.
  • A second Map phase reads the data values, deserialises them, applies the processing code and returns the list of results.
  • A Reduce phase aggregates the results together.
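
As an illustration, here is a minimal sketch of such a job, submitted through Riak’s documented HTTP /mapred endpoint. The bucket names, the key layout and the JSON encoding are assumptions made for the example (the real chunks are Sereal-encoded blobs), and the “count WEB events” logic merely stands in for a real processing job:

```python
# Hedged sketch: a two-map + one-reduce Riak MapReduce job over one epoch.
import json
import requests

MAP_METADATA_TO_KEYS = """
function(v) {
  // Assumption: the metadata value holds a JSON list of data-chunk keys.
  // Returning [bucket, key] pairs feeds them to the next map phase.
  return JSON.parse(v.values[0].data).map(function(key) {
    return ["events_data", key];
  });
}
"""

MAP_PROCESS_CHUNK = """
function(v) {
  // Deserialise one data chunk, then apply the processing code to it.
  var events = JSON.parse(v.values[0].data);
  var count = 0;
  for (var i = 0; i < events.length; i++) {
    if (events[i].type === "WEB") { count++; }  // toy processing
  }
  return [count];
}
"""

job = {
    "inputs": [["events_metadata", "1456902000"]],  # one epoch's metadata key
    "query": [
        {"map":    {"language": "javascript", "source": MAP_METADATA_TO_KEYS}},
        {"map":    {"language": "javascript", "source": MAP_PROCESS_CHUNK}},
        {"reduce": {"language": "javascript", "name": "Riak.reduceSum"}},
    ],
}

response = requests.post(
    "http://riak-node.example.com:8098/mapred",
    data=json.dumps(job),
    headers={"Content-Type": "application/json"},
)
print(response.json())  # e.g. [42]
```

Note that chaining two map phases works because Riak feeds the output of a map phase into the next one, provided that output is a list of bucket/key pairs.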

The metrics

To be able to test the MapReduce solution, we need a use case and some metrics to measure. Here are the figures we’ll use below: a 30-node cluster storing one epoch-data (one second worth of events, around 70MB of serialised data split into a couple of hundred chunks) every second, 20 real-time consumers each needing the full stream, results of roughly 10KB per query, about 5 seconds of CPU time to deserialise one epoch-data and 0.01 seconds to process it, with 10 CPUs available per node.

Metrics when doing external processing

When doing standard data processing, as seen in the previous part of this blog series, each epoch-data is fetched out of Riak, then deserialised and processed outside of the cluster.
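
For illustration, here is roughly what an external consumer does for every epoch, as a hedged sketch: the bucket names and the JSON encoding are made up for the example (the real chunks are Sereal blobs), and Riak’s plain HTTP key-fetch API is used:

```python
# Sketch of the external-processing loop: fetch, deserialise, process.
import json
import requests

RIAK = "http://riak-node.example.com:8098"

def process_epoch(epoch):
    # 1. fetch the metadata value listing the epoch's data-chunk keys
    meta = requests.get(f"{RIAK}/buckets/events_metadata/keys/{epoch}")
    chunk_keys = json.loads(meta.text)

    # 2. fetch and deserialise every chunk: this is where the ~70MB/s of
    #    external bandwidth per consumer goes
    events = []
    for key in chunk_keys:
        chunk = requests.get(f"{RIAK}/buckets/events_data/keys/{key}")
        events.extend(json.loads(chunk.text))

    # 3. the actual processing, producing a tiny result
    return sum(1 for e in events if e.get("type") == "WEB")

print(process_epoch("1456902000"))
```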

External bandwidth usage

The external bandwidth usage is high. For each query, the full epoch-data is transferred, so that’s 20 queries times 70MB/s = 1400 MB/s. Of course, this number is evenly spread across all the nodes, but that’s still roughly 1400 / 30 = 47 MB/s per node. That, however, is just for the data processing. There is a small overhead that comes from the clusterised nature of the system and from gossiping, so let’s round that up to 50 MB/s of outgoing external network bandwidth per node.

Internal bandwidth usage

The internal bandwidth usage is very high. Each time a key’s value is requested, Riak fetches its 3 replicas before returning the value, so that’s 3 x 20 x 70MB/s = 4200 MB/s. Per node, it’s 4200 / 30 = 140 MB/s.
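
For reference, the same bandwidth arithmetic spelled out, using the figures from the use case above:

```python
nodes = 30          # Riak nodes in the cluster
consumers = 20      # concurrent real-time queries
epoch_mb = 70       # MB transferred per second for one epoch-data
replicas = 3        # Riak replication factor

external = consumers * epoch_mb             # 1400 MB/s leaving the cluster
print(external / nodes)                     # ~47 MB/s per node

internal = replicas * consumers * epoch_mb  # 4200 MB/s inside the cluster
print(internal / nodes)                     # 140 MB/s per node
```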

Deserialise time

Deserialise time is zero: the data is deserialised outside of Riak.

Processing time

Processing time is zero: the data is processed outside of Riak.

Metrics when using MapReduce

When using MapReduce, the data processing code is sent to Riak as an ad hoc MapReduce job and executed on the Riak cluster, the work being dispatched to the nodes where the epoch-data chunks are stored.

External bandwidth usage

When using MapReduce to perform data processing jobs, there is certainly a huge gain in network bandwidth usage. For each query, only the results are transferred, so 20 x 10KB/s = 200 KB/s.

Internal bandwidth usage

The internal bandwidth usage is also very low: it’s only used to spread the MapReduce jobs, transfer the results, and do bookkeeping. It’s hard to put a precise number on it because of the way jobs and data are spread across the cluster, but overall it uses a couple of MB/s at most.

Deserialise time

Deserialise time is high: for each query, the data is deserialised again, so 20 x 5 = 100 CPU-seconds for the whole cluster, every second. Each node has 10 CPUs available for deserialisation, which is 30 x 10 = 300 CPUs in total, so deserialising one second worth of data takes 100/300 = 0.33 sec. This is clearly an issue: a third of all our CPU power is spent deserialising the same data again in every MapReduce job. It’s a big waste of CPU time.

Processing time

Processing time is 20 x 0.01 = 0.2s for the whole cluster. This is really low compared to the deserialise time.
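
The MapReduce arithmetic, spelled out with the same figures as above:

```python
nodes, jobs = 30, 20
result_kb = 10           # size of one query's result, in KB
deserialise_cpu_s = 5    # CPU-seconds to deserialise one epoch-data
process_cpu_s = 0.01     # CPU-seconds of actual processing per job
cpus_per_node = 10

print(jobs * result_kb)                       # 200 KB/s of external bandwidth
cpu_needed = jobs * deserialise_cpu_s         # 100 CPU-seconds per second of data
print(cpu_needed / (nodes * cpus_per_node))   # 0.33: a third of all CPU power
print(jobs * process_cpu_s)                   # 0.2 s of processing, negligible
```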

Limitations of MapReduce

As we’ve seen, using MapReduce has its advantages: it’s a well-known standard, and it allows us to create real-time processing jobs. However, it doesn’t scale: because MapReduce jobs are isolated, they can’t share the deserialised data, so CPU time is wasted and it’s not possible to run more than one or two dozen real-time data processing jobs at the same time.

A better solution: post-commit hooks

We explored a different approach to enable real-time data processing on the cluster, one that scales properly: it deserialises the data only once, lets us cap its CPU usage, and allows us to write the processing jobs in any language, while still bringing the code to the data and removing most of the internal and external network usage.
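
For context, Riak lets you attach an Erlang post-commit hook to a bucket through its bucket-properties API. A minimal, hedged sketch of registering one follows; the bucket name and the events_hooks:on_put function are made up for the example, and the hook module itself would have to be an Erlang module deployed on every node:

```python
# Hypothetical: registering an Erlang post-commit hook on the data bucket
# via Riak's documented bucket-properties HTTP endpoint.
import requests

requests.put(
    "http://riak-node.example.com:8098/buckets/events_data/props",
    json={"props": {"postcommit": [{"mod": "events_hooks", "fun": "on_put"}]}},
)
```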

Notes

[1] Using MapReduce on Riak is usually somewhat discouraged, because most of the time it’s used in the wrong way, for instance to perform bulk fetches or bulk inserts, or to traverse a bucket. The MapReduce implementation in Riak is very powerful and efficient, but it must be used properly. It works best when used on a small number of keys, even if the size of the data processed is very large: the fewer keys, the less bookkeeping and the better the performance. In our case, there are only a couple of hundred keys for one second worth of data (but somewhat large values, around 400K), which is not a lot. Hence the great performance of MapReduce we’ve witnessed. YMMV.

