Create a riak_core application in Elixir (Part 2)

First of all, thanks to everyone who read my previous post. In this second post we continue with the code we started writing in the previous post and add some features. You can find a final version here.

The first feature to add is the classic ping.

If you played the original then you are as old as I am :-)

Before we start, a quick recap: we have a simple “empty” application that can be compiled and executed in three different environments using these commands:

# this is node 1
MIX_ENV=gpad_1 iex --name gpad_1@127.0.0.1 -S mix run
# this is node 2
MIX_ENV=gpad_2 iex --name gpad_2@127.0.0.1 -S mix run
# this is node 3
MIX_ENV=gpad_3 iex --name gpad_3@127.0.0.1 -S mix run

What we want to do now is to add a simple ping functionality that can be used from the application console and that can show how a cluster works.

Nodes, virtual nodes and one ring to rule them all

One of the main concepts of riak_core is the virtual node, which is related to what is called the ring.

The idea is simple. You take all the possible values that you can manage in your application (this is called the keyspace) and partition it. For example, riak_core uses a classic hash function to map a binary to a value in the keyspace. The code is similar to this:

iex(1)> key = "gpad"
"gpad"
iex(2)> idx = :crypto.hash(:sha, :erlang.term_to_binary(key))
<<211, 108, 199, 240, 242, 57, 27, 91, 139, 82, 154, 145, 27, 215, 191, 24, 107, 77, 162, 202>>
iex(3)> << v::unsigned-big-integer-size(160) >> = idx
<<211, 108, 199, 240, 242, 57, 27, 91, 139, 82, 154, 145, 27, 215, 191, 24, 107, 77, 162, 202>>
iex(4)> v
1207022950459909619005619617169028319269647917770
iex(5)>

As you can see, we transform the term that we want to use as a key into a binary and then we hash it with :crypto.hash(:sha, ...). After that we use binary pattern matching to show the actual value, which is between zero and 2¹⁶⁰.

By default, riak_core divides the keyspace (2¹⁶⁰) into 64 partitions; every partition manages (claims) a segment of the ring. Each partition is called a virtual node (vnode) and is assigned to a real node; in this way we can minimize the data transfer when we add or remove nodes (we will see this in more detail in the next posts).
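
To get a rough feeling for how the keyspace maps onto partitions, we can continue the iex session above and compute which of the 64 segments contains our hashed value (this is only an approximation of the idea; riak_core does the real assignment in its chash module):

iex(5)> ring_size = 64
64
iex(6)> partition_size = div(:erlang.bsl(1, 160), ring_size)
22835963083295358096932575511191922182123945984
iex(7)> div(v, partition_size)
52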

We are in the BEAM world so it is natural to map every vnode to an Erlang process. riak_core is kind enough to provide a behaviour to help us implement a vnode process. Adopting a behaviour in Elixir is very simple. I suggest creating a module called NoSlides.VNode in lib/no_slides and declaring the riak_core_vnode behaviour like this:

defmodule NoSlides.VNode do
  @behaviour :riak_core_vnode
end

If you try to compile the project you should get some warnings saying warning: undefined behaviour function …. This is because we haven't implemented the behaviour yet. If you come from an OOP mindset you can think of a behaviour as an interface: a list of functions that a module should implement to be used in a given context. This is the list of warnings that we get:

warning: undefined behaviour function delete/1 
warning: undefined behaviour function encode_handoff_item/2
warning: undefined behaviour function handle_command/3
warning: undefined behaviour function handle_coverage/4
warning: undefined behaviour function handle_exit/3
warning: undefined behaviour function handle_handoff_command/3
warning: undefined behaviour function handle_handoff_data/2
warning: undefined behaviour function handoff_cancelled/1
warning: undefined behaviour function handoff_finished/2
warning: undefined behaviour function handoff_starting/2
warning: undefined behaviour function init/1
warning: undefined behaviour function is_empty/1
warning: undefined behaviour function terminate/2

These functions should be implemented by your module and are called when certain events are triggered:

  • init/1 — It's the function called when the virtual node is created. It's very similar to the init function of a GenServer. It receives a list with one element: the value of the partition that the vnode takes care of. What is returned from this function is the state of the process. The state is always passed as the last argument to every other callback.
  • terminate/2 — It's called when the vnode is terminated; it receives the reason (an atom) and the state.
  • handle_exit/3 — It's called when a process linked to the vnode dies. It receives the pid of the dead process, the reason and the state. You should return {:noreply, state} to continue.
  • delete/1 — It's called when it's necessary to remove the data related to this vnode. As usual it receives the state and should return {:ok, new_state}.
  • handle_command/3 — It's called when a command is executed against this vnode. It's the function that we use to implement our commands.
  • handle_coverage/4 — It's called when we want to create a coverage command. We'll cover this type of command in the next posts.
  • The remaining functions are all related to the handoff procedure. We'll discuss handoffs in the next posts.

There is one last function that we should implement if we want the system to work properly: the start_vnode/1 function. So you could implement your module like this:
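
Something like this could work as a starting point: all the handoff-related callbacks are simply stubbed out for now, and start_vnode/1 just asks the vnode master for the pid of the vnode that owns the partition.

defmodule NoSlides.VNode do
  @behaviour :riak_core_vnode

  require Logger

  # riak_core calls this to start/find the vnode process for a partition
  def start_vnode(partition) do
    :riak_core_vnode_master.get_vnode_pid(partition, __MODULE__)
  end

  def init([partition]) do
    {:ok, %{partition: partition}}
  end

  # the real work happens here; for now just log unknown commands
  def handle_command(message, _sender, state) do
    Logger.debug("unhandled command #{inspect(message)}")
    {:noreply, state}
  end

  def handle_coverage(_req, _key_spaces, _sender, state) do
    {:stop, :not_implemented, state}
  end

  def handle_exit(_pid, _reason, state) do
    {:noreply, state}
  end

  # handoff callbacks: stubbed out, we'll come back to them in the next posts
  def handle_handoff_command(_fold_req, _sender, state) do
    {:noreply, state}
  end

  def handoff_starting(_target_node, state) do
    {true, state}
  end

  def handoff_cancelled(state) do
    {:ok, state}
  end

  def handoff_finished(_target_node, state) do
    {:ok, state}
  end

  def handle_handoff_data(_bin_data, state) do
    {:reply, :ok, state}
  end

  def encode_handoff_item(_k, _v) do
  end

  def is_empty(state) do
    {true, state}
  end

  def delete(state) do
    {:ok, state}
  end

  def terminate(_reason, _state) do
    :ok
  end
end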

The function handle_command/3 is what contains the real work. It works with pattern matching (similar to how a GenServer works), so we can implement handle_command for ping in this way:

def handle_command({:ping, v}, _sender, state) do
  {:reply, {:pong, v + 1}, state}
end

Now that we have the ping command implemented, how can we execute it from the console? We need to introduce a new concept that I like to call a service. A service is a module that wraps how to interact with the commands exposed by riak_core. This service should be registered in riak_core so riak_core knows which nodes expose it. Check out the code:
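
Something along these lines should work as NoSlides.Service (the bucket name "no_slides" passed to chash_key is an arbitrary choice; the NoSlides.VNode_master name follows the convention explained below):

defmodule NoSlides.Service do
  def ping(v \\ 1) do
    # hash the key into the keyspace (the ring)
    idx = :riak_core_util.chash_key({"no_slides", "ping#{v}"})
    # ask for a preference list of 1 node that exposes NoSlides.Service
    pref_list = :riak_core_apl.get_primary_apl(idx, 1, NoSlides.Service)
    [{index_node, _type}] = pref_list
    # execute the command synchronously on that vnode
    :riak_core_vnode_master.sync_command(index_node, {:ping, v}, NoSlides.VNode_master)
  end
end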

First we calculate the id (idx) of the value that we want to use; the id is a value in the keyspace. Using this idx, we can ask riak_core for a preference list with :riak_core_apl.get_primary_apl. The preference list tells us which nodes take care of which partitions. When we call get_primary_apl we ask for a list of length one (second parameter) of nodes that implement NoSlides.Service (third parameter). In this example we request one element because we want to execute the command on only one node; in the next posts we will discuss redundancy. From the preference list we extract the index_node that identifies the real and virtual node on which to execute the command. This node has the ownership of the data identified by idx.

Then we use index_node to execute the call with the function :riak_core_vnode_master.sync_command. This function is synchronous and it doesn't return until the work in the vnode module is completed. If you check the :riak_core_vnode_master code you will also find the function :riak_core_vnode_master.command, which executes the work asynchronously.

You can also check out sync_spawn_command, which works like sync_command; if you look at its code you will read this comment:

%% Send a synchronous spawned command to an individual Index/Node combination.
%% Will not return until the vnode has returned, but the vnode_master will
%% continue to handle requests.
sync_spawn_command({Index,Node}, Msg, VMaster) ->

This is not true; it's probably an old comment for an old implementation. Last but not least, the vnode master name is the vnode module name postfixed with _master (NoSlides.VNode_master), which is a riak_core convention.

Now that we have implemented the Service and the VNode we need to put everything together. To do this we need to start from the beginning …

Starting a riak_core application

We are in an OTP application, so we need to start from a module that implements the Application behaviour. If you created an empty project with mix you probably have a module called NoSlides that uses Application; throw it away and replace it with something like this:

lib/no_slides.ex
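
It could look roughly like this (error handling and logging are up to you):

defmodule NoSlides do
  use Application
  require Logger

  def start(_type, _args) do
    case NoSlides.Supervisor.start_link() do
      {:ok, pid} ->
        # register the vnode implementation ...
        :ok = :riak_core.register(vnode_module: NoSlides.VNode)
        # ... and mark the service as up on this node
        :ok = :riak_core_node_watcher.service_up(NoSlides.Service, self())
        {:ok, pid}
      {:error, reason} ->
        Logger.error("Unable to start NoSlides supervisor: #{inspect(reason)}")
        {:error, reason}
    end
  end
end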

We first start a supervisor that we'll implement in a moment; if everything starts smoothly we register the module that implements the vnode (:riak_core.register) and then mark the module that implements the service as up (:riak_core_node_watcher.service_up).

The supervisor should be placed in the file lib/no_slides/supervisor.ex and should be something like this:

lib/no_slides/supervisor.ex
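
For example (this sketch uses the Supervisor.Spec helpers that were current in the Elixir version used here; the registered name :no_slides_sup and the worker id follow the riak_core conventions explained below):

defmodule NoSlides.Supervisor do
  use Supervisor

  def start_link do
    # the supervisor is registered with a name ending in _sup
    Supervisor.start_link(__MODULE__, [], name: :no_slides_sup)
  end

  def init(_args) do
    children = [
      # the vnode master worker; its id ends with _master_worker
      worker(:riak_core_vnode_master, [NoSlides.VNode],
             id: NoSlides.VNode_master_worker)
    ]
    supervise(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
  end
end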

This is a classic supervisor, but we need to pay attention to some details: the supervisor must be registered under a name that ends with _sup, and the vnode master worker id must be postfixed with _master_worker.

After that, you can start a node with this command:

MIX_ENV=gpad_1 iex --name gpad_1@127.0.0.1 -S mix run

Inside the Elixir console you can execute the service ping:

Interactive Elixir (1.3.4) - press Ctrl+C to exit (type h() ENTER for help)
iex(gpad_1@127.0.0.1)1> NoSlides.Service.ping
{:pong, 2}
iex(gpad_1@127.0.0.1)2> NoSlides.Service.ping(42)
{:pong, 43}
iex(gpad_1@127.0.0.1)3>

Now we can join more nodes and execute a distributed ping. As a first step we need to start more nodes in different consoles:

# this is node 1
MIX_ENV=gpad_1 iex --name gpad_1@127.0.0.1 -S mix run
# this is node 2
MIX_ENV=gpad_2 iex --name gpad_2@127.0.0.1 -S mix run
# this is node 3
MIX_ENV=gpad_3 iex --name gpad_3@127.0.0.1 -S mix run

Now you can join all the nodes together; from the console of node 2 type:

iex(gpad_2@127.0.0.1)1> :riak_core.join('gpad_1@127.0.0.1')

Do the same on the console of node 3:

iex(gpad_3@127.0.0.1)1> :riak_core.join('gpad_1@127.0.0.1')

If you check the console of node 1 then you should see some logs like this:

12:21:53.168 [info] 'gpad_2@127.0.0.1' joined cluster with status 'valid'
12:22:39.155 [info] 'gpad_3@127.0.0.1' joined cluster with status 'joining'
12:22:39.191 [info] 'gpad_3@127.0.0.1' changed from 'joining' to 'valid'

Now you can ask riak_core to print the status of the ring with these commands:

{:ok, ring} = :riak_core_ring_manager.get_my_ring
:riak_core_ring.pretty_print(ring, [:legend])

You should see something like this:

============================= Nodes =============================
Node a: 22 ( 34.4%) gpad_1@127.0.0.1
Node b: 21 ( 32.8%) gpad_2@127.0.0.1
Node c: 21 ( 32.8%) gpad_3@127.0.0.1
============================= Ring =============================
abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abca|abba|abba|abba|abba|abba|

As you can see, your ring is divided into 64 partitions: the first node receives 22 vnodes and the other two nodes 21 vnodes each. You can also check this with observer.

Now we can add a log line to the VNode implementation so we can see which node responds to the ping call (remember to add require Logger at the beginning of the module):

def handle_command({:ping, v}, _sender, state) do
  Logger.debug("Receive ping with value: #{v}")
  {:reply, {:pong, v + 1}, state}
end

From the console of node 1, execute this command:

iex(gpad_1@127.0.0.1)1> NoSlides.Service.ping
{:pong, 2}

On node 2 you can see a log like this:

12:43:00.822 [debug] Receive ping with value: 1

We have a very simple distributed ping. If you change the value passed to ping, you should see a different node responding. For example, if you use the value 42 then node 3 should respond.

Now that we have a ping, we can easily create an in-memory key-value store.

Cook your memory KV

Now that we understand how to connect a service and a vnode, we can easily create an in-memory KV store by exposing get and put functions on the service module:
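
They follow the same pattern as ping; something along these lines should work, added to NoSlides.Service:

def put(k, v) do
  # the key decides which vnode owns the data
  idx = :riak_core_util.chash_key({"no_slides", k})
  [{index_node, _type}] = :riak_core_apl.get_primary_apl(idx, 1, NoSlides.Service)
  :riak_core_vnode_master.sync_command(index_node, {:put, k, v}, NoSlides.VNode_master)
end

def get(k) do
  idx = :riak_core_util.chash_key({"no_slides", k})
  [{index_node, _type}] = :riak_core_apl.get_primary_apl(idx, 1, NoSlides.Service)
  :riak_core_vnode_master.sync_command(index_node, {:get, k}, NoSlides.VNode_master)
end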

Also add the get and put implementations to the VNode:
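
For example, assuming init/1 is changed to keep a data map in the vnode state (e.g. {:ok, %{partition: partition, data: %{}}}), two more handle_command/3 clauses are enough:

def handle_command({:put, k, v}, _sender, state) do
  Logger.debug("[put]: k: #{inspect(k)} v: #{inspect(v)}")
  # store the value in the vnode state and acknowledge with :ok
  {:reply, :ok, %{state | data: Map.put(state.data, k, v)}}
end

def handle_command({:get, k}, _sender, state) do
  Logger.debug("[get]: k: #{inspect(k)}")
  {:reply, Map.get(state.data, k), state}
end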

Now you can put and get some values from different nodes.

Stop and restart every node; it's not necessary to rejoin them. From the console of node 1 execute these commands:

iex(gpad_1@127.0.0.1)1> NoSlides.Service.put(:k, 42)
:ok
iex(gpad_1@127.0.0.1)2> NoSlides.Service.get(:k)
42

Check logs on node 2:

iex(gpad_2@127.0.0.1)1> 
19:31:30.634 [debug] [put]: k: :k v: 42
19:31:39.242 [debug] [get]: k: :k

You can also retrieve this value from node 3:

iex(gpad_3@127.0.0.1)1> NoSlides.Service.get(:k)
42

We have a simple KV store in memory; you can use many different types as values:

iex(gpad_1@127.0.0.1)9> NoSlides.Service.put("gpad", %{ blogs: ["riak_core I", "riak_core II"] })
:ok
iex(gpad_1@127.0.0.1)10> NoSlides.Service.get("gpad")
%{blogs: ["riak_core I", "riak_core II"]}

And also as keys:

iex(gpad_1@127.0.0.1)11> NoSlides.Service.put(%{a: 1, b: 2}, "gpad")
:ok
iex(gpad_1@127.0.0.1)12> NoSlides.Service.get(%{a: 1, b: 2})
"gpad"

This is the beginning of a simple in-memory key-value store. There are some open questions:

  • What happens if a node leaves the cluster with :riak_core.leave ?
  • What happens if a node crashes?
  • How can we get the list of all keys?

I’ll try to answer these questions in the following posts. If you have any questions or spot some errors, don’t hesitate to leave a comment.