We have built a distributed data store we call ‘CloudDS’ (no points for originality here) that has been in use for a few years now with no failures whatsoever.


It is BestPrice.gr’s underlying datastore (a small 7-node cluster — the nodes are all pretty much idle all the time, but we like to play it safe, so we set the replication factor to 5 and keep 5 copies of every object spread across the cluster’s nodes).


When time permits, we improve things here and there, add more features, and consider advancements made by fellow developers that have made it into publications (papers) or into the respective open source implementations. Most of the ones we like make it back into CloudDS. This August in particular — August being the month where we are not all working on product development and can do pretty much whatever excites us the most — we added a whole bunch of such new features to it.


We rarely use open source software (we build everything ourselves; don’t judge us, this works for us) and currently we do not release open source ourselves (this is bound to change soon), but I really wanted to contribute back to the ‘community’ somehow. I thought maybe we could describe a few ideas that worked for us and may also benefit other vendors/developers.

This list is unordered and may or may not make sense to you. It is meant to provide food for thought to developers so that they perhaps can come up with ways to improve their own systems in one way or another.

“In the western scientific tradition we stand on the shoulders of giants,” says Young, echoing both Torvalds and Sir Isaac Newton before him.
(excerpt from the book Free as in Freedom)

CloudDS is written in C++ and, like almost all other similar services, is based on various Google and Amazon seminal papers. Specifically, it resembles Cassandra the most in terms of architecture and design, but is also different in many significant ways.


The most important underlying idea is that once you build a high-performance CDHT-based grid of nodes that operates as expected, there are various higher-level infrastructure services that can be built on top of that core layer. Building such a multi-purpose infrastructure enables the development of many interesting features.


Currently, CloudDS supports wide columns (Cassandra rows, column families, super columns, columns), which are suitable for most models, and pure KV (the value is a serialised Column Family).

The major difference is that wide columns depend on merging values from SSTables and Memory Tables (so that you can e.g. update columns individually, with no need to update the whole thing every time), whereas KV objects go through neither the commit logs (there is one commit log per column family) nor the Memory Table. They are stored in a ‘live’ data file (append only) — live data files turn into immutable data files, which in turn get compacted down into fewer, smaller files, etc. So, to look up a key’s value we just look for it in the list of all immutable datafiles and the active datafile.
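
As a rough illustration (the names below are made up for this sketch, not CloudDS’s actual API), a KV read simply checks the active datafile first and then walks the immutable datafiles, newest first:

#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for a datafile: a real one would be an index over an append-only file on disk
struct DataFile
{
    std::unordered_map<std::string, std::string> entries;

    std::optional<std::string> Lookup(const std::string &key) const
    {
        const auto it = entries.find(key);
        return it != entries.end() ? std::optional<std::string>(it->second) : std::nullopt;
    }
};

// Check the live (append-only) datafile first, then the immutable datafiles, newest to oldest
std::optional<std::string> LookupKV(const DataFile &activeFile,
                                    const std::vector<DataFile> &immutableFiles /* newest first */,
                                    const std::string &key)
{
    if (const auto v = activeFile.Lookup(key))
        return v;

    for (const auto &df : immutableFiles)
    {
        if (const auto v = df.Lookup(key))
            return v;
    }
    return std::nullopt;
}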

The main reason that led to the development of this data store will be explained later.


Every node is aware of every other node’s load; this comes into play when deciding which node to use for retrieving the whole value (similar to the way Cassandra does it, except that we found that selecting the ‘data node’ by distance first, and by load for same-DC nodes, worked better in practice).

Sorting replica nodes associated with a given key by (distance, load)
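
The actual snippet isn’t reproduced here, but the idea can be sketched as follows (field names are illustrative, not CloudDS’s):

#include <algorithm>
#include <cstdint>
#include <vector>

struct Endpoint
{
    uint8_t distance;   // 0 = local node, 1 = same rack, 2 = same DC, 3 = other DC
    uint64_t load;      // load figure learned via gossip
};

void SortReplicasForRead(std::vector<Endpoint> &replicas)
{
    std::sort(replicas.begin(), replicas.end(), [](const Endpoint &a, const Endpoint &b) {
        if (a.distance != b.distance)
            return a.distance < b.distance;    // closer endpoints win
        return a.load < b.load;                // tie-break on load for same-distance endpoints
    });
}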

All nodes propagate, through the gossip message exchange, which nodes they have pending hinted handoffs to deliver to. CloudDS also relies on a per-ColumnFamily ReadRepairChance option, which is used pretty much like in Cassandra. Unlike Cassandra, CloudDS will consider each node before it asks for its digest (for read repair); if no nodes have any pending hinted handoffs to it, and if we can confidently deduce that the node is up to date based on some other heuristics, we do not ask for digests. This means we can almost always get the correct value without having to ask any replicas for digests, unless it’s actually needed.

Tests whether we can skip asking an endpoint for a value digest, for read repair operations
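
Again, a minimal sketch of that test, with made-up field names standing in for the gossip-derived state:

struct EndpointState
{
    bool hasPendingHintedHandoffs;  // does any node report pending hinted handoffs for this endpoint?
    bool consideredUpToDate;        // other heuristics (uptime, recent repairs, etc.)
};

// If nothing is pending for the endpoint and our heuristics say it is current, skip the digest request
static bool CanSkipDigestRequest(const EndpointState &s)
{
    return !s.hasPendingHintedHandoffs && s.consideredUpToDate;
}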

CloudDS encapsulates all operations into tasks, including all incoming requests from clients or other nodes. A task scheduler is responsible for scheduling those tasks among threads; it uses work-stealing queues and will optionally create or destroy threads based on current needs. There are two types of threads: one kind is responsible for handling system tasks (compactions, etc.), the other for everything else. The scheduler assigns system tasks to a small number of special OS threads with reduced CPU and I/O priority in order to reduce their overall impact.

struct task
{
    Thread *thread;
    timestamp_t startTs;        // for identifying long running tasks
    struct scheduler_task st;   // facilitates scheduler operations

    virtual void OnExec(struct environment *const) = 0;
    virtual const char *Name(void) const = 0;
};

// Make sure system tasks threads get special treatment (run once per system-tasks thread)
const pid_t threadPID = (pid_t)syscall(SYS_gettid);                                        // requires <unistd.h>, <sys/syscall.h>
(void)ioprio_set(IOPRIO_WHO_PROCESS, threadPID, IOPRIO_PRIO_VALUE(IOPRIO_CLASS_IDLE, 0));  // idle I/O scheduling class
(void)setpriority(PRIO_PROCESS, threadPID, 10);                                            // reduced CPU priority, requires <sys/resource.h>

There is one thread for network I/O (I/O multiplexing). The I/O thread transparently groups messages with the same destination (it also compresses/decompresses outgoing/incoming messages using snappy) and is tightly coupled with the gossip subsystem, so that it can notify it of I/O failures immediately, without having to wait for the gossip heuristics to consider a node failed/unavailable.
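
A rough sketch of that grouping, assuming a hypothetical OutgoingMessage structure and using snappy’s C++ API for the compression step:

#include <snappy.h>
#include <map>
#include <string>
#include <vector>

struct OutgoingMessage
{
    int destFd;            // socket of the destination node
    std::string payload;   // serialized message
};

// Group by destination, concatenate, compress; returns destFd => compressed blob to write out
std::map<int, std::string> CoalesceOutgoing(const std::vector<OutgoingMessage> &pending)
{
    std::map<int, std::string> perDest;
    for (const auto &m : pending)
        perDest[m.destFd] += m.payload;   // real code would length-prefix each message

    std::map<int, std::string> compressed;
    for (auto &it : perDest)
    {
        std::string out;
        snappy::Compress(it.second.data(), it.second.size(), &out);
        compressed[it.first] = std::move(out);
    }
    return compressed;
}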


No thread ever blocks; the task scheduler will schedule another task to it (pre-emptively) if a thread would have to wait, though in general this is implemented as chains, where a task executes an expensive operation and that expensive operation schedules a ‘completion’ task. This arrangement keeps all CPU cores busy and, thanks to lock-free constructs (where possible), it is a good fit for CloudDS.
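
In sketch form (the Scheduler here is a trivial stand-in, not CloudDS’s work-stealing scheduler):

#include <functional>
#include <string>

// Trivial stand-in: a real scheduler would push the closure onto a work-stealing queue
struct Scheduler
{
    void Schedule(std::function<void()> fn) { fn(); }
};

// A read task: the expensive part runs as one task, and the response is a chained 'completion' task
void LookupKeyTask(Scheduler &scheduler, const std::string &key, std::function<void(std::string)> respond)
{
    scheduler.Schedule([&scheduler, key, respond] {
        // Expensive operation (e.g. searching datafiles) happens here
        std::string value = "value-for-" + key;

        // Chain the completion instead of blocking the worker thread
        scheduler.Schedule([respond, value] { respond(value); });
    });
}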


CloudDS merges ColumnFamilies (values) using either merge-sort or binary search with replace (memmove() things around), depending on a heuristic that considers which is likely to be faster.
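
A sketch of what such a heuristic might look like — the threshold here is made up; CloudDS’s actual cost model may differ:

#include <cstddef>

enum class MergeStrategy { MergeSort, BinaryInsert };

// If the incoming column set is tiny relative to the existing one, binary search + memmove()
// of a few elements tends to beat a full merge pass; otherwise merge-sort the two runs.
static MergeStrategy ChooseMergeStrategy(const size_t existingColumns, const size_t incomingColumns)
{
    if (incomingColumns * 16 < existingColumns)
        return MergeStrategy::BinaryInsert;
    return MergeStrategy::MergeSort;
}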

All objects (ColumnFamilies, Columns, SuperColumns, etc.) are ref-counted, so we just retain/release objects — no need to maintain copies unless it makes more sense to do so. Furthermore, when merging multiple SSTables, CloudDS doesn’t allocate any memory whatsoever for deserialized ColumnFamilies; it maps the on-disk structure into an object and operates on the structure directly. This has resulted in major speedups during compactions. It does something similar when replaying commit log files.
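
For illustration only, the retain/release pattern looks roughly like this (an intrusive, atomic ref-count; these are not CloudDS’s actual classes):

#include <atomic>
#include <cstdint>

struct RefCounted
{
    std::atomic<uint32_t> rc{1};

    void Retain() { rc.fetch_add(1, std::memory_order_relaxed); }

    void Release()
    {
        // Last reference releases the object; until then, merges just retain what they reuse
        if (rc.fetch_sub(1, std::memory_order_acq_rel) == 1)
            delete this;
    }

    virtual ~RefCounted() = default;
};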


Gossip messages add up and can become a real bandwidth issue once you get hundreds of nodes talking to each other. CloudDS nodes deal with that by routing all gossip messages to one node per distinct DC and relying on that node to gossip states to its local/neighbour nodes. This feature hasn’t been enabled because we don’t really have enough nodes to test it properly, but it works in limited testing. Furthermore, once it makes sense to do so, we will enable support for aggregator roles, where instead of maintaining connections to practically all other nodes in the cluster (since each node can coordinate request processing), a node picks another node as its aggregator and contacts that alone; the aggregator in turn contacts X more nodes and collects their responses, and those X nodes can do the same thing, and so on — much like the way e.g. Google routes search requests across thousands of nodes. An elaborate fan-out, that is. This is also not enabled, because it doesn’t help us in any way right now, but it worked in limited tests.


Our physical nodes are pretty weak, all things considered (2 or 4GB of RAM, 1 CPU), so we try to make good use of the available memory and resources. Currently the CloudDS process’s RES footprint is ~1GB on all active nodes — we can tune it to use more, or less, memory though. Using the right data structures helps keep the memory footprint low.

top output on a CloudDS node; the ‘cloud’ process is the CloudDS process

Every file inherits from DatastoreProvider; every DatastoreProvider object is registered with a DatastoreController (there is currently one such object). Reads to all files go through a special proxy interface, which results in maintaining pages (datablocks) for all files; the page size is configurable on a per-ColumnFamily basis (it can be set to bypass the cache completely), and so we get over a 90% datablocks hit ratio on disk I/O in practice. There is no need for a dedicated row or key cache, though we do maintain a digests cache.

class DatastoreProvider
{
    Vector<uint64_t> offsets;
    ....

    // Binary search for the datablock that contains `offset`
    int IndexByOffset(const uint64_t offset) const
    {
        const int indexLast = offsets.Size() - 1;
        int top = indexLast, btm = 0;
        const uint64_t *const values = offsets.Values();

        while (btm <= top)
        {
            const int mid = (btm + top) / 2;
            const uint64_t o = values[mid];

            if (offset >= o && (mid == indexLast || offset < values[mid + 1]))
                return mid;
            else if (offset < o)
                top = mid - 1;
            else
                btm = mid + 1;
        }
        return -1;
    }

    struct datablock *AccessByOffset(const uint64_t offset)
    {
        const int index = IndexByOffset(offset);

        return index != -1 ? AccessByIndex(index) : NULL;
    }
    ..
};

Each ColumnFamily can be configured to use the in-memory storage engine (does not hit the disk; stores data in MemTables only) or the default storage engine (uses commit logs, memtables and sstables). Furthermore, CloudDS supports two replication strategies (network-aware and ‘trivial’), with replication factor options on a per-Keyspace basis. It also supports all consistency levels supported by Cassandra.
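
Put together, the knobs described so far might look roughly like this (illustrative names and defaults, not CloudDS’s real configuration schema):

#include <cstdint>

enum class StorageEngine { InMemory, Default /* commit log + memtables + sstables */ };
enum class ReplicationStrategy { Trivial, NetworkAware };

struct ColumnFamilyOptions
{
    StorageEngine engine = StorageEngine::Default;
    float readRepairChance = 0.1f;      // per-ColumnFamily, as described earlier
    uint32_t pageSize = 64 * 1024;      // datablocks page size; 0 = bypass the cache
};

struct KeyspaceOptions
{
    ReplicationStrategy strategy = ReplicationStrategy::NetworkAware;
    uint8_t replicationFactor = 3;      // per-Keyspace replication factor
};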

All that functionality makes it possible to build highly available memory caches (à la memcached) with support for wide columns and tunable replication factors (cross-DC is supported — up to 250 distinct DCs, 65K nodes), so there is no need to maintain another infrastructure component. If you need a memory cache service, CloudDS works great — you can add nodes to increase capacity, etc. The default storage engine remains on-disk storage.


A RowMutation can be associated with multiple keys. This is useful e.g. for a Twitter-style tweets distribution system (fan-out), where, when a popular user (say, one with 1 million followers) tweets, we want to append the tweet id to the queues of all 1 million followers. Instead of creating 1 million row mutation objects that are identical except for the key, a single row mutation object with 1 million keys can be specified instead. This results in reduced memory use (no need to create and instantiate so many RMs) and faster processing on CloudDS.

It’s also very useful for deleting the same values across many keys: dropping one or more columns from a CF, or deleting rows altogether (want to delete all email messages in a folder? set the flag of many messages to ‘read’? etc.). This is akin to SIMD instruction semantics: one operation applied to multiple objects.

Furthermore, a ColumnFamily can be configured to retain up to however many first or last (based on ordering) columns, and you can set a max TTL — so that during compaction, and prior to returning a value to the client, expired columns are stripped.

Pseudo-code: update followers’ streams with a just-published tweet ID
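
The original pseudo-code isn’t included here, but a sketch of the idea, with a hypothetical RowMutation shape, would be:

#include <cstdint>
#include <string>
#include <vector>

struct RowMutation
{
    std::vector<std::string> keys;   // one mutation, many keys
    std::string columnFamily;
    std::string columnName;
    std::string columnValue;
};

// Build a single mutation that appends the new tweet id to every follower's timeline row
RowMutation BuildFanOutMutation(const std::vector<std::string> &followerIds, const uint64_t tweetId)
{
    RowMutation rm;

    rm.columnFamily = "timelines";
    rm.columnName = std::to_string(tweetId);   // column name ordered by tweet id
    rm.columnValue = "";                       // the id alone is enough for the timeline
    rm.keys = followerIds;                     // e.g. 1 million follower keys, one RM object
    return rm;
}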

Riak CS operates alongside Riak to implement an Amazon S3-compatible file store. CloudDS supports much of the same functionality, except it’s built into CloudDS; there is no need for another service. It chops files into 1MB chunks and uses the KV storage for the chunks, and the wide-row storage for the index object. This way we can support storage of very large files (TBs) and random access to them; thanks to the KV store, access is very fast, and thanks to the wide-column engine we could, if we wanted, support append operations or even random-access writes (currently, we don’t need that functionality). So, CloudDS is used for object storage (key => rich value), in-memory caching and file storage.
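
A rough sketch of that layout (illustrative names only; the KV writes are shown as comments):

#include <cstddef>
#include <string>
#include <vector>

static constexpr size_t kChunkSize = 1024 * 1024;   // 1MB chunks

// Wide-row index object: one column per chunk, keyed by the file name
struct FileIndex
{
    std::string fileName;
    std::vector<std::string> chunkKeys;
};

FileIndex ChunkFile(const std::string &fileName, const std::string &contents)
{
    FileIndex index{fileName, {}};

    for (size_t offset = 0; offset < contents.size(); offset += kChunkSize)
    {
        const std::string chunkKey = fileName + "/" + std::to_string(offset / kChunkSize);

        // kvStore.Put(chunkKey, contents.substr(offset, kChunkSize));   // the KV engine holds the chunk bytes
        index.chunkKeys.push_back(chunkKey);
    }

    // The index itself would be stored as a wide row under `fileName`,
    // so random access to any byte range only needs the relevant chunks
    return index;
}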

Speaking of Riak, Basho is made up of some very smart people and their products are really excellent. If you are looking for a datastore to power your services, I’d highly recommend you check them out, or Cassandra/DataStax.

If performance is paramount to you, check out Aerospike or MemSQL. FoundationDB is a very interesting new alternative; it does things differently (ACID transactions, etc).


All main environment state objects require no serialization (no locks, etc.) to access, because they are all immutable. For example, the object that represents the cluster ring is never locked — partly because all those objects are accessed very frequently. Instead, whenever we need to change one of them (e.g. the list of active SSTables, or MemTables, or ImmutableFiles, or the endpoints map, etc.) we create a copy, update it, and then use an atomic operation to exchange the pointers (RCU). There is a global autorelease pool (similar to Objective-C autorelease pools) which drains objects a few seconds after they are pushed into it. This facilitates safe RCU operations.

Example:

Typical read-copy-update process
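
A minimal sketch of that pointer swap, using a cluster-ring object as the example (the autorelease pool is elided; names are illustrative):

#include <atomic>
#include <string>
#include <vector>

struct Ring
{
    std::vector<std::string> endpoints;   // immutable once published
};

std::atomic<const Ring *> currentRing{new Ring{}};

void AddEndpoint(const std::string &endpoint)
{
    const Ring *old = currentRing.load(std::memory_order_acquire);

    auto *updated = new Ring(*old);          // copy the current ring
    updated->endpoints.push_back(endpoint);  // update the copy, readers never see it half-built

    currentRing.store(updated, std::memory_order_release);   // atomically publish the new ring
    // `old` would be handed to the autorelease pool here and freed after a grace period,
    // once no reader can still be holding a pointer to it
}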

It is often much cheaper and more efficient to move the computation to the data, not the other way around, depending mostly on the amount of data involved and the operations on it. (This is particularly important in MapReduce job distribution — the closer the computation runs to the node that provides the data, the better.)

We will support running code directly on the nodes. That is, JavaScript code (we wrote our own JS compiler/runtime we call SGL) that has access, through an exposed proxy object, to the CDS API. So you can e.g. execute code that computes a single value from many different values — which can potentially be very large — and returns just that single value, instead of shipping a lot of data to the client only to generate a single value from it.

There are some fairly interesting ideas we are toying with, like being able to build data layers on top of CDS building blocks, similar to the way FoundationDB’s layers work, except they run on the actual CDS nodes, not on the client. This, of course, is not particularly novel — Google and others have been doing it for a long while now; I thought it was worth mentioning anyway (this is not yet implemented, only in the prototype phase, but it should work fine).

SGL (JavaScript) code executed on CloudDS

I will keep updating this post with more design and implementation details that worked for us.