Create a riak_core application in Elixir (Part 5)

It’s been a long time since my last post. I’m sorry about that, but a lot of interesting things have been happening; stay tuned for the next posts and you’ll probably find some of them interesting. Ok, quick recap.


Where we are now ..

We have an application that can do a lot of things:

  • We can add and remove data in a Key-Value way
  • We can get all the keys and all the values stored in our system
  • We can add and remove nodes in a very simple way without losing data

But if you remember my last post, we are not yet able to manage a crash. This is a very important feature if we want to be highly available. We are creating a distributed system that should be able to survive the crash of some nodes; we will see later how many failed nodes we can tolerate. Technically speaking, we are creating an AP system in terms of the CAP theorem: we are trying to stay available when we lose a node, and sooner or later we will lose a node.

If you remember the architecture of the system, we have some physical nodes, each containing a number of vnodes.

Cluster with 4 physical nodes and 32 vnodes.

In the previous image we have 4 physical nodes that together contain 32 vnodes, so every physical node holds 32 / 4 = 8 vnodes. If we lose a physical node we lose 8 vnodes and their data, ouch.

Given that we can’t fight fate, a possible solution is to duplicate our data when we write and to read from multiple sources when we want our data back.

Redundancy

We could send a “write” command to several vnodes, wait for an ack from these nodes and then return an “ok” to the client.

Similarly, when we want to perform a read we send a “read” request to several vnodes and wait for their responses; once we have collected the data we can return it to the client.

As you can probably imagine, this raises some questions:

  • To whom should we send the read or write request?
  • How many responses do we want to wait for before sending an ok to the client?
  • How many copies do we want to try to read or write for every request?

But before we continue, there is another question …

How many nodes for our cluster?

This is a good question. In the previous example we used 3 nodes. There is an interesting article that suggests using at least 5 nodes. To make a long story short, you should have at least 3 replicas of your data. If you have 3 nodes and one crashes you can no longer have 3 replicas, and with only 4 nodes it’s not guaranteed that the three replicas end up on different physical nodes. So use 5 nodes. From now on the examples will use 5 nodes, and in the repo I added the config for node 4 and node 5.

How many replicas and how many responses

Until now we have used this function:

:riak_core_apl.get_primary_apl(idx, 1, NoSlides.Service)

Where idx is the index of the created document and 1 is how many entries we want in our preference list. Now we can request 3 items instead and, in my environment, I get this result:

iex(gpad_3@127.0.0.1)7> :riak_core_apl.get_primary_apl(idx, 3, NoSlides.Service)
[{{91343852333181432387730302044767688728495783936, :"gpad_2@127.0.0.1"}, :primary},
{{182687704666362864775460604089535377456991567872, :"gpad_3@127.0.0.1"}, :primary},
{{274031556999544297163190906134303066185487351808, :"gpad_5@127.0.0.1"}, :primary}]

Ok very well! But, for example, if I kill the node gpad_2 then I get this result:

iex(gpad_3@127.0.0.1)8> :riak_core_apl.get_primary_apl(idx, 3, NoSlides.Service)
[{{182687704666362864775460604089535377456991567872, :"gpad_3@127.0.0.1"}, :primary},
{{274031556999544297163190906134303066185487351808, :"gpad_5@127.0.0.1"}, :primary}]

So I requested 3 items but got only 2. We can try to use the function:

:riak_core_apl.get_apl(idx, 3, NoSlides.Service)

With all nodes up and running this function returns a result similar to get_primary_apl:

iex(gpad_3@127.0.0.1)9> :riak_core_apl.get_apl(idx, 3, NoSlides.Service)        
[{91343852333181432387730302044767688728495783936, :"gpad_2@127.0.0.1"},
{182687704666362864775460604089535377456991567872, :"gpad_3@127.0.0.1"},
{274031556999544297163190906134303066185487351808, :"gpad_5@127.0.0.1"}]

But if we kill the node gpad_2 we get this result:

iex(gpad_3@127.0.0.1)10> :riak_core_apl.get_apl(idx, 3, NoSlides.Service)
[{182687704666362864775460604089535377456991567872, :"gpad_3@127.0.0.1"},
{274031556999544297163190906134303066185487351808, :"gpad_5@127.0.0.1"},
{91343852333181432387730302044767688728495783936, :"gpad_1@127.0.0.1"}]

As you can see, this function always returns 3 entries: the first two are “primary” entries, the last one is filled with a fallback entry. In this way we can always send the write/read request to 3 vnodes and then wait for the responses. But wait, how many vnodes should respond before we declare the command completed?

It depends!

If we wait for a response from all replicas and one node has a problem, the client receives an error too.

If we wait for a response from only one replica and the other replicas don’t receive the data for some reason (a network problem?!?), we end up with only one replica holding our data and we can lose it if that node crashes.

So a good balance is to send the request to three replicas and wait for a response from two of them. Now let’s see some code …


Send a request and wait a response

As we already said, we need to send a command to a group of vnodes (3) and wait for responses from a subset of them (2). To do this you need:

  • A supervisor with a simple_one_for_one strategy (sketched right after this list)
  • A process that implements gen_fsm to manage the request/response cycle
  • A new function in the NoSlides.Service module to manage this request/response path.
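Here is a minimal sketch of what such a supervisor could look like, written with the older Supervisor.Spec helpers; the exact child spec in the repo may differ, but the key points are the simple_one_for_one strategy and the temporary workers started on demand:

defmodule NoSlides.WriteFsmSupervisor do
  use Supervisor
  import Supervisor.Spec

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init([]) do
    # every child is a WriteFsm started on demand with its own arguments
    children = [
      worker(NoSlides.WriteFsm, [], restart: :temporary)
    ]
    supervise(children, strategy: :simple_one_for_one)
  end
end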

Writing in a redundant way

Take for example the function that writes a new key/value pair, NoSlides.Service.ft_put/2. This function is very simple: it calls NoSlides.WriteFsm.write/2 and gets back a req_id, a unique id that identifies the request. That function spawns a new process implemented in the NoSlides.WriteFsm module and supervised by the NoSlides.WriteFsmSupervisor module. After that, NoSlides.Service.ft_put/2 waits for the real response from the FSM. This is the code:
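The gists embedded in the original post are not reproduced here, so below is a minimal sketch of how ft_put/2 could look. It assumes the FSM answers the caller with a {req_id, result} message; the helper wait_result/1 is hypothetical and the real code in the repo may differ in the details:

defmodule NoSlides.Service do
  # ... the functions from the previous posts stay as they are ...

  @timeout 30_000  # assumption: same 30-second timeout used by the FSM

  def ft_put(k, v) do
    # spawn the supervised WriteFsm and get back the unique request id
    req_id = NoSlides.WriteFsm.write(k, v)
    wait_result(req_id)
  end

  # hypothetical helper: block until the FSM sends back {req_id, result}
  defp wait_result(req_id) do
    receive do
      {^req_id, result} -> result
    after
      @timeout -> {:error, :timeout}
    end
  end
end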

As you can see a similar pattern is followed for the get function.

Send requests and wait responses

A lot of the work is done in the NoSlides.WriteFsm module. This module exposes a function called write/2 that accepts a key and a value, spawns a new gen_fsm process and dynamically attaches it to the WriteFsmSupervisor (see the sketch after the following list).

The process receives these arguments:

  • req_id, a unique identifier for the request
  • self(), the pid of the caller that will wait for the response
  • k, v, the key and the value to store
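A sketch of write/2, assuming the request id is derived from :erlang.phash2/1 over a monotonic timestamp (the real repo may generate it differently):

def write(k, v) do
  # a unique-enough request id (assumption: the repo may build it differently)
  req_id = :erlang.phash2({k, v, :erlang.monotonic_time()})
  # dynamically attach a new WriteFsm to the simple_one_for_one supervisor
  {:ok, _pid} =
    Supervisor.start_child(NoSlides.WriteFsmSupervisor, [req_id, self(), k, v])
  req_id
end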

The implementation of this FSM is where the magic happens. A gen_fsm is similar to a gen_server and follows the same start_link/init pattern, but from init you should return the name of the first state of the FSM. This is our code:
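A sketch of start_link/4 and init/1, assuming the FSM state is a plain map (the real code may use a struct); the other required :gen_fsm callbacks (handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4) are omitted here for brevity:

def start_link(req_id, from, k, v) do
  :gen_fsm.start_link(__MODULE__, [req_id, from, k, v], [])
end

def init([req_id, from, k, v]) do
  state = %{req_id: req_id, from: from, k: k, v: v, pref_list: [], num_w: 0}
  # return the name of the first state plus a 0 timeout, so the FSM
  # immediately receives a :timeout event in the :prepare state
  {:ok, :prepare, state, 0}
end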

The first state to be executed is prepare, where we get the preference list:
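A sketch of the prepare state; the bucket name passed to chash_key is an assumption:

@n 3  # how many entries we want in the preference list

def prepare(:timeout, state) do
  # hash the key to find its position on the ring ("no_slides" bucket is an assumption)
  idx = :riak_core_util.chash_key({"no_slides", :erlang.term_to_binary(state.k)})
  pref_list = :riak_core_apl.get_apl(idx, @n, NoSlides.Service)
  {:next_state, :execute, %{state | pref_list: pref_list}, 0}
end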

As you can see, we request 3 items for the preference list. After prepare we move to the execute state.

The execute state runs a command against the vnodes, very similar to what we have done in the previous posts. This time we execute the command on the 3 entries stored in state.pref_list, we send the req_id to the vnode because we need to match the replies with our request, and we also pass the sender of the request as {:fsm, :undefined, self()} because we want to receive the responses from the vnodes and consolidate them in the next state. We also set a @timeout of 30 seconds.
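A sketch of the execute state; the command tag :ft_put and the registered vnode master name NoSlides.VNode_master are assumptions based on the previous posts:

@timeout 30_000  # 30 seconds

def execute(:timeout, state) do
  command = {:ft_put, state.req_id, state.k, state.v}
  # {:fsm, :undefined, self()} tells the vnode to reply to this FSM process
  :riak_core_vnode_master.command(
    state.pref_list,
    command,
    {:fsm, :undefined, self()},
    NoSlides.VNode_master
  )
  {:next_state, :waiting, state, @timeout}
end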

As we said before, we wait for two responses (@n_writes 2) from the vnodes, otherwise the @timeout expires. If we receive enough responses we send an :ok to the requester stored in state.from and exit normally.
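A sketch of the waiting state, assuming the vnode replies with {:ok, req_id} as shown in the next section:

@n_writes 2  # how many acks we need before answering the client

def waiting({:ok, req_id}, %{req_id: req_id} = state) do
  num_w = state.num_w + 1
  if num_w >= @n_writes do
    # enough vnodes acknowledged the write: notify the caller and stop
    send(state.from, {req_id, :ok})
    {:stop, :normal, %{state | num_w: num_w}}
  else
    {:next_state, :waiting, %{state | num_w: num_w}, @timeout}
  end
end

def waiting(:timeout, state) do
  send(state.from, {state.req_id, {:error, :timeout}})
  {:stop, :normal, state}
end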

Inside the vnode

The implementation inside the vnode is very similar to the other vnode commands:
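A sketch of the vnode command, assuming the vnode keeps its key/value pairs in a map under state.data as in the previous posts:

def handle_command({:ft_put, req_id, k, v}, _sender, state) do
  new_state = %{state | data: Map.put(state.data, k, v)}
  # reply with the req_id so the FSM can match the ack with its own request
  {:reply, {:ok, req_id}, new_state}
end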

The only difference is the response, which is not just :ok but {:ok, req_id}.

And the read ???

As you have noticed, we wrote a new key/value pair to our system and it has been replicated on three, or at least two, nodes. The read works in a very similar way: we get the values from three, or at least two, nodes and return them. But when you start to replicate data, that data will often not be in perfect sync.

Conflict resolution is a wide field and there are a lot of strategies that you can apply. The most widely adopted are vector clocks (or better, version vectors) and CRDTs. In our example we simply return all the different values that we find. So the consolidation in the NoSlides.GetFsm module is implemented in this way:
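A sketch of the relevant part of NoSlides.GetFsm, assuming the vnode replies with {:ok, req_id, value} and that the collected values live in the FSM state; Enum.uniq/1 does the “consolidation” by keeping only the distinct values:

@timeout 30_000
@n_reads 2  # how many responses we wait for before answering

def waiting({:ok, req_id, value}, %{req_id: req_id} = state) do
  num_r = state.num_r + 1
  values = [value | state.values]
  if num_r >= @n_reads do
    # consolidate: return every distinct value collected (e.g. [1] or [nil, 1])
    send(state.from, {req_id, {:ok, Enum.uniq(values)}})
    {:stop, :normal, %{state | num_r: num_r, values: values}}
  else
    {:next_state, :waiting, %{state | num_r: num_r, values: values}, @timeout}
  end
end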

Run it

Now it’s time to run some code. We can try to execute some reads and writes. As I said before, I suggest running five nodes; the repo now also contains a config for nodes gpad_4 and gpad_5. You can run all the nodes in different consoles like this:

$ MIX_ENV=gpad_1 iex --name gpad_1@127.0.0.1 -S mix run
$ MIX_ENV=gpad_2 iex --name gpad_2@127.0.0.1 -S mix run
$ MIX_ENV=gpad_3 iex --name gpad_3@127.0.0.1 -S mix run
$ MIX_ENV=gpad_4 iex --name gpad_4@127.0.0.1 -S mix run
$ MIX_ENV=gpad_5 iex --name gpad_5@127.0.0.1 -S mix run

Now you can join all the nodes by executing this command on every node from two to five:

iex(gpad_2@127.0.0.1)1> :riak_core.join(:"gpad_1@127.0.0.1")

After a while the cluster becomes stable and you can get its state like this:

iex(gpad_3@127.0.0.1)6> NoSlides.Service.ring_status
==================================== Nodes ====================================
Node a: 4 ( 25.0%) gpad_1@127.0.0.1
Node b: 3 ( 18.8%) gpad_2@127.0.0.1
Node c: 3 ( 18.8%) gpad_3@127.0.0.1
Node d: 3 ( 18.8%) gpad_4@127.0.0.1
Node e: 3 ( 18.8%) gpad_5@127.0.0.1
==================================== Ring =====================================
abcd|eabc|deab|cdea|
:ok

Your values could be slightly different from these, but you should have 5 nodes in the cluster.

Now you can write a value replicated on multiple nodes; to do that, try executing this command:

iex(gpad_1@127.0.0.1)8> NoSlides.Service.ft_put(:gpad, 1)

If you look at the logs you should see that this value is written on three nodes; in my environment it’s written on nodes 2, 3 and 4. Now you can try to read the value like this:

iex(gpad_1@127.0.0.1)8> NoSlides.Service.ft_get(:gpad)
17:48:09.370 [debug] Start GetFSM
{:ok, [1]}

You should get {:ok, [1]} because all the copies are in sync.

Now let’s try to kill one of our nodes, for example node 2. Without the gpad_2 node, if you execute ft_get repeatedly you should sometimes get a response like this:

iex(gpad_1@127.0.0.1)8> NoSlides.Service.ft_get(:gpad)
{:ok, [nil, 1]}

Why? Because you sent the read request to 3 vnodes and returned a response to the client as soon as you got 2 replies. Sometimes one of these two replies comes from a node without the value, in my environment node 5.

Let’s continue our experiment and write the value 2, like this:

iex(gpad_1@127.0.0.1)8> NoSlides.Service.ft_put(:gpad, 2)

It is written on nodes 3, 4 and 5. Now we restart node 2 and we see a transfer of data from node 5 to node 2, so all nodes are now synchronized and we get back the value [2]. As you can imagine this operation can take time, and in some situations one of the 3 nodes can miss the request and remain with the old value. As written before, there are strategies to manage this situation, also when simultaneous writes occur.

Recap

Now our system is able, in a very simple way, to manage the crash of a node without losing data. As I said before, we chose to be AP instead of CP: we give up some consistency (we may return [nil, 2]) but remain available even when we lose some nodes.

This is the last post of our journey on how to implement a riak_core application in Elixir. It was a pleasure for me to write, but also a big effort. I hope you found these posts useful and interesting.

I’m enthusiastic about distributed systems and in the next posts I’ll try to give you some interesting info about this wonderful field of IT.

Please leave a comment about this post, and if you are interested in some particular topic I’ll be happy to consider it for the next posts.
