Create a riak_core application in Elixir (Part 4)

In the last post I ended with this simple question: “How can we get the list of all the keys?”. In this post I’m going to answer this question.

[Image: a ring drawn by one of my favourite musicians.]

Coverage Command

In riak_core slang, a command that involves all the nodes of the cluster is called a “coverage command”. A coverage command is handled by a specific function of the riak_core_vnode behaviour. To implement a vnode we need to implement this behaviour, and in particular the handle_coverage/4 function.
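Just to fix ideas, this is roughly the shape of that callback; compute_result/2 is a hypothetical helper, the real clauses for our KV store are shown later in the post:

```elixir
# handle_coverage/4 receives the coverage request built by the coverage FSM,
# the key spaces, the sender and the current vnode state; replying with
# {:reply, result, state} sends `result` back to the process that issued
# the coverage command.
def handle_coverage(request, _key_spaces, _sender, state) do
  result = compute_result(request, state) # hypothetical helper
  {:reply, result, state}
end
```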

As you can see, from the perspective of the vnode a coverage command is very simple. The tricky part comes when you want to query every vnode of the cluster and collect all their responses.

Starting from Service

As we have seen in previous posts, the service module is the module used by the client to interact with our cluster, so it’s a good place to start implementing our coverage command. In this module we add a new public function for every feature that we want to implement. In this post we are going to implement two functions:

  • the first one is keys and it returns all the keys stored in the KV store
  • the second one is values and it returns all the values stored in the KV store

These functions retrieve all the data from the various vnodes using a new process that implements a behaviour defined by riak_core called riak_core_coverage_fsm. To obtain the final result from this new process we wait for a message that arrives when all the data has been gathered. To do that we call the function start_fsm/1, which returns a unique id. We can use this req_id in a receive block to identify our response through classic pattern matching.
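As a reference, here is a minimal sketch of how these two functions could look, assuming that start_fsm/1 returns the generated req_id and that the coverage FSM (implemented below) sends a {req_id, result} message back to the caller; the private run_coverage/1 helper and the timeout value are only illustrative:

```elixir
defmodule NoSlides.Service do
  # ...the get/put functions from the previous posts stay here...

  @coverage_timeout 5_000

  def keys, do: run_coverage(:keys)

  def values, do: run_coverage(:values)

  # Start a coverage FSM and wait for the message tagged with our req_id.
  defp run_coverage(what) do
    req_id = NoSlides.CoverageFsmSupervisor.start_fsm(what)

    receive do
      {^req_id, result} -> result
    after
      @coverage_timeout -> {:error, :timeout}
    end
  end
end
```

The pin operator (^req_id) makes sure we only match the answer of the command we just started; any other message stays in the mailbox.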

Before going into detail about how to implement a coverage command, we need to agree upon what we mean by “all nodes”. Do we mean all the running nodes or all the nodes defined in the cluster? What should happen when we try to execute a command on all the defined nodes of the cluster but one of them is disconnected? This is something that you need to decide at the application level. In our example we implement the coverage commands for all the running nodes. If you need to go more into detail, please leave a comment and I’ll try to address your requests in the next posts.

Implementing riak_core_coverage_fsm

As you might expect, the riak_core_coverage_fsm behaviour is based on a classic Erlang gen_fsm. This behaviour takes care of all the work necessary to interrogate the vnodes and retrieve their responses. We need to set up some information required by the behaviour to do its work. To start, I suggest creating a new module called NoSlides.CoverageFsm and implementing it in this way:
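The gist embedded in the original post shows the whole module; a sketch of how it could look follows. The shape of the state map, the timeout value and the way the req_id/from pair is threaded through are assumptions of this sketch, while the callbacks themselves (start_link/3, init/2, process_results/2 and finish/2) are the ones described one by one in the next sections:

```elixir
defmodule NoSlides.CoverageFsm do
  # Implements the callbacks expected by the Erlang behaviour
  # :riak_core_coverage_fsm: init/2, process_results/2 and finish/2.

  # Timeout (in milliseconds) for the whole coverage operation.
  @timeout 5_000

  # Called by the supervisor: delegate to riak_core_coverage_fsm, passing
  # the module that implements the behaviour, who made the call and the
  # arguments that will be handed to init/2.
  def start_link(req_id, from, what) do
    :riak_core_coverage_fsm.start_link(__MODULE__, {req_id, from}, what)
  end

  # First callback invoked by riak_core: the returned tuple configures the
  # coverage plan (the meaning of every element is explained below).
  def init({req_id, from}, what) do
    state = %{req_id: req_id, from: from, what: what, res: []}

    {
      {what, req_id, from},   # request forwarded to every vnode
      :allup,                 # run on all the *running* nodes
      1,                      # n_val
      1,                      # primary vnode coverage
      NoSlides.Service,       # service used to check which nodes are up
      NoSlides.VNode_master,  # registered name of the vnode master
      @timeout,
      state
    }
  end

  # Called once for every answer coming back from a vnode: we accumulate
  # the received values in the res field of the state, one list per partition.
  def process_results({_partition, values}, state) do
    {:done, %{state | res: [values | state.res]}}
  end

  # Everything went fine: send the collected result back to the caller,
  # tagged with the req_id so it can be pattern matched in the receive.
  def finish(:clean, state) do
    send(state.from, {state.req_id, state.res})
    {:stop, :normal, state}
  end

  # Something went wrong (e.g. a node went down): report the error to the caller.
  def finish({:error, reason}, state) do
    send(state.from, {state.req_id, {:error, reason}})
    {:stop, :normal, state}
  end
end
```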

As you can see there are some functions to implement; the first one is:

start_link(req_id, from, what)

This is the function that is called by the supervisor to start the gen_fsm. We simply use the :riak_core_coverage_fsm.start_link function to start the FSM, passing the required arguments: the module that implements the behaviour, who made the call, and the arguments that will be passed to the next functions.

init(from, args)

This is the first function called by riak_core; you need to return a tuple shaped in this way:

  • {what, req_id, from} are the arguments that are passed to handle_coverage of the vnode module and are managed by riak_core as an opaque structure
  • :allup is an atom that specifies that we need to execute the command on all the running nodes, not on all the nodes in the cluster
  • n_val and vnode_coverage are two arguments that specify how to manage the replication in our cluster. For now we set them to one.
  • NoSlides.Service and NoSlides.VNode_master are respectively the service used to identify the running nodes and the registered name of the vnode master for the module that implements the vnode.
  • timeout is the timeout, in milliseconds, after which the coverage command is considered failed
  • the last element is the state of the finite state machine, which will be passed to every subsequent callback.

As you can see we decide to execute the command against all the running nodes and we save in the state the pid of the caller and the received arguments.

process_results({partition, values}, state)

This function is called for every response received from a vnode. Here we collect all the values that we receive in the res field of the state. At the end we will have, inside res, a list of lists composed of all the values (keys or values) that we’ve received. At the end of the process riak_core calls the function finish/2.

finish(:clean, state)

This function is called when everything goes fine and, in this implementation, we will send a message to the process that made the request using the pid saved inside the from field of the state.

If something goes wrong, for example a node goes down during the execution of the command, the command fails and finish/2 is called with a first argument of the form {:error, reason}.

finish({:error, reason}, state)

This function receives the reason for the error (a timeout, for example) and simply sends a message with {:error, reason} to the caller.

VNode Implementation

As you can see below, the implementation in the vnode is very simple.

Pattern matching identifies the correct clause to execute, and the requested data is returned from the vnode’s state.
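A sketch of the two clauses, assuming (as in the previous posts) that the vnode module is NoSlides.VNode and that it keeps its partition number and its key/value map in the partition and data fields of its state:

```elixir
defmodule NoSlides.VNode do
  # ...the callbacks implemented in the previous posts (init/1, handle_command/3, ...)

  # The request built in CoverageFsm.init/2 is matched to choose the right
  # clause; the reply carries the partition together with the requested data,
  # which is what process_results/2 in the coverage FSM expects.
  def handle_coverage({:keys, _req_id, _from}, _key_spaces, _sender, state) do
    {:reply, {state.partition, Map.keys(state.data)}, state}
  end

  def handle_coverage({:values, _req_id, _from}, _key_spaces, _sender, state) do
    {:reply, {state.partition, Map.values(state.data)}, state}
  end
end
```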

Supervisor

To start all these processes, the service module calls the function start_fsm/1, defined in the supervisor module NoSlides.CoverageFsmSupervisor, passing it the atom :keys or :values to get the keys or the values stored in our cluster.

As you can see the supervisor is very simple. Just a note: the strategy it uses is :simple_one_for_one, because we want to attach children to the supervisor dynamically whenever the function start_fsm/1 is called.
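A sketch of how such a supervisor could look, using the pre-Elixir-1.5 Supervisor.Spec helpers that go with the :simple_one_for_one strategy; the way the req_id is generated here is just one possible choice:

```elixir
defmodule NoSlides.CoverageFsmSupervisor do
  use Supervisor
  import Supervisor.Spec

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  # Called by the service module: generate a unique req_id, attach a new
  # CoverageFsm child to the supervisor and give the id back to the caller,
  # which will wait for the {req_id, result} message.
  def start_fsm(what) do
    req_id = :erlang.unique_integer([:monotonic])
    Supervisor.start_child(__MODULE__, [req_id, self(), what])
    req_id
  end

  def init([]) do
    children = [
      worker(NoSlides.CoverageFsm, [], restart: :temporary)
    ]

    # :simple_one_for_one: children are started dynamically, one for every
    # coverage command, via Supervisor.start_child/2.
    supervise(children, strategy: :simple_one_for_one)
  end
end
```

With all the pieces in place, NoSlides.Service.keys() and NoSlides.Service.values() can be called from an attached console.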

Recap

In this post we have implemented two commands to get the list of the keys and of the values saved in our KV store. So far we have created a cluster that is able to do a lot of interesting things:

  • It can store and retrieve data in a key/value fashion, distributing it uniformly across the cluster (remember the consistent hashing algorithm).
  • It can execute a command on the entire cluster; for example, we have just implemented commands to get all the keys and the values stored in it.
  • It can be resized, grown or shrunk, by adding and removing nodes with the appropriate commands.

Now we are ready to try to handle a node crash. In the next post we will discuss how to manage a crash, what problems arise and how to deal with them.

As always, please leave a comment if you spot an error or if you find anything unclear. You can find the implementation discussed in this post in the usual repo.

As a side note, on April 7th 2017, I gave a speech at Rome Erlang Factory Lite about these topics. You can find my slides here.