Elixir: Domain Driven Design with Actor Model

Couple of months back, I was listening to Eric Evan’s pod cast on Software engineering radio where he mentioned that actor model is a great fit for modeling aggregate root in domain driven design, I was like what, wait? ain’t OO languages the only way to model concepts like aggregate, sagas, domain service, repositories or bounded Contexts in Domain driven design ?

After hours of googling and days of reading and weeks of asking questions on forums, I realized that modeling DDD concepts is more explicit and easy in Elixir because of the language constructs it provides. For example using GenServer for aggregate root, OTP application for each Bounded Context, putting all of these Bounded Context together as an umbrella App.

What is an Aggregate Root ?

Essentially aggregate is pattern in domain driven design, Aggregate is a cluster of your domain objects which can be treated as a singe unit.

Let’s say, you are writing a blog engine and you have identified some domain models like

  1. Blog
  2. Post
  3. Comment
  4. Like

As you can guess these models have some kind of relationship with each other, while defining relationship between domain models, it is a good practice to define relationship in such a way that object graph is acylic, that is, there are no cycles. for example a blog might have posts but post should not contain reference to blog, similarly blog might contain comments but comment should not contain reference to post.

Now Blog, Post, Comment and Like is our cluster of objects and can be defined as an aggregate in our application, Next thing is to identify a model among these models and promote it to be root (called aggregate root) and make sure that all the operations/access on these models are happening through the aggregate root.

In our case we can promote Blog model to be aggregate root, and make sure that we only access Post, like Blog.Post or Comments like Blog.Post.Comment, this approach guarantees that our models are being modified only via our aggregate roots and we are not breaking any invariant.

DDD and Phoenix 1.3

If you have read about latest changes in Phoenix framework (1.3), you would realize that it forces developer to think about their models upfront and it encourages the concepts like Bounded Context and aggregate, wherein user need to specify context (more in line with Bounded context in DDD) to access resource (there are no models anymore). For example Blog, Post, Comment and Likes can be grouped together and accessed by using a Blog Context object. For detailed information take a look at this keynote by Chris McCord at ElixirConf 2017.

Phoenix framework is awesome, but it is not my application ( look here if not convinced), I like to think of Phoenix as only a web interface for my application and should only be responsible for exposing endpoints to interact with my application with minimal business logic. Following this approach means that I am going to move out my business logic from Phoenix app and put it somewhere else, to achieve this I will use Umbrella project and cut out our Bounded Context in our application into different OTP application within single Umbrella project.

Aggregate root as Elixir Process

Let’s consider an application where we are storing and retrieving user information such as name, age, gender and educational details, so we have following models.

NOTE: If you want to look the the entire repository, you can find it here.

  • BasicInfo module containing name, age and gender fields.
defmodule Bhaduli.User.BasicInfo do
defstruct [name: nil, age: nil, gender: nil]

def new(basic_info) do
%Bhaduli.User.BasicInfo{name: basic_info.name, age: basic_info.age, gender: basic_info.gender}
end
end
  • EducationalDetails module containing graduation, senior_secondary and intermediate fields
defmodule Bhaduli.User.EducationalDetails do
defstruct graduation: nil, senior_secondary: nil, intermediate: nil
def new(educational_details) do
struct(Bhaduli.User.EducationalDetails, educational_details)
end
end
  • User module which contains user_id, basicInfo and EducationalDetails model.
defmodule Bhaduli.User do
alias Bhaduli.User.{BasicInfo, EducationalDetails}
defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]
...

User Module as Aggregate root

I have got three models, basicInfo, educationalDetails and User. I am going to make User as my aggregate root and expose methods to save and retrieve EducationalDetails and BasicInfo as shown below.

defmodule Bhaduli.User do
alias Bhaduli.User.{BasicInfo, EducationalDetails}
defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]
def update(id, %BasicInfo{} = basic_info) do
end
def update(id, %EducationalDetails{} = educational_details) do
end
def get_basic_info(id) do
end
def get_educational_details(id) do
end
def get(id) do
end

So we have got basic skeleton for User aggregate, but still we haven’t really done anything interesting. Like I said earlier I want to model my aggregate root as a long running elixir process and way to do that is to use GenServer.

