Elixir. Process Registry. One-node pool.

Process registry and its friends

While Poolboy is on vacation, let’s talk about building a worker pool. This article describes a simple one-node process pool.


BRIEF

What is a process pool, and when should we use one? Let’s assume you have a long-running task in your system, such as an image upload or a database query. If you run such a task in a single process and wait for the response, that process is blocked and every other request queues up behind it. This is the easiest way to end up with an overloaded synchronous queue. So we decide to run tasks concurrently. Suppose every task has to connect to a database, and the database allows at most 3 connections. Then we need to spawn 3 processes, save their pids, restart them when they crash, and provide a meaningful API for all of this. Let’s see how to do it.


APPLICATION

Let’s move from top to bottom. This is our application module. When the application starts, it spawns a top-level supervisor with two workers on board and the rest_for_one strategy. With rest_for_one, when a worker crashes, the supervisor terminates every worker that was started after the crashed one and then restarts them all in their original order. The supervisor always starts workers synchronously and in order, so if the Registry worker dies, the Pool Server is killed and restarted by the supervisor too. The supervisor also has a limit on the number of child restarts: if a worker keeps crashing and exceeds that limit within the configured time window, the supervisor itself crashes and takes all of its children down with it. Be careful with workers that take a long time to initialize: the supervisor waits for each init callback to return before it starts the next child. The 5-second default you are likely to run into applies to GenServer.call and to child shutdown; init itself is only time-limited if you pass the :timeout option to start_link. If you need long-running initialization, you can find more details here: Long-waiting calls.

So there are two children here: the Registry and the Pool Server.

Application Module

As you can guess from its name, the Registry holds process pids and their related keys. The Pool Server is an interface to the workers and to the workers’ supervisor.
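
The supervision tree described in this section can be sketched roughly as follows. This is only an illustration, not the code from the repository: the module name PoolDemo.Application is made up, two plain Agents stand in for the Registry and the Pool Server, and it uses the current child-spec API.

```elixir
defmodule PoolDemo.Application do
  # Hypothetical module name. The two Agents are stand-ins for the
  # article's Registry and Pool Server children.
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Started first: stand-in for the Registry.
      Supervisor.child_spec({Agent, fn -> %{} end}, id: :registry),
      # Started second: stand-in for the Pool Server.
      Supervisor.child_spec({Agent, fn -> :ok end}, id: :pool_server)
    ]

    # :rest_for_one restarts the crashed child and every child started after it.
    # :max_restarts and :max_seconds bound how often that may happen
    # (the values below are the defaults: 3 restarts within 5 seconds).
    Supervisor.start_link(children,
      strategy: :rest_for_one,
      max_restarts: 3,
      max_seconds: 5
    )
  end
end
```

With this layout, killing the :registry child also restarts :pool_server, while killing :pool_server leaves :registry untouched.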


POOL SERVER

Pool Server

This is our Pool Server. It is pretty straightforward. In fact, this module doesn’t implement the gen_server behaviour; it’s just an interface.

We set the pool size to 3 workers. Here start_link starts the pool supervisor (you will see this module shortly). The call and cast functions are only examples and should be replaced with something more meaningful in a real project. The most interesting part is the small choose_worker function with its worker_key parameter. It takes a key (a worker name or any other meaningful term), hashes it, and returns a value in the range 1..3. So the same worker key always maps to the same worker. That’s how you can route different users to different workers, and so on.

From the Erlang docs: erlang:phash2(Term, Range) -> Hash

Portable hash function that gives the same hash for the same Erlang term regardless of machine architecture and ERTS version (the BIF was introduced in ERTS 5.2). The function returns a hash value for Term within the range 0..Range-1. The maximum value for Range is 2³². When without argument Range, a value in the range 0..2²⁷-1 is returned.
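
A minimal sketch of the choose_worker idea, assuming a pool of 3 workers numbered from 1. The module name PoolDemo.Router is made up for this example; only :erlang.phash2/2 comes from the docs quoted above.

```elixir
defmodule PoolDemo.Router do
  # Hypothetical module; it mirrors the choose_worker function described above.
  @pool_size 3

  # Maps any term to a worker id in 1..@pool_size.
  # :erlang.phash2/2 returns a value in 0..Range-1, hence the + 1.
  def choose_worker(worker_key) do
    :erlang.phash2(worker_key, @pool_size) + 1
  end
end
```

Calling PoolDemo.Router.choose_worker("user:42") twice returns the same worker id both times, which is exactly the property the pool relies on.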


POOL SUPERVISOR

Pool Supervisor

This is our Pool Supervisor. We start it in the local namespace under a name equal to the module name, with pool_size as its parameter, set to 3. start_link triggers the init callback, and in init we define the workers. Notice the :id argument in the worker function call. This argument belongs to the supervisor, not to the worker itself: the supervisor has to be able to tell its children apart. Here the id is a tuple {:atom_name, worker_id}, where worker_id is an integer, but you can use any term.

The strategy here is one_for_one: when a worker crashes, only that worker is restarted.
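
Here is a rough sketch of such a pool supervisor. The module name, the {:pool_worker, worker_id} id and the Agents used as workers are assumptions made for the example; the original code uses its own worker module and the worker function mentioned above, while this sketch uses the current child-spec API.

```elixir
defmodule PoolDemo.PoolSupervisor do
  # Hypothetical names throughout. Agents stand in for the real pool workers.
  use Supervisor

  def start_link(pool_size) do
    # Registered locally under the module name, as described above.
    Supervisor.start_link(__MODULE__, pool_size, name: __MODULE__)
  end

  @impl true
  def init(pool_size) do
    children =
      for worker_id <- 1..pool_size do
        # :id identifies the child inside the supervisor only;
        # it is not the name the worker is registered under.
        Supervisor.child_spec({Agent, fn -> worker_id end}, id: {:pool_worker, worker_id})
      end

    # :one_for_one restarts only the child that crashed.
    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

After PoolDemo.PoolSupervisor.start_link(3), Supervisor.which_children/1 lists three children with the ids {:pool_worker, 1} to {:pool_worker, 3}.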


POOL WORKER

The pool worker implements the gen_server behaviour. Just for the sake of simplicity, we define the worker’s name outside of the module, using the process_id function in the supervisor and in the registry (you will see it soon). Take a look at the start_link, call and cast functions. They all make the same via_tuple(name) call. gen_server knows that a tuple {:via, process_module_name, process_name} means the name has to be resolved through a process registry, so we get this out of the box.
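
To make the via tuple concrete, here is a minimal worker sketch. It assumes Elixir’s built-in Registry module, started under the hypothetical name PoolDemo.Names, as a stand-in for the hand-written registry of this article. The module name, the state shape and the reply format are made up for illustration.

```elixir
defmodule PoolDemo.Worker do
  # Hypothetical worker. The built-in Registry replaces the article's own
  # registry here, only to keep the example self-contained.
  use GenServer

  @registry PoolDemo.Names

  # start_link, call and cast all address the process through the same via tuple.
  def start_link(name), do: GenServer.start_link(__MODULE__, name, name: via_tuple(name))
  def call(name, msg), do: GenServer.call(via_tuple(name), msg)
  def cast(name, msg), do: GenServer.cast(via_tuple(name), msg)

  # {:via, module, term}: GenServer asks `module` to register and resolve `term`.
  defp via_tuple(name), do: {:via, Registry, {@registry, name}}

  @impl true
  def init(name), do: {:ok, %{name: name, casts: []}}

  @impl true
  def handle_call(msg, _from, state), do: {:reply, {state.name, msg}, state}

  @impl true
  def handle_cast(msg, state), do: {:noreply, %{state | casts: [msg | state.casts]}}
end
```

The name registry has to be running before the first worker starts, for example via Registry.start_link(keys: :unique, name: PoolDemo.Names). After that, PoolDemo.Worker.call({:pool_worker, 1}, :ping) reaches the worker by name, and no pid has to be passed around.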


PROCESS REGISTRY

The process registry is a Plain Old GenServer (POGS). But there is one unusual thing in the module definition: we import all Kernel functions except send. We need to define our own send so that the process registry can deliver a message to the correct process, or return an error if no such process has been spawned.

Registry

From the Elixir docs: send(dest, msg)

send(dest :: pid | port | atom | {atom, node}, msg) :: msg when msg: any

Sends a message to the given dest and returns the message. dest may be a remote or local PID, a (local) port, a locally registered name, or a tuple {registered_name, node} for a registered name at another node.

Next is the module interface. It’s quite straightforward. start_link starts the Registry module in the local namespace. register_name(key, pid) registers a worker name for a process pid. whereis_name looks up a registered process. send sends a message to a process if there is one, or returns an error. unregister_name removes a name-pid pair from the registry. Our registry is a simple Map. We will replace the map with a more sophisticated and faster storage in the future, but it’s enough for now.

API

Gen_server callbacks.

register_name has one special line. We set up a monitor to make sure we don’t leave orphaned pids in the registry when their processes crash.

register_name

Below is the callback for the monitor. A monitor sends a DOWN message when the monitored process dies, and we have to handle it. The message carries the pid to remove. We should also define a catch-all handle_info clause, so that unexpected messages are dropped instead of crashing the registry with a missing-clause error.

Monitor callback

The other two callbacks are lookup and unregister.

Nothing special here
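
Putting the pieces of this section together, a registry along these lines could look roughly like the sketch below. It is not the repository code: the module name PoolDemo.ProcessRegistry, the message shapes and the {:badarg, ...} error tuple are assumptions. The four functions register_name, unregister_name, whereis_name and send are the ones a :via module is expected to export.

```elixir
defmodule PoolDemo.ProcessRegistry do
  # Hypothetical module name; a sketch of the registry described in this section.
  use GenServer
  # Exclude Kernel.send/2 so that our own send/2 below does not clash with it.
  import Kernel, except: [send: 2]

  # --- API ---

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)

  def register_name(key, pid) when is_pid(pid),
    do: GenServer.call(__MODULE__, {:register_name, key, pid})

  def unregister_name(key), do: GenServer.call(__MODULE__, {:unregister_name, key})

  def whereis_name(key), do: GenServer.call(__MODULE__, {:whereis_name, key})

  def send(key, msg) do
    case whereis_name(key) do
      # Assumed error shape: a tuple is returned when nothing is registered.
      :undefined ->
        {:badarg, {key, msg}}

      pid ->
        Kernel.send(pid, msg)
        pid
    end
  end

  # --- gen_server callbacks ---

  @impl true
  def init(names), do: {:ok, names}

  @impl true
  def handle_call({:register_name, key, pid}, _from, names) do
    if Map.has_key?(names, key) do
      {:reply, :no, names}
    else
      # Monitor the process so that its entry is dropped when it dies.
      Process.monitor(pid)
      {:reply, :yes, Map.put(names, key, pid)}
    end
  end

  def handle_call({:whereis_name, key}, _from, names),
    do: {:reply, Map.get(names, key, :undefined), names}

  def handle_call({:unregister_name, key}, _from, names),
    do: {:reply, :ok, Map.delete(names, key)}

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, names) do
    # Remove every key that pointed at the dead pid.
    {:noreply, for({key, p} <- names, p != pid, into: %{}, do: {key, p})}
  end

  # Catch-all: ignore anything else instead of crashing on an unmatched message.
  def handle_info(_msg, names), do: {:noreply, names}
end
```

Once the registry is started, any process can be named through it, for example Agent.start(fn -> 0 end, name: {:via, PoolDemo.ProcessRegistry, {:pool_worker, 1}}). When that process dies, the DOWN handler removes its entry, and whereis_name returns :undefined again.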

DONE

Next time we will look at dynamic process spawning and other ways to handle processes on a single node. You can find the full source here:

Link to Github repo pharosproduction/process-registry

And of course you’re welcome to hire us at Pharos Production Inc.

Thanks for reading!