My Journey into CQRS and Event Sourcing

Using Elixir — Part 1

Rodrigo Botti
Nexa Digital
12 min readSep 11, 2019

--

Three small flasks sealed with corks. From left to right: blue, red and yellow dye are being slowly mixed in each one.
bottles-potions-ink-dye-magic by JalynBryce

Note: this is part of “My Journey into CQRS and Event Sourcing” series. If you like what read here, please consider reading the next ones.
Part 2 >

Introduction

A while ago I decided to start learning more about CQRS and Event Sourcing. Those were both architectural patterns that have always fascinated me, that have always seemed like a good fit for an evolving application/system architecture specially when it comes to a microservices based one. Unfortunately I've never had the chance to work in an application/system which applied both these patterns.

Since I know that I learn something a lot better (specially when it comes to software development) when I actually practice it — code it, implement it, deploy it, etc — I decided I should give those patterns a spin by coding something that actually uses them.

This is the telling of the start of my journey into learning these patterns by building an application — or set of applications/services — that apply them.

Note: I'll be using Elixir code snippets to explain the concepts this article intends to present. The application built using these patterns is also written in Elixir. I will assume you have some Elixir knowledge — syntax, core functions, runtime.

The Theory

First let's start by defining these patterns in a very loose way. I do encourage you to read the more extensive definitions of them — some will be linked at the end.

CQRS

It's an acronym that stands for Command Query Responsibility Segregation.

"At its heart is the notion that you can use a different model to update information than the model you use to read information" — Martin Fowler

Basically we split our read — query — model from our write — command — model. We can represent that in a diagram — just replace UI component by a generic client:

CQRS Diagram by Martin Fowler

Event Sourcing

"Event Sourcing ensures that all changes to application state are stored as a sequence of events. Not just can we query these events, we can also use the event log to reconstruct past states, and as a foundation to automatically adjust the state to cope with retroactive changes." — Martin Fowler

In a typical CRUD application we mutate persistent state stored in a database — e.g. table records in a relational database. If we have an User entity and we want to change it's name, we store the same User but with its name changed.

With event sourcing rather than mutating persistent state, we actually store a sequence of events in an append-only event log called event store.

For the example above, we would instead append an event of UserNameChanged(user_id, new_name) for instance.

Note: There's more to event sourcing than just event logging and the event store. I suggest reading more on it from the references provided.

Putting it together

There are many ways to do Event Sourcing, but one pattern that is commonly used is CQRS.

CQRS Diagram from DDD Building Blocks Dictionary

On the command/write side we dispatch commands to the domain model and respond immediately. By handling the command — business logic — we emit events onto the event store, which are then fed back onto the domain model for potential event cascading but also to event handlers. The handlers then modify the read model for further querying by the client.

"At first this might seem unintuitive or verbose, but it allows for exciting features. Splitting read and write heavy operations in the system allows for separate tuning of the components that handle them — optimizing software for writing yields tremendous performance. But more profoundly, CQRS allows one to adapt to those data model changes so prevalent in fast-moving MVC codebases much more aptly. By constructing data after the event is stored, we’re free to change our interpretation of it in the future or add new features, and go back to past events and apply those learnings to build new data structures in the read model with retroactive effects." — Bruno Antunes

The Application

First we need a problem to solve, one that would benefit from applying these patterns.
Since I work for a health-tech company I decided to tackle a problem which we'll eventually need to address: Consent Management.
Why? With event sourcing we store all events instead of mutating a persistent state so we get auditing "for free". We can also handle consent events to trigger user-facing side effects such as push notifying users when something happens or use event streaming to integrate with other services asynchronously. Also, something not related to the domain, we can evolve our read model according to evolving business requirements — if necessary we can replay all events and reconstruct our read models for new use cases.

Note: The application is implemented using Commanded. Commanded is an awesome Elixir library that helps abstract away some of the plumbing necessary when building an application of this kind. We'll be blazing through some of it's concepts and building blocks but be sure check out the full documentation.

Requirements

We need a service that allows for health care professionalsdoctors and nurses for now — and patients to ask a patient for consent i.e. permission to view his/hers health records — exams, consultations, conditions and ER visits for now. The patient should be able to grant consent — give permission — revoke a granted consent — remove permission — and view/list all consents he/she has granted.
There should also be a way to easily know whether or not a health care professional has consent over a patient's health records.