But before we do that I want to explain how entire flow should work. Each user will be an Elixir process, for example

  1. If a new user is created with user_id “Erin” than we would create a new process with user_id “Erin”.
  2. If we try to get user (user_id “Mustang”) for the first time , we will fetch it from database and spawn a new process with given user_id. Now we will have a total of two user processes one each for “Erin” and “Mustang”.
  3. If an existing user (user_id “Envy”) is updated and a process for that user has not been created yet than we are going to spawn new process and load its state from Database and update it. Now we will have three processes each for “Erin”, “Mustang” and “Envy”.
  4. We will keep one process for each user and keep it loaded in memory. If user process is already running we will read from it and return otherwise we go to database and spawn a new process.
  5. We will add a supervisor for user process so that every time a process crashes we create a new one and rehydrate it from database.

Process Registry

Even though I have outlined overall flow of application, there are few things that needs to be addressed first.

  1. Whenever a new process is created, we get back a PID. Since we are creating one process for each user, soon we will have thousands of processes . Anyone who want to get user data will have to know PID of each user process in order to interact with it.
  2. Having a supervisor for user process makes our application fault tolerant but also adds a complexity because every time a process crashes supervisor is going to create a new one and this new process will have a different PID than the one that had crashed.

Though we can hand-roll our process registration mechanism,but it will soon get hard to track all of these processes as we intend to create thousands of them. On top of it, since supervisor is going to create a new process for every crashed one, propagating new process Id to the client would add more complexity.

Luckily we have a module in Elixir called Registry, which allows us to map PID to any given string/atom, so that we don’t have to deal with cryptic PIDs. In our case we want to client to be able to interact with our application only by using user_id, so we would be mapping PID to user_id.

Registry can be started with Registry.start_link/3, once registry is started we need some kind of mechanism which will map our user_id to the PID of spawned process and way to do that is to implement a function that returns a :via tuple, based on a value we provide.

Our via_tuple function, along with the Registry module, will allows us to spawn a process for a given user_id and make subsequent calls on the process using the same id, rather than a PID.

Our :via_tuple function

def via_tuple(user_id) do
{:via, Registry, {:user_process_registry, user_id}}
end

GenServer provides the plumbing to make use of this :via tuple and automatically handle the registration of our name (user_id) to the actual process.

we will use this via_tuple function while spawning a new user process(User module)

#User.ex
def start_link(name) do
GenServer.start_link(__MODULE__, %Bhaduli.User{user_id: name}, name: via_tuple(name))
end

Aggregate Root as GenServer

In order to make User aggregate an Elixir process, I am going to use GenServer behavior in User module.

In our main application file, start the registry. Here we are starting the registry by giving it a unique name as `user_process_registry` and supervising it.

defmodule Bhaduli do
use Application
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
supervisor(Registry, [:unique, :user_process_registry]),
supervisor(Bhaduli.UserSupervisor, [])
]
opts = [strategy: :one_for_one, name: Bhaduli.Supervisor]
Supervisor.start_link(children, opts)
end
end

User Supervisor, we are defining a Supervisor for user module by giving it a simple_one_for_one strategy and module name.

defmodule Bhaduli.UserSupervisor do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__,nil, name: :user_sup)
end
def init(_) do
children = [worker(Bhaduli.User, [])]
supervise(children, strategy: :simple_one_for_one)
end
end

Now, we will use GenServer behaviour in User module and alias BasicInfo and Educational Details.

defmodule Bhaduli.User do

use GenServer
alias Bhaduli.User.{BasicInfo, EducationalDetails}
...

Next we will define a User struct to contain user_id, basic_info and educational_details.

defmodule Bhaduli.User do

use GenServer
alias Bhaduli.User.{BasicInfo, EducationalDetails}
@registry :user_process_registry
defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]

Let’s write a start_link/3 method in User module which will be called by User supervisor while starting User process. Here we are passing three arguments to start_link function

  1. __MODULE__, this is the module name.
  2. User struct as initial argument.
  3. Third parameter in alias for the spawned process, which allows us to access this process with given alias, we are using via_tuple function because GenServer provides the plumbing to make use of this :via tuple and automatically handle the registration of our alias (user_id) to the actual process.
#User.ex
def start_link(name) do
GenServer.start_link(__MODULE__, %Bhaduli.User{user_id: name}, name: via_tuple(name))
end

Now comes the interesting part, we are defining a init callback which will be called when GenServer is started. It is this method which will be responsible for loading the User state from database.

