Elixir Process Communication

Making Processes Work Together

billperegoy
im-becoming-functional
8 min readJul 25, 2017

--

Our Goals for this Post

In my last post on Elixir processes, we built a simple dispatcher process along with a server process. We created the simplest of connections. When we create a new server process, that server registers with the dispatcher. We can also query the server to get a list of the available servers. In this post we will begin to expand the capabilities of this basic compute farm. We will add these capabilities.

  1. Add a message type to the dispatcher to allow it to run a job on any available server.
  2. Change the server process to allow it to run a simple function
  3. Add the ability for the server to send a result status back to the dispatcher
  4. Update dispatcher to keep track of which server nodes are available and which are busy.

Allowing the Dispatcher to Address Servers

After the last post, servers can register with the dispatcher but the dispatcher does not have a handle to communicate back to the server. In order to correct this, we need the server to send its PID back to the dispatcher uponregistration.

def server(dispatcher_pid, name) do
send(dispatcher_pid, {:register, %{name: name, pid: self()}})
receive do
message -> IO.puts("Server #{name} received #{message}")
end
ComputeFarm.server(dispatcher_pid, name)
end

The only change here is in the second line above. The register tuple that we send to the dispatcher now includes the pid as well as the name. Note that the self() function is used to retrieve the PID of the currently running process.

We now need to change the dispatcher to use this new format. Previously, the dispatcher just stored a simple list of server names. We will change that to store a map keyed by server name. This will allow us to look up a server pid using the server name.

The changes here are again very straightforward.

def dispatcher(servers \\ %{}) do
servers = receive do
:servers ->
server_string = Map.values(servers)
|> Enum.map(fn elem -> elem.name end)
|> Enum.join(", ")
IO.puts("Available server list: #{server_string}")
servers
{:register, %{name: name}=server_info} ->
IO.puts("Registered new server: #{server_info.name}")
Map.put(servers, name, server_info)
_ ->
IO.puts("Unknown command")
servers
end
ComputeFarm.dispatcher(servers)
end

You’ll note that we have changed the value referenced by servers from a list to a map. The :register command now adds a new element to the map and the :servers command converts that map to a list of server names before displaying it.

With this mapping in place, we can now add a new command to allow the dispatcher to send a message to a particular server.

Let’s Make the Dispatcher Dispatch

Our next step is to add the new command type to the dispatcher that allows us to send a message to a chosen server. Here is the added code excerpt.

def dispatcher(servers \\ %{}) do
servers = receive do
{:dispatch, server_name} ->
case Map.get(servers, server_name) do
nil ->
IO.puts("Cannot find server #{server_name}")
%{name: name, pid: pid} ->
IO.puts("Sending to #{name}")
send(pid, "message")
end
servers
end
end

The code sample above only shows the single added case condition. You’ll note that the we send a tuple containing the :dispatch atom along with a server name. We lookup the server name from the servers map and if it is found, we send a message to the pid for that server. Elixir pattern matching makes this all very concise and clear.

Let’s take this code for a spin.

iex(1)> dispatcher = spawn(fn -> ComputeFarm.dispatcher end)iex(2)> spawn(fn -> ComputeFarm.server(dispatcher, "server_3") end)iex(3)> send(dispatcher, {:dispatch, "server_3"})
Sending to server_3
Server server_3 received message
Registered new server: server_3
iex(4)> send(dispatcher, {:dispatch, "not_here"})
Cannot find server not_here

You’ll note that when we give a valid server name, we get a message from the server indicating that it received the message. When we supply an invalid server name, we get the expected error message.

There is one problem. Note that when the server receives a message, it processes it and reinvokes the server. This has the unfortunate side-effect of re-registering the server with the dispatcher. Let’s dive into the server code and fix this issue.

We’ll pass an optional options map to the server function. When invoked with default options, it will register the server. When it is re-invoked recursively, we set the register key to false so we can skip this step.

def server(dispatcher_pid, name, options \\ %{register: true}) do
if options.register do
send(dispatcher_pid, {:register, %{name: name, pid: self}})
end
receive do
message -> IO.puts("Server #{name} received #{message}")
ComputeFarm.server(dispatcher_pid, name, %{register: false})
end
end

At this point, we have half the dispatcher to server handshake working. Now we will update both sides to allow the server to run a job and send a result back to the dispatcher.

Completing the Dispatcher/Server Handshake

In order to emulate a real time-consuming server operation, I decided to create a factorial function. Given large enough input values, these computations take noticeable time on my Powerbook. This will prove useful as we run multiple servers in parallel. The factorial function is a simple recursive function. Note that I’ve used the single line syntax with pattern matching of arguments here. This concise syntax makes me love writing Elixir code.

def factorial(0), do: 1
def factorial(n), do: n * factorial(n-1)

Once we had compute function, we can modify the server to generate a random input value and execute this function. I then send the result back to the dispatcher using a new command atom.

    receive do 
message -> IO.puts("Server #{name} received #{message}")
value = 100_000 * Enum.random(1..5)
IO.puts("Computing factorial of #{value}...")
result = factorial(value)
IO.puts("Computation complete")
send(dispatcher_pid, {:result, %{server: name}})
ComputeFarm.server(dispatcher_pid, name, %{register: false})
end

If you attempt to run this code, it will fail as the server does not yet know how to handle the new command. To fix this, we need to add another clause to the receive patterns.

    {:result, %{server: server, value: value, result: result}} ->
