Adding Queuing to an Elixir Compute Farm

Tying the Pieces Together

billperegoy
im-becoming-functional
7 min read · Aug 1, 2017


What’s Left?

In the past two blog posts (processes and process communication), we have learned enough about Elixir and OTP to build a number of processes. One process can be invoked multiple times and acts as a server process that computes a factorial. The other process is a dispatcher that can keep track of what servers are available and can send jobs to a named server and report when they are done.

There is one big hole in the current configuration. The user interacting with the dispatcher still needs to know what specific server to send a job to. We would prefer to have the user simply queue a job to the dispatcher and know that the dispatcher will send the job to a server when one is available. This allows us to abstract the server farm details from the end-user.

As part of this post, we will add the following capabilities:

  1. Create a queue structure that can hold a list of queued jobs.
  2. Create a function that finds available servers.
  3. Add a dispatcher command that puts a new element on the queue.
  4. Add logic to dispatch a new job immediately if a server is free.
  5. Add logic to dispatch a queued job when a new server is added, if jobs are in the queue.
  6. Add logic to dispatch a queued job when a server frees up.

Let’s dive right into making these additions to the project.

Finding a List of Available Servers

Now that we’ve added a status field to each element of the servers map, we need a function that processes this map and produces a list of the names of the servers that are available. Using the functions provided in the Enum module, this code is fairly simple.

def available_servers(servers) do
  servers
  |> Map.values()
  |> Enum.filter(fn elem -> elem.status == :avail end)
  |> Enum.map(fn elem -> elem.name end)
end

Given that this is a pure function, this is a good time to start writing some tests. The Elixir test framework makes this a breeze. A mix project comes with a sample test in test/compute_farm_test.exs. We can easily modify this to write some tests for this function.

defmodule ComputeFarmTest do
  use ExUnit.Case
  doctest ComputeFarm

  test "available_servers matching some" do
    servers = %{
      "server_1" => %{name: "server_1", status: :avail},
      "server_2" => %{name: "server_2", status: :busy},
      "server_3" => %{name: "server_3", status: :avail}
    }

    assert ComputeFarm.available_servers(servers) ==
             ["server_1", "server_3"]
  end

  test "available_servers matching none" do
    servers = %{
      "server_1" => %{name: "server_1", status: :busy},
      "server_2" => %{name: "server_2", status: :busy},
      "server_3" => %{name: "server_3", status: :busy}
    }

    assert ComputeFarm.available_servers(servers) == []
  end

  test "available_servers matching all" do
    servers = %{
      "server_1" => %{name: "server_1", status: :avail},
      "server_2" => %{name: "server_2", status: :avail},
      "server_3" => %{name: "server_3", status: :avail}
    }

    assert ComputeFarm.available_servers(servers) ==
             ["server_1", "server_2", "server_3"]
  end
end

No explanation is needed here. The great thing about pure functions is that the tests are as straightforward as can be. You can confirm that the function works by running mix test.

% mix test
...
Finished in 0.02 seconds
3 tests, 0 failures
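
One caveat about these tests: they compare against a list in a specific order. Small maps with string keys typically enumerate in key order, which is why the assertions pass, but Elixir does not promise any particular map ordering. Here is a minimal sketch of an order-independent check. The SortedCheck module is a hypothetical stand-in that repeats the available_servers logic so the snippet runs on its own.

```elixir
defmodule SortedCheck do
  # Same logic as available_servers/1 in the post, repeated so this runs standalone.
  def available_servers(servers) do
    servers
    |> Map.values()
    |> Enum.filter(fn elem -> elem.status == :avail end)
    |> Enum.map(fn elem -> elem.name end)
  end
end

servers = %{
  "server_1" => %{name: "server_1", status: :avail},
  "server_2" => %{name: "server_2", status: :busy},
  "server_3" => %{name: "server_3", status: :avail}
}

# Sorting the result makes the comparison independent of map iteration order.
result = servers |> SortedCheck.available_servers() |> Enum.sort()
```

In an ExUnit test the same idea would be an assert on the sorted result.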

Now we are ready to start building a queue.

Creating a Job Queue

The job queue will simply be a list of job information. We will push new jobs onto the front of the list and pop them off the back before we send them to be executed, so the oldest job always runs first. For the time being, we will continue the pattern of passing state into the dispatcher function and passing it back each time we recursively invoke the function. This isn’t the only (or necessarily best) way to store state in a process, but it will work fine. We will first modify the function definition as follows.

def dispatcher(servers \\ %{}, queue \\ []) do

We include one more argument, queue, and default it to an empty list. We then need to modify the recursive call of this function to recirculate the latest value of queue.

ComputeFarm.dispatcher(servers, queue)
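
To see the state-passing pattern in isolation, here is a minimal sketch of a process whose only state is the pair of arguments it recurses with. The LoopSketch module and its :push and :get_queue messages are hypothetical and are not part of the compute farm code.

```elixir
defmodule LoopSketch do
  # Hypothetical stand-in for the dispatcher: state lives only in the arguments.
  def loop(servers \\ %{}, queue \\ []) do
    receive do
      {:push, item} ->
        # Recurse with the updated queue; servers is passed through unchanged.
        loop(servers, [item | queue])

      {:get_queue, caller} ->
        # Report the current queue, then recurse with the same state.
        send(caller, {:queue, queue})
        loop(servers, queue)
    end
  end
end

pid = spawn(fn -> LoopSketch.loop() end)
send(pid, {:push, :a})
send(pid, {:push, :b})
send(pid, {:get_queue, self()})

reply =
  receive do
    {:queue, q} -> q
  after
    1_000 -> :timeout
  end
```

Because each clause ends in a call to loop, the process never holds mutable state; the "current" queue is simply whatever was passed to the most recent call.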

With an empty queue in place, it’s time to add a dispatcher command to add a job to the queue.

Adding Jobs to the Queue

Now that we have two pieces of state that can be modified via commands, we need to change the receive clauses to return a tuple of {servers, queue}. Here is an excerpt showing the change.

def dispatcher(servers \\ %{}, queue \\ []) do
  {servers, queue} = receive do
    :servers ->
      server_string =
        servers
        |> Map.values()
        |> Enum.map(fn elem -> "#{elem.name}: #{elem.status}" end)
        |> Enum.join(", ")
      IO.puts("Available server list: #{server_string}")
      {servers, queue}

We then add two new commands to the dispatcher: one to queue a new job and one to print the contents of the queue.

    {:queue, job} ->
      {servers, [job | queue]}
    :show_queue ->
      IO.inspect(queue)
      {servers, queue}

With this in place, we can manually test these new commands.

iex(1)> dispatcher = spawn(fn -> ComputeFarm.dispatcher end)
iex(2)> send(dispatcher, :show_queue)
[]
iex(3)> send(dispatcher, {:queue, %{name: "job_1"}})
iex(4)> send(dispatcher, :show_queue)
[%{name: "job_1"}]
iex(5)> send(dispatcher, {:queue, %{name: "job_2"}})
iex(6)> send(dispatcher, :show_queue)
[%{name: "job_2"}, %{name: "job_1"}]

We can see that we can now add jobs to a queue. The contents of the job map are arbitrary. I’m just including a job name for now. In a real server, we’d likely include arguments to control the job. But our simple server requires no further information.

With a queue in place, it’s time to use the queue to feed jobs to the server.

Dispatching New Jobs if Servers Are Available

This is the first of several ways that new jobs can be sent to the servers. Any time a new job is queued, if there is an available server, we will send the job directly to that server instead of placing it on the queue.

Given that we now will need to dispatch jobs from multiple places in our code, the first step is to factor out the dispatch function. The resulting function looks like this.

defp dispatch(servers, server_name) do
  server_info = Map.get(servers, server_name)
  case server_info do
    nil ->
      IO.puts("Cannot find server #{server_name}")
      servers
    %{name: name, pid: pid} ->
      IO.puts("Sending to #{name}")
      send(pid, "message")
      server_info = %{server_info | :status => :busy}
      %{servers | name => server_info}
    _ ->
      IO.puts("Unexpected condition. Aborting")
      servers
  end
end

With this new function, we can then simplify the :dispatch pattern.

    {:dispatch, server_name} ->
      {dispatch(servers, server_name), queue}

Next we will use this new function inside the :queue pattern match to dispatch a new job if there are available servers when the job is queued.

    {:queue, job} ->
      {servers, queue} = case available_servers(servers) do
        [] ->
          {servers, [job | queue]}
        list ->
          server_name = List.first(list)
          {dispatch(servers, server_name), queue}
      end
      {servers, queue}

In this code, we get a list of available servers. If the list is empty, we simply queue the job. If it is not, we pick the first available server and dispatch the job to that server. Since the dispatch function returns an updated server list, the selected server will be marked as :busy.

We can try this out with the commands below.

iex(1)> dispatcher = spawn(fn -> ComputeFarm.dispatcher end)
iex(2)> spawn(fn -> ComputeFarm.server(dispatcher, "server_1") end)
iex(3)> send(dispatcher, {:queue, %{name: "job_1"}})
Server server_1 received message
Computing factorial of 300000...
iex(4)> send(dispatcher, {:queue, %{name: "job_2"}})
iex(5)> send(dispatcher, :show_queue)
[%{name: "job_2"}]

You’ll note that the first :queue command started a job on the server, but the second did not (as that server was busy). You’ll also note that the second job is now in the queue. Next we will write code that allows queued jobs to be dispatched as servers free up.
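
The dispatch-or-queue decision from this section can also be written as a pure function, which makes it testable without spawning any processes. The following is only a sketch under that assumption: the PlacementSketch module, the place function, and its return tuples are hypothetical names, and it marks the chosen server as busy without sending any message.

```elixir
defmodule PlacementSketch do
  # Hypothetical pure helper: decides what to do with a new job
  # and returns the updated state without sending any messages.
  def place(servers, queue, job) do
    available =
      servers
      |> Map.values()
      |> Enum.filter(fn s -> s.status == :avail end)
      |> Enum.map(fn s -> s.name end)

    case available do
      [] ->
        # No free server: the job goes on the front of the queue.
        {:queued, servers, [job | queue]}

      [name | _rest] ->
        # A server is free: mark it busy and leave the queue untouched.
        busy = Map.update!(servers, name, fn s -> %{s | status: :busy} end)
        {{:dispatch, name}, busy, queue}
    end
  end
end

servers = %{"server_1" => %{name: "server_1", status: :avail}}

{first_action, servers, queue} = PlacementSketch.place(servers, [], %{name: "job_1"})
{second_action, servers, queue} = PlacementSketch.place(servers, queue, %{name: "job_2"})
```

With a single server, the first call picks server_1 and the second call falls through to the queue, mirroring the iex session above.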

Dispatching Queued Jobs When Servers Free Up

With the groundwork we did in the last section, this part becomes easy. When we receive a response from a server, we check the queue. If there are items in the queue, we pop the last entry and submit that job. If there are no items in the queue, we do not perform a new dispatch.

First we need to do a little list manipulation to pop the queue and return a single job and the remaining list.

def pop_queue(queue) do
  reversed_queue = Enum.reverse(queue)
  job = List.first(reversed_queue)
  queue = reversed_queue |> Enum.drop(1) |> Enum.reverse()
  %{job: job, queue: queue}
end

We then use that function within the :result pattern match to dispatch a new job if needed.

    {:result, %{server: server}} ->
      IO.puts("Received result from #{server}")
      if length(queue) > 0 do
        %{job: job, queue: queue} = pop_queue(queue)
        IO.puts("Dispatching job: #{job.name}")
        {dispatch(servers, server), queue}
      else
        server_info = Map.get(servers, server)
        server_info = %{server_info | :status => :avail}
        {%{servers | server => server_info}, queue}
      end
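
The pop_queue function above reverses the list twice on every pop, so its cost grows with the length of the queue. One alternative, which this series does not use, is Erlang's :queue module from the standard library, where adding and removing elements are amortized constant-time operations. A minimal sketch of the same first-in, first-out behavior:

```elixir
# Erlang's :queue module is a double-ended queue from the standard library.
q = :queue.new()
q = :queue.in(%{name: "job_1"}, q)
q = :queue.in(%{name: "job_2"}, q)

# :queue.out/1 removes the oldest element and returns it with the remaining queue.
{{:value, first}, q} = :queue.out(q)
{{:value, second}, q} = :queue.out(q)

# On an empty queue it returns {:empty, queue} instead of a value.
drained = :queue.out(q)
```

The {:empty, queue} result also gives an explicit signal that nothing is waiting, whereas pop_queue returns a nil job for an empty list.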

Now, we have only one remaining condition. We would like newly registered servers to be able to immediately pick up jobs that are already queued. We will attack this issue next.

Making New Servers Accept Queued Jobs

This task looks almost exactly the same as the previous task. This time, we modify the :register pattern and if there are jobs in the queue, we dispatch the last job. Otherwise, the logic does not change at all.

    {:register, %{name: name} = server_info} ->
      IO.puts("Registered new server: #{server_info.name}")
      if length(queue) > 0 do
        %{job: job, queue: queue} = pop_queue(queue)
        IO.puts("Dispatching job: #{job.name}")
        {dispatch(Map.put(servers, name, server_info), name), queue}
      else
        {Map.put(servers, name, server_info), queue}
      end

If we register a server and then queue multiple jobs, you’ll see the first job dispatched immediately. Once that job is running, if we register new servers, they will have jobs dispatched to them immediately.
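
The :result and :register clauses both repeat the same "is anything queued?" check before popping a job. As a possible cleanup, that check could live in one small helper that pattern matches on the empty list. This is only a sketch: NextJobSketch and next_job are hypothetical names, and the helper relies on List.pop_at with a negative index to take the last (oldest) element.

```elixir
defmodule NextJobSketch do
  # Hypothetical helper: returns the oldest queued job (the last list element)
  # together with the remaining queue, or :empty when nothing is waiting.
  def next_job([]), do: :empty

  def next_job(queue) do
    {job, rest} = List.pop_at(queue, -1)
    {:ok, job, rest}
  end
end

# Jobs are prepended as they arrive, so "job_1" is the oldest.
queue = [%{name: "job_2"}, %{name: "job_1"}]

{:ok, job, rest} = NextJobSketch.next_job(queue)
empty = NextJobSketch.next_job([])
```

Each dispatcher clause could then use a case on the helper's result in place of the length check.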

This completes the basic functionality of the compute farm. The code is not the prettiest and we haven’t made any effort to make the code bullet-proof in any way. We will address this as we later dive deeper into OTP functionality.

Summary

This three-part series gives a basic overview of how we can build a simple server farm using nothing but pure Elixir functionality. Note that we are using the most basic, low-level functionality at this point. In future posts, I will dive deeper into OTP functionality, including:

  1. Adding supervisors so that processes that die can be automatically restarted.
  2. Alternative ways of handling state in Elixir and OTP.
  3. Running processes on multiple machines.

I’m learning OTP as I write this series so I’m wide open to ideas for future directions. If there is anything you’d like to read about in the future, please let me know.