From this we can define some aspects of our service:

  • Actors: patients and health care professionals
  • Write Side (Command) API: ask for consent, grant consent, revoke consent
  • Read Side (Query) API: list patient's granted consents, check whether or not a professional has consent
  • Business Invariants: * A professional cannot ask for a consent which was already granted; * A patient cannot grant a consent which was already granted; * A patient cannot revoke a consent which has already been revoked or was not granted; * Consents can be asked by other patients; consents can be granted to other patients; * Professionals can only be doctors and nurses; * Permissions can only be given/asked/revoked for health records of type exams, consultations, conditions and ER visits

With the higher level api of our application defined and the requirements we can derive:

  • Commands: AskConsent, GrantConsent, RevokeConsent — Referencing a patient id, the professional's id, the professional's type (e.g. doctor or nurse), the health record type being targeted.
  • Events: ConsentAsked, ConsentGranted, ConsentRevoked — Also referencing a patient id, the professional’s id, the professional’s type, the health record type.
  • Aggregates: Patient — Identified by the patient id; its state should have a data structure capable of rapidly checking for the existence of granted consents.
  • Projections: PatientConsent Entity — references the patient's id, the professional’s id, the professional’s type and has a list of health record types to which permission has been granted.
Event Sourcing With Elixir — Main entities in Commanded Library by Bruno Antunes

Now let's dive into some implementation.

Commands

We will define our commands as Elixir structs. We'll also have one module per command:

defmodule Consent.Commands.GrantConsent do
@enforce_keys [:patient_id, :to_id, :to_entity, :target]

defstruct [
:patient_id, # patient's id
:to_id, # id of professional/patient
:to_entity, # patient | doctor | nurse
:target # exams | consultations | conditions | ...
]
end

Events

We'll also define our events as structs and have one module per event — remember to have the event's name in past tense:

defmodule Consent.Events.ConsentGranted do
@enforce_keys [
:patient_id, :to_id, :to_entity, :target
]
@derive Jason.Encoder
defstruct [
:patient_id,
:to_id,
:to_entity,
:target,
:timestamp
]
end

Notice how we derive the Jason.Encoder protocol. That way commanded is able to serialize our events as JSON encoded binaries prior to saving them to the Event Store.

Aggregate

Our Patient aggregate is an Elixir Module that:

  • Defines the aggregate state — a struct
  • Has two kinds of functions
# command functions: take the current state and a command
# return events
@spec execute(state, command) :: {:ok, [event]} | {:error, term()}# state mutators: take the current state and an event
# return new state
@spec apply(state, event) :: state

For our case:

  • the state will be a map: key is a tuple of {actor_type, actor_id}, value is the list of permissions granted. That way it is easy to enforce our business invariants
  • command functions will pattern match on the struct type of the aggregate and of the command, enforce the invariants and return an event
  • state mutators will pattern match on the struct type of the aggregate and return the updated state reflecting the event application
defmodule Consent.Patient do  alias Consent.Events.{
ConsentAsked, ConsentGranted, ConsentRevoked
}
alias Consent.Commands.{
AskConsent, GrantConsent, RevokeConsent
}
alias __MODULE__
@derive Jason.Encoder # used for snapshotting (not discussed)
defstruct consent: %{} # empty map as initial state
# executing GrantConsent command
def execute(%Patient{} = patient, %GrantConsent{} = cmd) do
if has_consent?
(
patient,
{cmd.to_entity, cmd.to_id},
cmd.target
)
do # consent already granted -> error
{:error, :consent_already_granted}
else
%ConsentGranted{ # dispatch event
patient_id: cmd.patient_id,
to_entity: cmd.to_entity,
to_id: cmd.to_id,
target: cmd.target
}
|> add_timestamp()
end
end
# applies ConsentGranted to aggregate
def apply(
%Patient{consent: consent} = patient,
%ConsentGranted{
to_entity: entity,
to_id: id,
target: target
}) do
consent
|> Map.get({entity, id}, [])
|> (&[target | &1]).()
|> update_consent({entity, id}, consent)
|> update_patient(patient)
end
# private helpers defp has_consent?(%{consent: consent}, {entity, id}, target) do
consent
|> Map.get({entity, id}, [])
|> Enum.member?(target)
end
defp update_consent(target_list, key, %{} = consent_map) do
Map.put(consent_map, key, target_list)
end
defp update_patient(%{} = consent_map, %Patient{} = patient) do
%Patient{patient | consent: consent_map}
end
# other commands and state mutators omitted for brevityend

Note: both the state mutator and command functions take the current state as parameters. This stateful behaviour is possible because Commanded spawns a GenServer per aggregate instance.

