Deploying stateful Phoenix game servers on Kubernetes

Dibyanshu Bhoi · Gamezop Tech · May 1, 2024

Kubernetes is the de-facto deployment mechanism for containerized applications: it handles deployment, scaling, and management. However, that story works best for stateless services. Kubernetes does offer StatefulSets and persistent volumes that store data in persistent pods, discover new pods joining the set, and manage state transfer to a node, but we, being fond of Elixir, have built our game servers in Elixir. Instead, we will leverage the power of Erlang clusters, distributed systems, and some awesome Elixir libraries to handle the heavy lifting.

The Problem

Anatomy of a game server in the Phoenix framework

The way our game servers are implemented:

- A Phoenix application supervisor starts a DynamicSupervisor for games and the Phoenix PubSub system

- A game client connects to the game server over a WebSocket, using the usual Phoenix client-server connection

- The Phoenix channel is notified about the client connection

- The Phoenix channel notifies the DynamicSupervisor about the connection

- The DynamicSupervisor then spins up a GenServer that manages the state for that game

Thus, in a long-running game server, we have a DynamicSupervisor, a bunch of GenServers maintaining the state of the different games in progress, and Phoenix channels for communicating with the game clients.
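For reference, here is a minimal sketch of what such a single-node supervision tree could look like before any distribution is added (module names like ChefEmpire.GameSupervisor and ChefEmpireWeb.Endpoint are illustrative assumptions, not our actual code):

defmodule ChefEmpire.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Phoenix PubSub for channel broadcasts
      {Phoenix.PubSub, name: ChefEmpire.PubSub},

      # DynamicSupervisor that will own one GenServer per running game
      {DynamicSupervisor, name: ChefEmpire.GameSupervisor, strategy: :one_for_one},

      # Phoenix endpoint handling the WebSocket connections from game clients
      ChefEmpireWeb.Endpoint
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: ChefEmpire.Supervisor)
  end
end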

So, when Kubernetes restarts the server, the application supervisor spawns the child processes and Phoenix handles the client reconnection, but we lose all the state held in the game GenServers.

after a redeployment

So we want the new processes to be spawned and repopulated with the original data. To get the desired result, we need:

  1. A clustering mechanism that forms and manages the Erlang cluster
  2. A distributed supervisor that manages process restarts across nodes
  3. A distributed agent that manages state transfer across nodes
  4. A distributed registry that registers processes across nodes

Erlang Clustering

Erlang has a built-in clustering mechanism. We can spawn nodes on the same network, each with a unique name, and connect them by exchanging node names.

iex --name node1@127.0.0.1 --cookie asdf -S mix
iex --name node2@127.0.0.1 --cookie asdf -S mix
iex --name node3@127.0.0.1 --cookie asdf -S mix

In this example, each node has a unique name, and they all share the same cookie. Now we can connect these nodes by running the following code:

## in node3@127.0.0.1
iex(node3@127.0.0.1)2> Node.connect :"node1@127.0.0.1"
true
iex(node3@127.0.0.1)3> Node.list
[:"node1@127.0.0.1"]

## in node2@127.0.0.1
iex(node2@127.0.0.1)1> Node.connect :"node1@127.0.0.1"
true
iex(node2@127.0.0.1)2> Node.list
[:"node1@127.0.0.1", :"node3@127.0.0.1"]

We use libcluster for automatically forming clusters of Erlang nodes, with either static or dynamic node membership.

Distributed Supervisor

In OTP, the supervisor is a process that supervises other processes. Supervisors are used to build a hierarchical process structure called a supervision tree. Supervision trees provide fault tolerance and encapsulate how our applications start, restart, and shut down.

In our case, game processes run on several game server nodes. Within a single node, the supervision tree can take care of process restarts, but not when the process lives on a different node. For this, we need a supervisor that exists on all the nodes across the cluster: a Distributed Supervisor.

distributed supervisor

Here, the distributed supervisor of cluster 2 restarted the game processes

We use Horde's distributed supervisor mechanism to distribute supervision across the nodes in the cluster. Horde uses a CRDT to sync data between nodes, which means each supervisor may momentarily see different data, but they all converge to the same state: eventual consistency.

Distributed Agent

The Distributed Agent handles state handoff between a node that is going down and a new node that is coming up. Kubernetes shuts a pod down gracefully by first sending it a SIGTERM signal. OTP catches the termination signal and performs an orderly shutdown of its applications, which gives us a chance to do some cleanup: the node going down stashes the state of its processes in the Distributed Agent. In our case, the Distributed Agent is a CRDT, so the data propagates to the other nodes. The distributed supervisor on another node then revives the processes, and the new processes read their data from the CRDT and repopulate their state.

CRDT of cluster-2 populating data into game genservers

Distributed Registry

We also need processes to be registered cluster-wide. When a process comes back up on another node, a local registry has no way of knowing which process a given name now refers to. For this, we need a registry that exists across nodes and syncs process addresses throughout the cluster. We use Horde's Registry module as our distributed registry. It uses a delta-CRDT to keep its state and notifies a process when it loses a naming conflict.

Distributed registry registers processes across the cluster
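As a quick illustration, once such a registry is running (we define a NodeRegistry module for it below), any node in the cluster can resolve a game process by name, no matter where it currently lives:

# Look up a process registered anywhere in the cluster; "machine-42" is a
# hypothetical key. Horde.Registry.lookup/2 returns [{pid, value}] or [].
case Horde.Registry.lookup(NodeRegistry, "machine-42") do
  [{pid, _value}] -> GenServer.call(pid, "stock")
  [] -> {:error, :not_found}
end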

Stitching it together

Now we have all the pieces we need to implement a stateful game server, deploy it to Kubernetes, and let OTP, libcluster, and Horde do the heavy lifting of node discovery, state management, and state transfer for us.

Libcluster config

config :libcluster,
  topologies: [
    chef_empire: [
      strategy: Cluster.Strategy.Kubernetes,
      config: [
        mode: :dns,
        kubernetes_node_basename: "gameserver",
        kubernetes_selector: "app=gameserverselector",
        kubernetes_namespace: "gamenamespace",
        polling_interval: 10_000
      ]
    ]
  ]

This clustering strategy works by fetching information about endpoints or pods, which are filtered by the given Kubernetes namespace and label.
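The namespace and selector usually differ between environments, so one option is to resolve them at runtime instead of hard-coding them. A minimal sketch of a config/runtime.exs doing that, assuming env vars named POD_NAMESPACE and CLUSTER_SELECTOR are injected into the pod (these names are our own assumption, not something libcluster requires):

import Config

config :libcluster,
  topologies: [
    chef_empire: [
      strategy: Cluster.Strategy.Kubernetes,
      config: [
        mode: :dns,
        kubernetes_node_basename: "gameserver",
        kubernetes_selector: System.get_env("CLUSTER_SELECTOR", "app=gameserverselector"),
        kubernetes_namespace: System.get_env("POD_NAMESPACE", "gamenamespace"),
        polling_interval: 10_000
      ]
    ]
  ]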

Application Processes

Processes that need to be started by the application supervisor.

children = [
  ...
  # cluster supervisor for libcluster
  {Cluster.Supervisor, [topologies, [name: ChefEmpire.ClusterSupervisor]]},

  # horde registry
  {Horde.Registry, keys: :unique, name: NodeRegistry},

  # horde dynamic supervisor
  {Horde.DynamicSupervisor, name: NodeSupervisor, strategy: :one_for_one, shutdown: 5_000},

  # genserver for observing nodes in the cluster
  {NodeListener, []},

  # genserver for state handoffs
  {StateHandoff, []}
]

We want the Horde.Registry to start before Horde.DynamicSupervisor so that the dynamic supervisor can keep track of processes that are spread across the cluster using Horde.Registry.

Horde.Registry

We use module-based registries to enable dynamic runtime configuration of Horde.Registry.

defmodule NodeRegistry do
  @moduledoc """
  Horde registry for registering processes across the cluster
  """
  use Horde.Registry
  require Logger

  def start_link do
    Horde.Registry.start_link(__MODULE__, keys: :unique, name: __MODULE__)
  end

  def init(opts) do
    [members: members()]
    |> Keyword.merge(opts)
    |> Horde.Registry.init()
  end

  def via_tuple(name) do
    {:via, Horde.Registry, {NodeRegistry, name}}
  end

  defp members do
    [Node.self() | Node.list()]
    |> Enum.map(&{__MODULE__, &1})
  end
end

Here, on Horde.Registry init, we pass the current members of the cluster.

Horde.Registry implements a distributed Registry backed by a δ-CRDT (provided by DeltaCrdt). This CRDT is used for both tracking membership of the cluster and implementing the registry functionality itself. Local changes to the registry will automatically be synced to other nodes in the cluster.

Horde.DynamicSupervisor

Similar to Registry, we use a module-based dynamic supervisor for Horde.DynamicSupervisor too.

defmodule NodeSupervisor do
  @moduledoc """
  A Horde dynamic supervisor
  """
  use Horde.DynamicSupervisor
  require Logger

  def start_link(_) do
    Horde.DynamicSupervisor.start_link(__MODULE__, strategy: :one_for_one, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    [members: members()]
    |> Keyword.merge(opts)
    |> Horde.DynamicSupervisor.init()
  end

  def start_child(child_spec) do
    Horde.DynamicSupervisor.start_child(__MODULE__, child_spec)
  end

  defp members do
    [Node.self() | Node.list()]
    |> tap(&Logger.info(inspect(&1)))
    |> Enum.map(&{__MODULE__, &1})
  end
end

State Handoff

The state handoff agent is built with DeltaCRDT and plain OTP. This process is the data layer that replicates data across nodes, syncing up eventually. Whenever a new node comes up, it fetches data from this distributed agent (a GenServer wrapping a CRDT).

defmodule StateHandoff do
  @moduledoc """
  Module that transfers state among the cluster processes.
  """
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def join(node) do
    GenServer.call(__MODULE__, {:add_node, {__MODULE__, node}})
  end

  def handoff(state_id, state_details) do
    GenServer.call(__MODULE__, {:handoff, state_id, state_details})
  end

  def get_state_details(state_id) do
    GenServer.call(__MODULE__, {:get_state_details, state_id})
  end

  @impl true
  def init(_opts) do
    ## State handoff initiated
    {:ok, crdt_pid} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 5)
    {:ok, crdt_pid}
  end

  @impl true
  def handle_call({:add_node, node_module}, _from, crdt_pid) do
    ## Exchange CRDT pids with the StateHandoff process on the other node
    other_crdt_pid = GenServer.call(node_module, {:ack_add_node, crdt_pid})
    DeltaCrdt.set_neighbours(crdt_pid, [other_crdt_pid])
    {:reply, :ok, crdt_pid}
  end

  @impl true
  def handle_call({:ack_add_node, other_crdt_pid}, _from, crdt_pid) do
    DeltaCrdt.set_neighbours(crdt_pid, [other_crdt_pid])
    {:reply, crdt_pid, crdt_pid}
  end

  @impl true
  def handle_call({:handoff, state_id, state_details}, _from, crdt_pid) do
    DeltaCrdt.put(crdt_pid, state_id, state_details)
    {:reply, :ok, crdt_pid}
  end

  @impl true
  def handle_call({:get_state_details, state_id}, _from, crdt_pid) do
    details = DeltaCrdt.to_map(crdt_pid) |> Map.get(state_id)
    {:reply, details, crdt_pid}
  end
end

NodeListener

We also need a separate process that will listen for {:nodeup, node} and {:nodedown, node} events and adjust the members of the Horde cluster accordingly.

defmodule NodeListener do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, [])

  @impl true
  def init(_) do
    :net_kernel.monitor_nodes(true, node_type: :visible)
    {:ok, nil}
  end

  @impl true
  def handle_info({:nodeup, _node, _node_type}, state) do
    set_members(NodeRegistry)
    set_members(NodeSupervisor)
    join_state_handoff()
    {:noreply, state}
  end

  @impl true
  def handle_info({:nodedown, _node, _node_type}, state) do
    set_members(NodeRegistry)
    set_members(NodeSupervisor)
    {:noreply, state}
  end

  defp set_members(name) do
    members =
      [Node.self() | Node.list()]
      |> Enum.map(fn node -> {name, node} end)

    :ok = Horde.Cluster.set_members(name, members)
  end

  defp join_state_handoff do
    Node.list()
    |> Enum.each(&StateHandoff.join/1)
  end
end

Now, with all the mechanisms in place, we have a distributed dynamic supervisor that keeps track of processes across the cluster, and those processes can be found via a distributed registry. We know when a node comes up and when a node goes down, and we have a distributed agent to store the data.

All we need now is a process that starts under the supervisor, registers itself with the registry, and reads and writes its data to the agent.

Starting a process

The game server we made is a distributed restaurant. We have a bunch of generator processes that generate products when they have a certain raw material. For example, a coffee machine GenServer that needs coffee beans.

Here we are starting a coffee machine GenServer under the NodeSupervisor.

defmodule ChefEmpire do
  def install_coffee_machine(machine_id) do
    child_spec = %{
      id: ChefEmpire.Generator.CoffeeMachine,
      start:
        {ChefEmpire.Generator.CoffeeMachine, :start_link,
         [[name: machine_id, details: %{beans: 0}]]}
    }

    NodeSupervisor.start_child(child_spec)
  end
end

We name the process using the NodeRegistry module we wrote earlier, which is backed by Horde.Registry. On init, we use handle_continue so that the process starts up in a non-blocking, asynchronous manner, and we fetch its data from the CRDT inside the handle_continue callback.

defmodule ChefEmpire.Generator do
  use GenServer
  require Logger

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: NodeRegistry.via_tuple(opts[:name]))
  end

  @impl true
  def init(opts) do
    ## Trap exits so the process gets to know when the supervisor shuts it down.
    Process.flag(:trap_exit, true)

    state =
      ChefEmpire.generator_module(opts[:type])
      |> apply(:init_state, [opts[:type], opts[:name], opts[:serial]])

    {:ok, state, {:continue, :handle_state}}
  end

  @impl true
  def handle_continue(:handle_state, %{name: name} = state) do
    state =
      ## Get state data from the handoff agent, falling back to the fresh state
      StateHandoff.get_state_details(name)
      |> get_state_from_store(state)
      ## Schedule the first "generate" tick
      |> tap(fn state -> state |> generate() end)

    {:noreply, state}
  end

  @impl true
  def handle_call("stock", _from, state) do
    {:reply, state, state}
  end

  @impl true
  def handle_info("generate", state) do
    state =
      state
      ## In-game logic that updates the GenServer state
      |> ChefEmpire.GeneratorProtocol.generate()
      ## Schedule the next "generate" tick
      |> tap(fn state -> state |> generate() end)

    ## Store the new data in the handoff agent
    StateHandoff.handoff(state.name, state)
    {:noreply, state}
  end

  @impl true
  def terminate(_reason, state) do
    ## When the node goes down, the supervisor does a graceful shutdown of its
    ## processes and calls terminate. The child process then saves its data in
    ## the distributed state handoff agent.
    StateHandoff.handoff(state.name, state)
    :ok
  end

  defp generate(state) do
    generation_interval = ChefEmpire.GeneratorProtocol.generation_interval(state)
    Process.send_after(self(), "generate", generation_interval)
  end

  defp get_state_from_store(nil, old_state), do: old_state
  defp get_state_from_store(state, _old_state), do: state
end
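With all of that in place, exercising the game server from any node could look like this (the machine id is hypothetical):

# Install a coffee machine on whichever node Horde picks, then query it
# through the distributed registry.
ChefEmpire.install_coffee_machine("machine-42")

"machine-42"
|> NodeRegistry.via_tuple()
|> GenServer.call("stock")

Because the name resolves through the distributed registry, the same call keeps working even after the process is revived on another node.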

Final Flow

a new deployment coming up

And now, we are all set to deploy the application to Kubernetes and let OTP, CRDT, and Horde work their magic. The steps to address the shortcomings of stateful deployment might seem complex at first, but understanding the underlying concepts makes it easier and allows us to make modifications as per our requirements. For instance, instead of Delta CRDT, we can opt for Redis, Postgres, or distributed ETS. Alternatives to Horde’s Registry include Swarm and registry modules from lasp-lang.