#User.ex
def init(%Bhaduli.User{} = user) do
GenServer.cast(self(), {:populate_user})
{:ok, user}
end

Inside init we are calling handle_callback function, this callback goes to UserRepository and fetches the data from database, If a user is found it is returned otherwise and error is returned.

#User.ex
def handle_cast({:populate_user}, user) do
case Bhaduli.UserRepository.get(user.user_id) do
{:error, _} -> {:noreply, user}
{:ok, user} -> {:noreply, user}
end
end

It is important to note that GenServer won’t start until init method returns. for this reason we are using handle_cast method instead of handle_call because handle_cast can load data asynchronously and init method don’t have to wait for it to finish.

Once we have done all this plumbing we can get the PID of any process for a given user_id from Registry as shown below.

[{pid, _}] = Registry.lookup(@registry, id)

Getting basic_info for a given user_id

def get_basic_info(id) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.call(pid, {:basic_info})
end

We are using handle_call for getting basic_info as we are going to wait for reply. Interesting thing to note here is that we are not making database call (Guess why ?)

def handle_call({:basic_info}, pid, state) do
{:reply, state.basic_info, state}
end

Similarly for educational_details

def get_educational_details(id) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.call(pid, {:educational_details})
end
def handle_call({:educational_details}, pid, state) do
{:reply, state.educational_details, state}
end

For updating basic_info and educational details we are going to use handle_cast as we don’t need to wait for response from GenServer callback.

def update(id, %BasicInfo{} = basic_info) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.cast(pid, {:update_basic_info, basic_info})
end
def update(id, %EducationalDetails{} = educational_details) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.cast(pid, {:update_educational_details, educational_details})
end
#Callbacks
def handle_cast({:update_basic_info, basic_info}, user) do
user = %Bhaduli.User{user | basic_info: basic_info}
{:noreply, user}
end
def handle_cast({:update_educational_details, educational_details}, user) do
user = %Bhaduli.User{user | educational_details: educational_details}
{:noreply, user}
end

Putting it all together.

#User.ex
defmodule Bhaduli.User do
use GenServer
alias Bhaduli.User.{BasicInfo, EducationalDetails}
@registry :user_process_registry
defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]
def start_link(name) do
GenServer.start_link(__MODULE__, %Bhaduli.User{user_id: name}, name: via_tuple(name))
end
def init(%Bhaduli.User{} = user) do
GenServer.cast(self(), {:populate_user})
{:ok, user}
end
def update(id, %BasicInfo{} = basic_info) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.cast(pid, {:update_basic_info, basic_info})
end
def update(id, %EducationalDetails{} = educational_details) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.cast(pid, {:update_educational_details, educational_details})
end
def get_basic_info(id) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.call(pid, {:basic_info})
end
def get_educational_details(id) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.call(pid, {:educational_details})
end
def get(id) do
case Registry.lookup(@registry, id) do
[{pid, _}] -> Registry.lookup(@registry, id)
GenServer.call(pid, {})
[] -> {:error, :not_found}
end

end
def handle_cast({:update_basic_info, basic_info}, user) do
user = %Bhaduli.User{user | basic_info: basic_info}
{:noreply, user}
end
def handle_cast({:update_educational_details, educational_details}, user) do
user = %Bhaduli.User{user | educational_details: educational_details}
{:noreply, user}
end
def handle_cast({:populate_user}, user) do
case Bhaduli.UserRepository.get(user.user_id) do
{:error, _} -> {:noreply, user}
{:ok, user} -> {:noreply, user}
enddef handle_cast({:update_basic_info, basic_info}, user) do
user = %Bhaduli.User{user | basic_info: basic_info}
{:noreply, user}
end
def handle_cast({:update_educational_details, educational_details}, user) do
user = %Bhaduli.User{user | educational_details: educational_details}
{:noreply, user}
end
end
def handle_call({:basic_info}, pid, state) do
{:reply, state.basic_info, state}
end
def handle_call({:educational_details}, pid, state) do
{:reply, state.educational_details, state}
end
def handle_call({}, pid, state) do
{:reply, state, state}
end
def via_tuple(user_id) do
{:via, Registry, {:user_process_registry, user_id}}
end
end

Code for this can be found here. This application is part of an Umbrella project and in next post I’ll show how this application is used from a Phoenix application.