Command Router

The command router will be an Elixir module that adopts the behaviour of a Commanded.Commands.Router. Here's where we define how to forward the commands to the appropriate aggregates by defining which commands are dispatched to which aggregate — and possibly a command handler — and aggregate identity:

defmodule Consent.CommandRouter do
use Commanded.Commands.Router
alias Consent.Commands.{AskConsent, GrantConsent, RevokeConsent} # middlewares won't be discussed
middleware Commanded.Middleware.Logger
middleware Consent.Middlewares.Validation
dispatch(
[AskConsent, GrantConsent, RevokeConsent], # commands
to: Consent.Patient, # aggregate module
identity: :patient_id # how to locate or spawn aggregate process
)
end

Now when we dispatch a command through the router it will forward it to the appropriate patient aggregate identified by the patient's id.

Note: It is possible to dispatch the command to a Command Handler — which receives the aggregate and the command in its handle/2 function — to validate, authorize or enrich command data prior to calling the aggregate. In our case we are forwarding the command directly to the aggregate.

Event Store

The specialized data store that stores the events. Commanded ships with adapters for two of them: Commanded author’s own EventStore, open source and based on PostgreSQL — the one we're gonna use — , and Greg Young’s Event Store, also open source and with optional paid support.

To configure the eventstore:

# /config/config.exsconfig :commanded,
event_store_adapter: Commanded.EventStore.Adapters.EventStore
---# /config/dev.exsconfig :eventstore,
column_data_type: "jsonb"
config :eventstore, EventStore.Storage,
serializer: EventStore.JsonbSerializer,
types: EventStore.PostgresTypes,
username: "write_model_db_user",
password: "write_model_db_password",
database: "write_model_db_database",
hostname: "localhost",
port: 5433,
pool_size: 10

This will configure commanded to use PostgreSQL as the Event Store and also configures the connection to the database.

Api — Write/Command

Now that we have the command side implemented we can start implementing our high level api and it is as simple as having functions that build commands from their parameters and dispatch it using our router:

defmodule Consent do
alias
Consent.CommandRouter, as: Router
alias Consent.Commands.{AskConsent, GrantConsent, RevokeConsent}
# -- Write --

def ask_consent(patient_id, by_entity, by_id, target) do
%AskConsent{
patient_id: patient_id,
by_entity: by_entity,
by_id: by_id,
target: target
}
|> Router.dispatch()
end
def grant_consent(patient_id, to_entity, to_id, target) do
%GrantConsent{
patient_id: patient_id,
to_entity: to_entity,
to_id: to_id,
target: target
}
|> Router.dispatch()
end
def revoke_consent(patient_id, from_entity, from_id, target) do
%RevokeConsent{
patient_id: patient_id,
from_entity: from_entity,
from_id: from_id,
target: target
}
|> Router.dispatch()
end
# -- TODO: Read --end

Projector

A projector is a specialized Event Handler, it's a module that listens for events and builds/changes the read model. We're gonna use another commanded library — commanded-ecto-projections — that simplifies projector implementation with Ecto. With this library we're going to build our read model using a SQL database supported by Ecto — Postgres.

Our projector will listen for events and create/update our PatientConsent entity — patient_consent table.

First let's define our Ecto Schema for our read model:

defmodule Consent.Schemas.Patient do
use Ecto.Schema
schema "patient_consent" do
field(:patient_id, :string)
field(:entity_name, :string)
field(:entity_id, :string)
field(:permissions, {:array, :string})
end
end

Now we have to configure the Ecto Repo for our read model:

# /lib/consent/repo.exdefmodule Consent.Repo do
use Ecto.Repo,
otp_app: :consent,
adapter: Ecto.Adapters.Postgres # postgres connector
end
---# /config/config.exsconfig :consent,
ecto_repos: [Consent.Repo]
---# /config/dev.exsconfig :consent, Consent.Repo, # database connection config
username: "read_model_db_user",
password: "read_model_db_password",
database: "read_model_db_database",
hostname: "localhost",
port: 5433,
pool_size: 10

Note: Ecto is an awesome library but explaining it's details is out of the scope of this article. Please refer to the official documentation.

Now we can configure and implement our projector.
The projector uses the project/3 macro from Commanded.Projections.Ecto. We just have to pattern match to our domain events and create/update our read model using the Ecto.Multi passed to the callback function:

