Field notes on modelling relational data with CRDTs (Gies-A-Job Remix)
Introduction
Basho has gone under, so it seems appropriate to share some field notes from internal experiments on using a subset of CRDTs to model relational data in order to develop an eventually consistent relational(ish) version of Riak.
The prototype I am discussing here had the internal name Afrika which is an anagramonym for Another Fucking Riak. Its logo was the black star of Africa in honour of Kwame Nkrumah (not a lot of people at Basho knew that, but hey IDGAF…)
Purpose
The purpose of this blog post is two-fold:
- to recycle our thoughts to the wider academic and practitioner field in order to stimulate new research and inform other practitioners of valuable options to explore
- to help me get a new job (gies a job! Erlang/BEAM, distributed systems, remote)
Scope
The scope of this blog post is:
- statement of customer problem we were trying to solve
- background that informed the solution
- the working prototype (and its discontents)
In part 2 I talk of:
- relationships to other work
- fruitful areas for research
Things pertaining to the collapse of Basho as a company will only be referred to as and when they illuminate the state of play in these field notes.
Customer Problem
The customer problem was the third disk in a Tower of Hanoi of problems.
The first customer problem, to which Riak is the solution, is “I want a database that is highly available and which will always accept writes and respond to reads with an answer”.
The problem on top of that is addressed by CRDTs: “we had a partition during write and Riak cannot resolve the customer intentions and has returned 2 (or more) values for me to choose between on a read”.
This customer problem is the problem of using CRDTs:
Halp! mucho confused, lolsob ☠️👻👹🔥⚡️💥🥀😜🤡😳🤕
Developers find CRDTs hard to understand and hard to reason about.
The genesis of this approach to solving this problem came from a casual remark by Russell Brown, king o’CRDTs: to wit “maps and sets are not really datatypes, but mechanisms for managing causality that ought to be migrated higher up into the database”.
Background Experience
Whilst at Hypernumbers I had designed an innovative data query language (with the help of Phil Wadler of Haskell fame who was on our advisory board).
It is a bit of a side story which you can read here but is important because it meant that I understood exactly how little of the SQL paradigm you need to be able to build an online bank — that confidence in making unlikely things fit the problem domain greatly encouraged me in my experiments at Basho.
How does Time Series fit in?
Well, little Johnny, let’s sit you down with warm milk and a cookie and I will tell you tall tales of management at Basho.
The product management function at Basho was very weak (and I want to emphasise this was a management failing, not the fault of people on the product team. People rarely fail, processes often fail, but when management at the top fail to put in place proper processes for hiring, training and working, then those people, the C-suites, have failed hard. At Basho the heid-bummers failed hard, and often.)
So as the company jinked about seeking the magic product feature for Time Series (which an incoherent product process can never find), I, in my own quiet way, tried to hold the customer as we say in Scotland. I had spent 2 years working at bet365 as a consumer of Riak and tried to keep that view of the product alive.
This story is that: the strategic view of an integrated query approach for the new TS system and the existing KV (Key-Value) Riak product.
So let’s talk about NoSQL/KV systems and their discontents.
NoSQL and its discontents
You can of course query a NoSQL system with SQL, it’s just that the only valid query statement is SELECT value FROM mytable WHERE key='mykey';
In this section I will assume you know how a Dynamo-style DB ring works, and what an n_val is. If you don’t, here is a primer.
Riak KV the basics
Essentially there are two access paths to reading data from a NoSQL dynamo style DB. If you know the key you can go directly to the data and read it.
But if you don’t know the key you need to scan the whole ring (or a sub-set of the ring: if you have written the data with an n_val of 3 you need to read 1/3 of the nodes + 1 to be sure of getting all the data).
Normal KV databases are therefore a bit all-or-nothing — go to one bit of the ring or read it all.
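The "1/3 of the nodes + 1" figure can be checked with a minimal sketch. It assumes a ring of 64 vnodes and that the replicas of a partition live on that vnode and the next n_val - 1 vnodes; `coverage_plan` and `covers_every_partition` are hypothetical names for illustration, and Riak's real coverage planner is more involved than this.

```python
def coverage_plan(ring_size: int, n_val: int) -> list[int]:
    """Pick every n_val-th vnode, starting from vnode 0."""
    return list(range(0, ring_size, n_val))


def covers_every_partition(chosen: list[int], ring_size: int, n_val: int) -> bool:
    """True if each partition has at least one replica on a chosen vnode."""
    chosen_set = set(chosen)
    for partition in range(ring_size):
        # Assumption: replicas sit on this vnode and the following n_val - 1.
        replicas = {(partition + i) % ring_size for i in range(n_val)}
        if not replicas & chosen_set:
            return False
    return True


plan = coverage_plan(64, 3)
print(len(plan), covers_every_partition(plan, 64, 3))
```

Because ring sizes are powers of two, they never divide evenly by an n_val of 3, which is where the extra vnode comes from.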
TS and data colocation on the ring
At the core of TS was the idea of introducing data co-location — make the data a little bit lumpier (with slightly more variation in distribution of work around the ring).
It is the issue of data colocation that ties the Time Series query work with Riak KV. TS colocates data by quantizing time, and KV colocates data by writing to CRDT sets.
At the core of the dynamo model is the deterministic placing of data on a fixed and agreed ‘ring’ of data using consistent hashing.
The user writes a key-value pair {key, value} and the key is hashed to get a vnode and the data is dispatched to that vnode to be written (with additional values being written to the next vnode in the ring, and the one after that if, for instance, an n_val of 3 is used).
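A minimal sketch of that placement step, assuming a 64-vnode ring and a SHA-1 hash space split into equal ranges. The names `vnode_for` and `preference_list` are hypothetical and the details of Riak's own ring code differ; the point is only that placement is a pure function of the key.

```python
import hashlib

RING_SIZE = 64         # number of vnodes; an assumed value
HASH_SPACE = 2 ** 160  # SHA-1 digests are 160-bit integers


def vnode_for(key: bytes, ring_size: int = RING_SIZE) -> int:
    """Map a key to the vnode that owns the slice of hash space it falls in."""
    digest = int.from_bytes(hashlib.sha1(key).digest(), "big")
    return digest // (HASH_SPACE // ring_size)


def preference_list(key: bytes, n_val: int = 3, ring_size: int = RING_SIZE) -> list[int]:
    """The owning vnode plus the next n_val - 1 vnodes around the ring."""
    first = vnode_for(key, ring_size)
    return [(first + i) % ring_size for i in range(n_val)]


print(preference_list(b"mykey"))
```

Every node that runs the same function over the same ring arrives at the same answer, so no lookup table has to be consulted on a write.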
In normal KV the value of the key stuffed into the hashing function is the same as the key that is written to the disk. In TS we insist that the key that the customer supplies is a composite key (this isn’t strictly true, but…).
A typical TS key has the form {id, time} giving a KV pair of {{id, time}, value}. In TS before we hash this key to write to the DB we quantize it. Essentially for a given bucket we specify a time period (10 seconds, 15 minutes, two weeks, a year). If you imagine a device ticking and sending a reading every minute, in normal KV that {id, time} value would hash to a new vnode every time. In TS it will hash to the same vnode for an entire time period, so a 1 hour quantum means it would write 60 values to one vnode.
We can now query up to 60 values of data with a single request. If we now go to 2 places we can read 120 values, 3 gets us 180 etc, etc. Querying now stops being quite so all-or-nothing: we are smoothing out the query mechanism.
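Quantizing can be sketched in a few lines. This assumes quanta are counted from the Unix epoch and uses a device reporting once a minute with a one-hour quantum; `quantize` and `partition_key` are hypothetical helper names, not Riak TS functions.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def quantize(ts: datetime, quantum: timedelta) -> datetime:
    """Roll a timestamp back to the start of the quantum that contains it."""
    elapsed = ts - EPOCH
    return EPOCH + (elapsed // quantum) * quantum


def partition_key(device_id: int, ts: datetime, quantum: timedelta) -> tuple:
    """The key that gets hashed: the id plus the quantized time."""
    return (device_id, quantize(ts, quantum))


# One reading a minute for an hour, starting on the hour.
start = datetime(2017, 9, 5, 16, 0, tzinfo=timezone.utc)
readings = [start + timedelta(minutes=m) for m in range(60)]
keys = {partition_key(12345, t, timedelta(hours=1)) for t in readings}
print(len(readings), "readings,", len(keys), "partition key")
```

All sixty readings collapse onto one partition key, so they hash to the same vnode.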
Data colocation on disk in TS
But access to the ring is only part of the story — the TS data is also written consecutively on disk which means a query can just scoop a big slurp of data up in one go:
The vnode is itself an ordered set of data. Within a vnode each bucket (whether KV or TS) is written contiguously, and within a TS bucket the quantums of data are written contiguously. We will see how this maps to CRDT sets later.
Queryable structured data
The third component we need to understand with relation to TS and queries is the structure of the value in TS.
Previously we talked of writing with a composite key {{id, time}, value} but in TS the value was also structured, depending on the bucket.
A bucket would be defined with a CREATE TABLE statement:
CREATE TABLE GeoCheckin (
    id           SINT64    NOT NULL,
    region       VARCHAR   NOT NULL,
    state        VARCHAR   NOT NULL,
    time         TIMESTAMP NOT NULL,
    weather      VARCHAR   NOT NULL,
    temperature  DOUBLE,
    PRIMARY KEY (
        (id, QUANTUM(time, 15, 'm')),
        id, time
    )
)
Let’s unpack this:
- the element that is being written is a tuple with this structure {id, region, state, time, weather, temperature}, for instance {12345, "us-east", "virginia", 2017-9-05 16:04:23, "hot", 23.4}
- the first thing that happens is that the tuple being passed in is checked against the structure: the first element is of type sint64, the second is a varchar etc, etc
- the partition key (the one that is hashed to get where in the ring to write the data to) is {12345, 2017-9-05 16:00:00} (notice how it has been ‘rolled back’ or quantized to 4pm)
- the local key (the key used to write the data to disk) is {12345, 2017-9-05 16:04:23} (this has not been rolled back/quantized)
- the value is {12345, "us-east", "virginia", 2017-9-05 16:04:23, "hot", 23.4}
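The write path described in that list can be sketched as follows. The `SCHEMA` list, `validate` and `keys_for` are hypothetical names that mirror the GeoCheckin definition; the type mapping (sint64 to int, varchar to str and so on) and epoch-aligned quanta are assumptions, not the Riak TS implementation.

```python
from datetime import datetime, timedelta

# (column name, Python type, nullable), mirroring the CREATE TABLE statement.
SCHEMA = [
    ("id", int, False),
    ("region", str, False),
    ("state", str, False),
    ("time", datetime, False),
    ("weather", str, False),
    ("temperature", float, True),
]
QUANTUM = timedelta(minutes=15)
EPOCH = datetime(1970, 1, 1)


def validate(row: tuple) -> None:
    """Check the tuple against the table structure before anything is written."""
    if len(row) != len(SCHEMA):
        raise ValueError("wrong number of columns")
    for value, (name, col_type, nullable) in zip(row, SCHEMA):
        if value is None:
            if not nullable:
                raise ValueError(f"{name} must not be null")
        elif not isinstance(value, col_type):
            raise TypeError(f"{name} expects {col_type.__name__}")


def keys_for(row: tuple) -> tuple:
    """Return (partition key, local key) for a validated row."""
    validate(row)
    row_id, ts = row[0], row[3]
    quantized = EPOCH + ((ts - EPOCH) // QUANTUM) * QUANTUM
    return (row_id, quantized), (row_id, ts)


row = (12345, "us-east", "virginia", datetime(2017, 9, 5, 16, 4, 23), "hot", 23.4)
partition_key, local_key = keys_for(row)
print(partition_key, local_key)
```

The partition key carries the time rolled back to 16:00:00 while the local key keeps the original 16:04:23.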
So these three elements make TS queryable more quickly:
- colocation of data on the ring reduces where you have to go to get data
- colocation of data on disk reduces how much data you have to iterate over
- known and assertable structure of the data ensures that you can query it deterministically: only running sum against data of type sint64 or double but not boolean or varchar etc, etc. You don’t have the hell of casting and spoofing default values that you get with entirely unstructured data. You can of course hide unstructured data in varchar or blob fields for semi-structured data
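The third point, type-checked aggregation, might look like this in miniature. `COLUMN_TYPES`, `SUMMABLE` and `checked_sum` are hypothetical names; the sketch assumes the query layer knows each column's declared type before it touches any data.

```python
# Declared column types, taken from the GeoCheckin table definition.
COLUMN_TYPES = {
    "id": "sint64",
    "region": "varchar",
    "state": "varchar",
    "time": "timestamp",
    "weather": "varchar",
    "temperature": "double",
}
SUMMABLE = {"sint64", "double"}


def checked_sum(column: str, rows: list[dict]) -> float:
    """Reject the query up front if SUM makes no sense for the column type."""
    col_type = COLUMN_TYPES[column]
    if col_type not in SUMMABLE:
        raise TypeError(f"SUM is not defined for {col_type} column {column!r}")
    # Nullable columns: skip missing values rather than inventing defaults.
    return sum(r[column] for r in rows if r[column] is not None)


rows = [
    {"weather": "hot", "temperature": 23.5},
    {"weather": "mild", "temperature": 20.5},
    {"weather": "cold", "temperature": None},
]
print(checked_sum("temperature", rows))
```

Asking for `checked_sum("weather", rows)` fails before any row is read, which is the determinism the schema buys.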
Queries and CRDTs
(I am going to proceed on the basis that you understand what CRDTs are, how they work and what their limitations are. If you don’t this is a basic introduction and this is Russell Brown’s Erlang User Conference presentation.)
When we were building TS I was always trying to figure out how to extend the SQL query function to KV. So I was pondering how to mate the camel to the goat: an SQL query parsing system designed for TS to conflict-free replicated data types. It turned out to be super-easy once we realised that not all CRDTs are equivalent — in particular that CRDT sets and maps are structural CRDTs and not ‘data types’ in their own right.
Using the lessons of query language design from Hypernumbers I started trying to map the various concepts onto each other to see if they fitted.
Maps are schemas
Well maps can be used to implement schemas. A CRDT map is a collection CRDT that can contain other CRDTs: including CRDT sets, CRDT counters, CRDT flags and CRDT registers.
We have seen in TS that we can map a CREATE TABLE statement to an Erlang tuple: {id, region, state, time, weather, temperature}. With TS we simply write that raw tuple to disk without any causal information.
But if we use a CRDT map to store that information we can create a data structure with embedded causal information (vector clocks) which we can represent as {🕖, id, region, state, time, weather, temperature}.
So generally CRDT maps are currently used in an unstructured fashion: values of type any are placed in arbitrary slots in the map. With a schema in front of it a CRDT map becomes structured data.
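As a rough illustration of a schema sitting in front of a map, here is a deliberately simplified sketch. It swaps in a map of last-writer-wins registers with (logical time, replica id) stamps, which is not how Riak's CRDT maps track causality; `Register`, `put` and `merge` are hypothetical names and the three-column schema is a cut-down assumption.

```python
from dataclasses import dataclass

# A cut-down schema: field name -> required Python type (assumed mapping).
SCHEMA = {"id": int, "region": str, "temperature": float}


@dataclass(frozen=True)
class Register:
    stamp: tuple   # (logical time, replica id); assumed unique per write
    value: object


def put(row: dict, field: str, value, stamp: tuple) -> dict:
    """Write a field, but only if the schema allows that name and type."""
    if field not in SCHEMA:
        raise KeyError(f"{field} is not in the schema")
    if not isinstance(value, SCHEMA[field]):
        raise TypeError(f"{field} expects {SCHEMA[field].__name__}")
    updated = dict(row)
    updated[field] = Register(stamp, value)
    return updated


def merge(a: dict, b: dict) -> dict:
    """Field-wise join: keep the register with the larger stamp."""
    merged = dict(a)
    for field, reg in b.items():
        if field not in merged or reg.stamp > merged[field].stamp:
            merged[field] = reg
    return merged


replica_a = put({}, "region", "us-east", (1, "a"))
replica_b = put({}, "region", "us-west", (2, "b"))
replica_b = put(replica_b, "temperature", 23.5, (3, "b"))
print(merge(replica_a, replica_b) == merge(replica_b, replica_a))
```

The merge gives the same answer in either order, and the schema check means a slot can never hold a value of the wrong type.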
Sets colocate data on the ring (and on disk)
Some users of CRDT sets on Riak stuff a hundred thousand things into them. Each CRDT set is written to disk as a single object (that’s your colocation right there) but that’s also one of the problems.
It would be good to be able to query a set: add up all the values in a set, calculate the average and so on and so forth — ideal work for SQL.
The implementation of CRDTs in Riak is monolithic. Russell Brown was working on a ‘big sets’ implementation (with the same semantics) where each element was written to disk separately (as were the metadata and vector clocks).
In the monolithic implementation of CRDT sets not only did you have to read the entire set into memory to operate on it, but you also had to replicate the entire set to the other primaries.
Sets of maps can represent projected joins
A CRDT set is another collection element: you can think of it as an array with causal information. In an unstructured way it can contain a set of anything, but if we hide it behind a schema we can insist that it only contains CRDT maps, each with a key, and those maps all have type information imposed on them because they in turn implement a schema.
By inserting this set of maps into another map we can represent a projected table join. That is a bold assertion, but let’s see what I mean by that.
Consider three tables connected by foreign keys:
We can project out the joins, here the projections are colour-coded:
And then pull out the projected joins. We represent the first table as a map, one of whose elements is a set, which contains the relevant entries from the second table implemented as maps, and each of those maps contain a set that contains the relevant entries of the third table as maps, all wrapped up in a vector clock with the causal information:
In practical terms it looks like this:
{Beta, {🕢, Beta, Russell, Brown, TransactionSet}}
Where TransactionSet contains 2 objects:
{🕗, {Beta, 2}, 1/9/15, Deposit, £350, ActivitySet1}
{🕙, {Beta, 3}, 1/9/15, Deposit, £200, ActivitySet2}
And ActivitySet1 and ActivitySet2 are:
{🕘, {Beta, 2, 0002}, 1/9/15, Confirm, JSmith}
{🕞, {Beta, 2, 0006}, 3/3/15, Authorise, JScobie}
and
{🕗, {Beta, 3, 0004}, 2/9/15, Confirm, DScobie}
{🕒, {Beta, 3, 0005}, 2/9/15, Authorise, KHart}
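The same nesting can be written out as plain data to show how a query walks it. This is only a sketch: the field names ("first_name", "amount", "by" and so on) are assumptions, since the tuples above give values only, and the clocks are left as None placeholders instead of real causal metadata.

```python
# Customer map -> set of transaction maps -> set of activity maps.
account = {
    "clock": None, "key": "Beta", "first_name": "Russell", "last_name": "Brown",
    "transactions": [
        {"clock": None, "key": ("Beta", 2), "date": "1/9/15",
         "type": "Deposit", "amount": 350,
         "activities": [
             {"clock": None, "key": ("Beta", 2, "0002"), "date": "1/9/15",
              "action": "Confirm", "by": "JSmith"},
             {"clock": None, "key": ("Beta", 2, "0006"), "date": "3/3/15",
              "action": "Authorise", "by": "JScobie"},
         ]},
        {"clock": None, "key": ("Beta", 3), "date": "1/9/15",
         "type": "Deposit", "amount": 200,
         "activities": [
             {"clock": None, "key": ("Beta", 3, "0004"), "date": "2/9/15",
              "action": "Confirm", "by": "DScobie"},
             {"clock": None, "key": ("Beta", 3, "0005"), "date": "2/9/15",
              "action": "Authorise", "by": "KHart"},
         ]},
    ],
}


def flatten(acct: dict):
    """Yield the rows a three-table SQL join would produce."""
    for txn in acct["transactions"]:
        for act in txn["activities"]:
            yield (acct["key"], txn["key"][1], txn["amount"], act["action"], act["by"])


rows = list(flatten(account))
total_deposits = sum(
    t["amount"] for t in account["transactions"] if t["type"] == "Deposit"
)
print(len(rows), total_deposits)
```

Reading the one object for key Beta is enough to answer both questions; no second or third lookup is needed because the join has already been projected into the value.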
There is a prototype that lets you create two joined tables and insert data into either of them and query against them — the ‘famous’ Afrika. It is a very primitive in-memory database that uses the state of a gen server for storage.
It uses the monolithic CRDTs of Riak to implement it. This model is not performant enough for production use; getting there required Russell Brown’s Big Sets and Big Maps.
As we saw earlier, sets and maps in Riak were implemented as single objects, written to and read from disk in a single operation and passed around the ring to replicas for reconciliation.
By contrast in Big Sets (and Big Maps had the work continued) changes to a set were made as deltas. So you add a single element to a set or map and then send a delta of the change to the replicas.
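The difference between shipping whole objects and shipping deltas can be sketched with the simplest possible CRDT, a grow-only set. This is a stand-in chosen for brevity, not the Big Sets design (which supports removes and carries causal metadata); `GSet` is a hypothetical class.

```python
class GSet:
    """Grow-only set: the merge is set union, so a delta is just a small set."""

    def __init__(self, elements=()):
        self.elements = set(elements)

    def add(self, element) -> set:
        """Add locally and return the delta to ship to the replicas."""
        delta = {element} - self.elements
        self.elements |= delta
        return delta

    def merge(self, incoming) -> None:
        """Join with a full state or with a delta; both are plain sets here."""
        self.elements |= set(incoming)


primary = GSet(range(100_000))
replica_full = GSet(range(100_000))
replica_delta = GSet(range(100_000))

delta = primary.add("new-element")
replica_full.merge(primary.elements)   # monolithic: ships 100,001 elements
replica_delta.merge(delta)             # delta: ships 1 element

print(len(primary.elements), len(delta))
```

Both replicas end up in the same state, but one of them got there by receiving a single element.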
One of the benefits of doing joins this way is that the operations on Big Sets/Big Maps are transactional, so multi-table updates can be bundled into a single atomic transaction (for a subset of SQL). This is an important abstraction for application developers and allows them to decompose the objects they work with in their applications and reason about the actions that result from writes and updates of the data.
Enumerated data types in the schema are an important mechanism as well. We have seen from the work on spreadsheets how vital they are to flesh out data models.
We have seen that projected joins require a left hand key to be persisted to a ring. In traditional data models a given table may be on the many end of more than one one-to-many join, making it not suitable for eventually consistent CRDTs.
The number of these tables can be pruned by implementing the simplest data enforcement rules by enumerated data types. Consider the following table definition:
CREATE TABLE GeoCheckin (
    id           SINT64    NOT NULL,
    region       VARCHAR   NOT NULL,
    state        VARCHAR   enum("AL", "AK", "AR", "AZ", ...),
    time         TIMESTAMP NOT NULL,
    weather      VARCHAR   NOT NULL,
    temperature  DOUBLE,
    PRIMARY KEY (
        (id, QUANTUM(time, 15, 'm')),
        id, time
    )
)
Traditionally the state field would be constrained by being a foreign key to a separate table that contains a set of suitable states. Enumerated data sets allow that foreign key to be compressed into the table definition.
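A small sketch of what that buys at write time, assuming the enum is held alongside the rest of the table definition. `STATE_ENUM` and `check_state` are hypothetical names, and the list is cut short at the same four values shown in the table definition.

```python
# Only the values shown in the table definition; the full list is elided there.
STATE_ENUM = frozenset({"AL", "AK", "AR", "AZ"})


def check_state(value: str) -> str:
    """Validate against the schema itself, with no read of a second table."""
    if value not in STATE_ENUM:
        raise ValueError(f"{value!r} is not a permitted state")
    return value


print(check_state("AK"))
```

The check is local to the node handling the write, so there is no foreign-key table that has to be placed on the ring and kept consistent.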
A lot of the delay in getting to eventually consistent SQL joins was the delay in getting Big Sets into production, a task somewhat hampered by the management not putting any resource into it, despite actual paying customers wanting actual performance improvements in actually implemented technologies.
Now that the ship has gone down, hopefully the Big Sets work can escape into the wild.
One of the advantages of the monolithic CRDT object libraries is that they are easy to utilise in other systems — either add them as an Erlang dependency or do a straight port into other languages.
However the evidence from Afrika is that if you wish to get sophisticated and performant user friendly ways of manipulating them you will need to bake causality into your implementation — Big Sets (and Big Maps) were to become the very fabric of next-gen Riak.
Final Words
I leave you with some images I commissioned on Fiverr — Professor Lamport and his causal front-ear.
I commend this work to you, and gies a job!
Tweet me @gordonguthrie.