My Journey into CQRS and Event Sourcing

Using Elixir — Part 4

Rodrigo Botti
Nexa Digital
Nov 4, 2019

Three small flasks sealed with corks. From left to right: blue, red and yellow dye are being slowly mixed in each one.
bottles-potions-ink-dye-magic by JalynBryce

Note: This is part of the “My Journey into CQRS and Event Sourcing” series. If you’re jumping in right now, please consider reading the previous parts.
<< Part 1 | < Part 3

Recap

Last time we created an Event Handler that was responsible for notifying a patient when someone asked them for consent:

  • Event Handler that listens to ConsentAsked events
  • Based on the event payload, a text message was built alerting the patient that a specific actor asked for permission to access a health record

Introduction

This time we'll explore some ways to optimize our application by dealing with two problems the application currently has.

It might not be that apparent but our application has both memory and performance problems coming from our aggregates:

  • When an aggregate is instantiated, it builds its internal state by applying its entire event stream in order.
  • Our aggregates are processes (GenServers) that, once started (that is, once a command has been routed to them successfully), never stop unless we kill the application.

To mitigate those problems we will employ two techniques: Aggregate State Snapshotting and Aggregate Lifespan, respectively.

Note: application code is available in this repository in the branch part-4.

Snapshotting

First of all let's recap how the initial aggregate state is built in an event sourced application:

  • The aggregate process is started
  • The aggregate listens to its event stream
  • State is built by calling the aggregate's apply function for each event coming from the stream

It would look something like this:

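# state construction pseudo-code
state =
  EventStore.stream(aggregate_identity)
  # Enum.reduce passes (event, state); the aggregate's apply takes (state, event)
  |> Enum.reduce(aggregate_empty_state, fn event, state -> apply(state, event) end)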

As you can imagine, as the event stream grows, rebuilding the state takes more and more time and memory.
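To make the replay step concrete, here is a minimal runnable sketch of folding an event list into state. It is an illustration under assumptions, not the application's code: the events are hypothetical tuples, the in-memory list stands in for the event store, and the function is named `apply_event/2` (an assumed name) to avoid clashing with `Kernel.apply/2`.

```elixir
defmodule FoldSketch do
  # Hypothetical events, modelled as plain tuples for illustration only.
  def apply_event(state, {:consent_granted, to, target}) do
    Map.update(state, to, MapSet.new([target]), &MapSet.put(&1, target))
  end

  def apply_event(state, {:consent_revoked, to, target}) do
    Map.update(state, to, MapSet.new(), &MapSet.delete(&1, target))
  end

  # Replays the whole stream, oldest event first, starting from the empty state.
  def rebuild(events, empty_state \\ %{}) do
    Enum.reduce(events, empty_state, fn event, state -> apply_event(state, event) end)
  end
end

# An in-memory list stands in for the aggregate's event stream.
events = [
  {:consent_granted, "doctor-1", "allergies"},
  {:consent_granted, "doctor-1", "medications"},
  {:consent_revoked, "doctor-1", "allergies"}
]

state = FoldSketch.rebuild(events)
```

Every restart of the aggregate repeats this fold over all of its events, which is why the cost grows with the length of the stream.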

In order to deal with that problem we apply the technique of Aggregate Snapshotting.

"A snapshot represents the aggregate state when all events to that point in time have been replayed […]. Instead of loading every event for an aggregate when rebuilding its state, only the snapshot and any events appended since its creation are read." — Commanded documentation

We can think of snapshots as aggregate state checkpoints. By configuring snapshotting we avoid having to load and apply every event since the beginning.
Let's say we decide to snapshot the state every 50 events, the aggregate has emitted 55 events since its creation, and the process restarts. Instead of having to load and apply 55 events, the snapshot is loaded and only 5 events are applied to rebuild the current aggregate state. Something like:

# state construction pseudo-code with snapshotting
{snapshot_state, snapshot_count} =
  EventStore.latest_snapshot(aggregate_identity)

state =
  EventStore.stream(aggregate_identity)
  # disregarding events prior to snapshot
  |> Stream.drop(snapshot_every * snapshot_count)
  # using snapshot as initial state
  |> Enum.reduce(snapshot_state, fn event, state -> apply(state, event) end)
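The 55-event scenario can be checked with a small runnable sketch. Everything here is an assumption made for illustration: the aggregate state is a simple counter, the events are atoms, and an in-memory list replaces the event store. A real event store would read forward from the snapshot's version rather than load and drop the earlier events.

```elixir
defmodule SnapshotSketch do
  # Hypothetical aggregate: the state just counts currently granted consents.
  def apply_event(state, :consent_granted), do: state + 1
  def apply_event(state, :consent_revoked), do: state - 1

  # Full replay: every event is applied to the empty state (0).
  def rebuild(events), do: Enum.reduce(events, 0, &apply_event(&2, &1))

  # Replay from a snapshot: skip the events the snapshot already covers
  # and use the snapshot state as the starting point.
  def rebuild(events, {snapshot_state, snapshot_version}) do
    events
    |> Enum.drop(snapshot_version)
    |> Enum.reduce(snapshot_state, &apply_event(&2, &1))
  end
end

events = List.duplicate(:consent_granted, 55)

# Snapshot taken right after the 50th event.
snapshot = {SnapshotSketch.rebuild(Enum.take(events, 50)), 50}

full = SnapshotSketch.rebuild(events)
from_snapshot = SnapshotSketch.rebuild(events, snapshot)

# Number of events that still have to be applied on top of the snapshot.
remaining = length(events) - elem(snapshot, 1)
```

Both paths end in the same state, but the snapshot path applies only the `remaining` events instead of all 55.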

Luckily, snapshotting is pretty easy to configure in Commanded. All we have to do is write a config for the aggregates we wish to snapshot and provide a way to serialize the snapshot state.

# -- lib/consent/aggregates/patient.ex --
@derive Jason.Encoder
defstruct consent: %{}

# -- config/config.exs --
config :commanded, Consent.Aggregates.Patient,
  snapshot_every: 30,
  snapshot_version: 1

This configures our Patient aggregate to create a snapshot every thirty events. To better understand the config parameters I recommend reading the official docs.

Aggregate Lifespan

Aggregates in Commanded are backed by a GenServer and once they are started they never stop. As you can imagine, if you have a lot of them around this can lead to unnecessarily high memory usage.

What if we could choose when to stop our aggregate processes? That's where Lifespans come in. They describe a strategy for when we should stop aggregates or keep them alive. Don't forget that once an aggregate needs to be restarted we'll have to calculate its internal state again.

By using an Aggregate Lifespan strategy we can tell Commanded how long it should keep an aggregate alive after it has handled specific events, commands and errors.
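The underlying idea can be sketched with a plain GenServer that stops itself after a period of inactivity. This is only an illustration of the general OTP timeout idiom, not Commanded's implementation; the module name `IdleSketch` and the 50 ms timeout are assumptions chosen for the example.

```elixir
defmodule IdleSketch do
  use GenServer

  # idle_timeout is in milliseconds, or :infinity to never stop.
  # GenServer.start/2 (unlinked) is used so a stop does not affect the caller.
  def start(idle_timeout), do: GenServer.start(__MODULE__, idle_timeout)

  def ping(pid), do: GenServer.call(pid, :ping)

  @impl true
  def init(idle_timeout), do: {:ok, idle_timeout, idle_timeout}

  @impl true
  def handle_call(:ping, _from, idle_timeout) do
    # The fourth tuple element re-arms the idle timer after every message.
    {:reply, :pong, idle_timeout, idle_timeout}
  end

  @impl true
  def handle_info(:timeout, idle_timeout) do
    # No message arrived within the timeout: stop and release the memory.
    {:stop, :normal, idle_timeout}
  end
end

{:ok, pid} = IdleSketch.start(50)
:pong = IdleSketch.ping(pid)

# Stay idle for longer than the timeout, then check the process.
Process.sleep(200)
alive_after_idle = Process.alive?(pid)
```

After the idle period the process is gone, so the next message for that identity would need a fresh process and a rebuilt state. That rebuild cost is what snapshotting keeps small.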

To define and use a lifespan strategy we first need a lifespan module that implements the Commanded.Aggregates.AggregateLifespan behaviour.

defmodule Consent.Aggregates.Lifespan.PatientLifespan do
  alias Commanded.Aggregates.AggregateLifespan, as: Lifespan
  alias Consent.Events.{ConsentAsked, ConsentGranted}

  @behaviour Lifespan

  @impl Lifespan
  def after_event(%ConsentAsked{}), do: :timer.hours(1)
  def after_event(%ConsentGranted{}), do: :stop
  def after_event(_event), do: :infinity

  @impl Lifespan
  def after_command(_command), do: :timer.minutes(5)

  @impl Lifespan
  def after_error(:validation_error), do: :timer.minutes(5)
  def after_error(:consent_already_granted), do: :timer.minutes(5)
  def after_error(:consent_not_granted), do: :timer.minutes(5)
  def after_error(_error), do: :stop
end

As you can see, we can define how long the aggregate should be kept around after a given criterion is met. For example, after a ConsentAsked event is emitted the process remains active for one hour; after a ConsentGranted event is emitted the process is stopped immediately.

Note: The time values here are for explanation purposes only. You should determine how long to keep your aggregates around based on usage patterns and your business rules. For example, if 80% of patients react to a ConsentAsked event within an hour, it makes sense to keep the aggregate around for roughly that amount of time.
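It may help to see what the lifespan callbacks actually return. The `:timer` helpers produce plain millisecond integers, while `:stop` and `:infinity` are atoms. The sketch below is a hypothetical helper (the module `LifespanValuesSketch` and its `describe/1` function are not part of Commanded) that spells those values out, treating an integer as an inactivity timeout.

```elixir
defmodule LifespanValuesSketch do
  # Hypothetical helper: turns a lifespan return value into a description.
  def describe(:stop), do: "stop immediately"
  def describe(:infinity), do: "keep alive indefinitely"

  def describe(ms) when is_integer(ms) and ms >= 0 do
    "stop after #{div(ms, 1000)}s of inactivity"
  end
end

# :timer.hours/1 and :timer.minutes/1 return milliseconds.
one_hour = :timer.hours(1)
five_minutes = :timer.minutes(5)

descriptions = Enum.map([one_hour, five_minutes, :stop, :infinity], &LifespanValuesSketch.describe/1)
```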

In order to use our lifespan strategy we need to register it in our command router.

# -- lib/consent/router.ex --
alias Consent.Aggregates.Lifespan.PatientLifespan

dispatch(
  [AskConsent, GrantConsent, RevokeConsent],
  to: Consent.Aggregates.Patient,
  identity: :patient_id,
  lifespan: PatientLifespan # <----
)

Conclusion

We learned two techniques for dealing with some performance problems that may arise from the architectural patterns applied.

Commanded has first-class support for applying them. It was as easy as writing a few configurations and a short module.

Comments

  • When writing the code for this part, I also wrote a docker-compose.yml file to streamline running both the event store and query-side databases locally.
  • Remember to choose the snapshot interval (number of events between snapshots) and the lifespan periods deliberately: observe usage patterns, know your business, and monitor performance and resource usage. This was not done here; the values were chosen arbitrarily for explanation purposes.
  • Once again, thank you for sticking around until this point.