# /config/config.exconfig :commanded_ecto_projections,
repo: Consent.Repo # projectors will use our Ecto Repo
---# /lib/consent/projectors/patient.exdefmodule Consent.Projectors.Patient do
use Commanded.Projections.Ecto, name: to_string(__MODULE__)
import Ecto.Query alias Consent.Events.{
ConsentAsked, ConsentGranted, ConsentRevoked
}
alias Consent.Schemas
alias Consent.Repo
# -- projections -- # projecting ConsentGranted event
project(%ConsentGranted{} = evt, fn multi ->
patient =
query_patient(evt.patient_id, evt.to_entity, evt.to_id)
|> Repo.one()
case patient do
nil ->
Ecto.Multi.insert(
multi,
:create_patient_consent,
%Schemas.Patient{
patient_id: evt.patient_id,
entity_name: evt.to_entity,
entity_id: evt.to_id,
permissions: [evt.target]
})
saved ->
changeset =
[evt.target | saved.permissions]
|> update_permissions_changeset(saved)
Ecto.Multi.update(multi, :update_patient_consent, changeset)
end
end)
# -- helpers -- defp update_permissions_changeset(permissions, patient) do
patient
|> Ecto.Changeset.change()
|> Ecto.Changeset.put_change(
:permissions,
Enum.uniq(permissions)
)
end
defp query_patient(patient_id, entity_name, entity_id) do
from(
p in Schemas.Patient,
where:
p.patient_id == ^patient_id and
p.entity_name == ^entity_name and
p.entity_id == ^entity_id
)
end
end

Api — Read/Query

Now that we have our read model ready we can implement our queries on our high level api:

defmodule Consent do
# omitted for brevity

# -- Read --
def consents(patient_id) do
from(
p in Patient,
where: p.patient_id == ^patient_id
)
|> Repo.all()
end
def has_consent?(
patient_id,
entity_name,
entity_id,
permission \\ :all
) do
permissions =
from(
p in Patient,
where:
p.patient_id == ^patient_id and
p.entity_name == ^to_string(entity_name) and
p.entity_id == ^entity_id,
select: p.permissions
)
|> Repo.one()
case permissions do
nil -> false
[] -> false
list -> to_string(permission) in list or "all" in list
end
end

Running the Application

Let's run our application in the terminal with the IEx interactive shell— you'll need to have Elixir and git installed.

# download code
git clone git@github.com:rodrigobotti/cqrs_es_consent.git
git checkout part-1
cd cqrs_es_consent
# download dependencies
mix deps.get
mix compile
# run interactive shell with project modules available
iex -S mix

Now we can interact with our application in the interactive shell. We'll do that by using the high level api — Consent module:

# list consents for patient "patient_id"
iex> Consent.consents("patient_id")
[] # empty list - has no consents granted
# granting consent
iex> Consent.grant_consent("patient_id", "doctor", "doctor_id", "exams")
:ok
# just returns :ok atom
# granting another consent
iex> Consent.grant_consent("patient_id", "doctor", "doctor_id", "consultations")
:ok
# attempt to grant already granted consent
iex> Consent.grant_consent("patient_id", "doctor", "doctor_id", "consultations")
{:error, :consent_already_granted}
# returns error
# list consents
iex> Consent.consents("patient_id")
[
%Consent.Schemas.Patient{
__meta__: #Ecto.Schema.Metadata<:loaded, "patient_consent">,
entity_id: "doctor_id",
entity_name: "doctor",
id: 6,
patient_id: "patient_id",
permissions: ["consultations", "exams"]
}
]
# checking consent
iex> Consent.has_consent?("patient_id", "doctor", "doctor_id", "exams")
true
# because this consent was granted
# revoking consent
iex> Consent.revoke_consent("patient_id", "doctor", "doctor_id", "exams")
:ok
# checking consent again
iex> Consent.has_consent?("patient_id", "doctor", "doctor_id", "exams")
false
# because we revoked it

As we can see, dispatching the commands enforces the business invariants and returns immediately. When we query the read model we can see the handlers — projectors — managed to modify the read model allowing us to query it.

Conclusion

By building this application I managed to accomplish learning about some of the pattern's concepts in practice and clear some misconceptions I had about some elements of the architecture.

The most important thing for me was that I was able to understand in practice the responsibilities of each component of the architecture and how they all connect together.

Summarizing what we learned so far:

  • How to combine CQRS and Event Sourcing
  • Components of the architecture: Command Handlers, Aggregates, Event Handlers, Event Store
  • How the components connect together to form an application
  • Using Elixir with Commanded for implementing the architectural patterns

References and Comments

--

--