Using Riak as Events Storage — Part 3

In this post, we’ll see how to apply transformations to the events data stored in Riak without the data leaving the cluster. We saw in the previous parts how to gather, aggregate and store events in Riak, and how to fetch them for external processing. We’ll see now how to reduce bandwidth usage by applying data transformation without moving the events outside of the cluster.

If you missed Part 2

The Theory

This diagram is equivalent to:

So instead of bringing the data to the processing code, let’s bring the code to the data:

This is a typical use case for MapReduce. We’re going to see how to use MapReduce on our dataset in Riak, and also why it’s not a usable solution.

For the rest of this post, it’s important to establish a reference for all the events that are stored for a time period of exactly one second. Because we already happen to store our events by a second (and call it an “epoch”), using this unit of measure is a practical consideration that we’ll refer to as epoch-data.

A first attempt: MapReduce

Riak has a very good MapReduce implementation. MapReduce jobs can be written in Javascript or Erlang. We highly recommend using Erlang for better performance.

To perform events processing of an epoch-data on Riak, the MapReduce job would look like the following list. Metadata and data keys concepts are explained in the part 2 of the blog series. Here are the MapReduce phases:

  • Given a list of epochs and DCs, the input is the list of metadata keys, and as additional parameter, the processing code to apply to the data.
  • A first Map phase reads the metadata values and returns a list of data keys.
  • A second Map phase reads the data values, deserialises it, applies the processing code and returns the list of results.
  • A Reduce phase aggregates the results together

This works just fine. For one epoch-data, one data processing code is properly mapped to the events, the data deserialised and processed in around 0.1 second (on our initial 12 nodes cluster). This is by itself an important result: it’s taking less than one second to fully process one second worth of events. Riak makes it possible to implement a real-time MapReduce processing system [1].

Should we just use MapReduce and be done with it? Not really, because our use case involves multiple consumers doing different data processing at the same time. Let’s see why this is an issue.

The metrics

The use case is the following: every second, multiple consumers (say 20) need the result of one of the data processing (say 10) of the previous second.

We’ll consider that an epoch-data is roughly 70MB, data processing results are around 10KB each. Also, we’ll consider that the Riak cluster is a 30 nodes ring with 10 real CPUs available for data processing on each node.

The first metric we can measure is the external network bandwidth usage. This is the first factor that encouraged us to move away from fetching the events out of Riak to do external processing. External bandwidth usage is the bandwidth used to transfer data between the cluster as a whole, and the outside world.

The second metric is the internal network bandwidth usage. This represents the network used between the nodes, inside of the Riak cluster.

Another metric is the time (more precisely the CPU-time) it takes to deserialise the data. Because of the heavily compressed nature of our data, decompression and deserialising one epoch-data takes roughly 5 sec.

The fourth metric is the CPU-time it take to process the deserialized data, analyze it and produce a result. This is very fast (compared to deserialisation), let’s assume 0.01 sec. at most.

Note: we are not taking into account the impact of storing the data in the cluster (remember that events blobs are being stored every second) because it’s impacting the system the same way in both external processing and MapReduce.

Metrics when doing external processing

External bandwidth usage

Internal bandwidth usage

Deserialise time

Processing time

Metrics when using MapReduce

External bandwidth usage

Internal bandwidth usage

Deserialise time

Processing time

Limitations of MapReduce

It’s possible to overcome this difficulty by caching the deserialised data in memory, within the Erlang VM, on each node. CPU time would still be 3 times higher than needed (because a map job can run on any of the 3 replicas that contains the targeted data) but at least it wouldn’t be tied to the number of parallel jobs.

Another issue is the fact that writing MapReduce jobs is not that easy, especially because — in this case — it’s a prerequisite to know Erlang.

Last but not least, it’s possible to create very heavy MapReduce jobs, easily consuming all the CPU time. This directly impacts the performance and reliability of the cluster, and in extreme cases the cluster may be unable to store incoming events at a sufficient pace. It’s not trivial to fully protect the cluster against MapReduce misuse.

A better solution: post-commit hooks

This technical solution is what is currently in production at on our Riak events storage clusters, and it uses post-commit hooks and a companion service on the cluster nodes.

We’ll explore in detail this solution in the next blog post, so stay tuned!


Would you like to be a Developer at Work with us! Development

Software development at Development

Software development at

Damien "dams" Krotkine

Written by

Senior dev at Development

Software development at