IO.puts("Received result from #{server}")

This is as simple as it gets. The dispatcher will indicate when each server returns a response. We can test drive it as follows

iex(1)> dispatcher = spawn(fn -> ComputeFarm.dispatcher end)iex(2)> spawn(fn -> ComputeFarm.server(dispatcher, "server_1") end)iex(3)> spawn(fn -> ComputeFarm.server(dispatcher, "server_2") end)iex(4)> send(dispatcher, {:dispatch, "server_1"})
Sending to server_1
Server server_1 received message
Computing factorial of 200000...
iex(5)> send(dispatcher, {:dispatch, "server_2"})
Sending to server_2
Server server_2 received message
Computing factorial of 100000...
Computation complete
Received result from server_2
Computation complete
Received result from server_1

When you run this code, you’ll see a delay before the server_2 result is received and another delay before the server_1 result is received. This makes sense given that in this case, server_2 had the shorter computation.

With this, we have completed the handshaking logic. Our next step is to add some smarts to the dispatcher so it can find and dispatch to servers as they become available.

Adding a Server Status to the Dispatcher

In order to add a scheduler to the dispatcher, we will need to keep track of the status of each node. I’ll do that by adding an additional key/value pair to the server_info map that is kept within the dispatcher. When the server is created, we will set the status to :avail. We can do this in the server function when we register a new server.

if options.register do
send(dispatcher_pid,
{:register, %{name: name, pid: self(), status: :avail}})
end

This code will still work as adding a field to a map does not require changes to other code that consumes the map.

Next we will update the :servers command to display the server name as well as the status. This is one pattern from the receive pattern matching logic.

:servers ->
server_string = Map.values(servers)
|> Enum.map(fn elem -> "#{elem.name}: #{elem.status}" end)
|> Enum.join(", ")
IO.puts("Available server list: #{server_string}")
servers

One we have this set, we can add code that sets the status to :busy each time a job is sent to a server. This is the code excerpt in the dispatcher.

{:dispatch, server_name} ->
server_info = Map.get(servers, server_name)
server_info = case server_info do
nil ->
IO.puts("Cannot find server #{server_name}")
server_info
%{name: name, pid: pid} ->
IO.puts("Sending to #{name}")
send(pid, "message")
%{server_info | :status => :busy}
_ ->
IO.puts("Unexpected condition. Aborting")
server_info
end
%{servers | server_name => server_info}

This code is a bit dense but fairly straightforward. We compute a new server_info map by modifying the version we look up. In the case where we get a valid map, we modify the :status entry to set it to :busy. Then at the bottom of the excerpt, we update the servers list by updating only the entry for this particular server to the newly computed server_info value.

We then need to write similar code to set the status to :avail when we receive a :result command indicating that a server is finished and is ready for a new job.

{:result, %{server: server}} ->
IO.puts("Received result from #{server}")
server_info = Map.get(servers, server)
server_info = %{server_info | :status => :avail}
%{servers | server => server_info}

This code is very similar to the code we just looked at. We look up the server_info map from the server that responded, modify it to set status to :avail and return a list of servers that updates that particular server entry.

With this, we will now have a consistently updated list of servers and will know which are busy and which are available for jobs.

If we test drive this new version, we will see this.

iex(1)> dispatcher = spawn(fn -> ComputeFarm.dispatcher end)iex(2)> spawn(fn -> ComputeFarm.server(dispatcher, "server_1") end)iex(3)> spawn(fn -> ComputeFarm.server(dispatcher, "server_2") end)iex(4)> send(dispatcher, :servers)
Available server list: server_1: avail, server_2: avail
iex(5)> send(dispatcher, {:dispatch, "server_1"})
Sending to server_1
Server server_1 received message
Computing factorial of 200000...
iex(6)> send(dispatcher, :servers)
Available server list: server_1: busy, server_2: avail
iex(7)> send(dispatcher, {:dispatch, "server_2"})
Sending to server_2
Server server_2 received message
Computing factorial of 100000...
iex(8)> send(dispatcher, :servers)
Available server list: server_1: busy, server_2: busy

Computation complete
Received result from server_2
iex(9)> send(dispatcher, :servers)
Available server list: server_1: busy, server_2: avail
Computation complete
Received result from server_1
iex(10)> send(dispatcher, :servers)
Available server list: server_1: busy, server_2: avail

As you can see, we invoke the :servers command and see the appropriate status at each step as jobs start and stop.

Summary

We now have a much more full-featured server farm. We have a server that computes factorials now. We have a dispatcher that can send jobs to a particular server and receive results back from any server. The dispatcher also keeps track and can report the status of all registered servers. In order to make this all happen we used the following Elixir concepts.

  1. Used the single line function syntax and pattern matching to crete a concise recursive function.
  2. Used the self() function to pass a PID from the server to dispatcher.
  3. Used map update syntax to crate new versions of maps with updated values.
  4. Used more extensive message sending to create a two-way handshake between dispatcher and servers.

With this in place, we have now laid the groundwork to build a scheduler that will allow the dispatcher to receive and queue large numbers of server requests and queue them in a round robin fashion to servers as they become available. We will implement this in next week’s blog post.

--

--

billperegoy
im-becoming-functional

Polyglot programmer exploring the possibilities of functional programming.