1 Year of Event Sourcing and CQRS

I have been working on implementing an application based on CQRS and Event Sourcing principles for about one year.

This post is a way for me to describe my journey. I don’t pretend to be Greg Young but I believe sharing the challenges and the problems I faced may be useful for some people. Especially if you are starting your own journey.


Business context

The context of the project was the Air Traffic Management (ATM) domain. We designed a solution for an ANSP, an Air Navigation Service Provider in charge of controlling a specific geographical area. The goal of the application was simple: compute and persist flight data. The process was roughly the following.

A few hours before a flight crosses its airspace, an ANSP receives information from Eurocontrol, the organization managing air traffic across Europe. This information contains the planned data such as the aircraft type, the departure, the destination, the requested route, etc. Once the aircraft reaches the AOR of an ANSP (Area Of Responsibility, the area where an ANSP is responsible for controlling and monitoring the flights), we can receive inputs from various sources: track updates (the current position of a flight), requests to modify the current route, events triggered by a trajectory prediction system, alerts from a conflict detection system, etc.

Even though we had to handle several concurrent requests, throughput-wise it was not comparable with PayPal or Netflix.

Nonetheless, the application was part of a safety-critical environment. In case of a critical failure, we would not lose money or customers. We might lose human lives. So implementing a reliable, responsive and resilient system to guarantee data consistency/integrity was obviously a top priority.

CQRS, Event Sourcing

Both patterns are actually pretty easy to understand.

CQRS

CQRS (Command Query Responsibility Segregation) is a way to dissociate writes (Command) and reads (Query). It means we can have one database to manage the write part, whilst the read part (also called view or projection) is derived from the write part and can be managed by one or multiple databases (depending on our use cases). Most of the time, the read part is computed asynchronously, which means both parts are not strictly consistent. We will come back to this point later on.
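To make the separation concrete, here is a minimal, purely illustrative sketch (the class names and the flight-route data are invented for the example): the command side owns the write model, while the query side only reads from a projection that is refreshed separately. In a real system the refresh would be asynchronous; here it is explicit to keep the sketch self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Command side: mutates the write model, exposes no query API.
class FlightCommandService {
    private final Map<String, String> writeModel = new HashMap<>();

    void createFlight(String id, String route) {
        writeModel.put(id, route);
    }

    // Used to feed the projection (done asynchronously in practice).
    Map<String, String> snapshotForProjection() {
        return new HashMap<>(writeModel);
    }
}

// Query side: reads only from its own projection, never from the write model.
class FlightQueryService {
    private Map<String, String> projection = new HashMap<>();

    void refresh(Map<String, String> latest) {
        projection = latest;
    }

    String routeOf(String id) {
        return projection.get(id);
    }
}
```

Note how a read performed before the projection refresh simply misses the data: this is exactly the eventual consistency gap discussed later in this post.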

One of the ideas behind CQRS is that a single database can hardly be equally efficient at managing reads and writes. It can depend on the choices made by the software vendor, the database tuning applied, etc. As an example, Apache Cassandra is known to be efficient at persisting data whereas Elasticsearch is great for search. Using CQRS is really a way to take advantage of the strengths of each solution.

Furthermore, we may also decide to handle distinct data models, once again depending on the requirements. For example, managing one model used in the context of a reporting view, another denormalized model efficient during persistence on the write part, etc.

Concerning the views, we may decide to implement some consumer-agnostic ones (for example to expose a specific business object) or some that would be specific to consumers.

Event Sourcing

According to Martin Fowler, Event Sourcing:

Ensures that all changes to application state are stored as a sequence of events

It means we do not store the state of an object. Instead, we store all the events impacting its state. Then, to retrieve an object's state, we have to read the different events related to this object and apply them one by one.
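The replay idea can be sketched in a few lines of Java. This is a toy model (the event and field names are invented for illustration, not taken from a real ATM system): the current state is never stored, it is recomputed by folding the events in order.

```java
import java.util.List;

// Toy aggregate state: rebuilt from events, never persisted directly.
class Flight {
    String departure;
    String arrival;
}

interface FlightEvent {
    void applyTo(Flight flight);
}

class FlightCreated implements FlightEvent {
    private final String departure;
    private final String arrival;

    FlightCreated(String departure, String arrival) {
        this.departure = departure;
        this.arrival = arrival;
    }

    public void applyTo(Flight f) {
        f.departure = departure;
        f.arrival = arrival;
    }
}

class ArrivalUpdated implements FlightEvent {
    private final String arrival;

    ArrivalUpdated(String arrival) { this.arrival = arrival; }

    public void applyTo(Flight f) { f.arrival = arrival; }
}

class FlightReplay {
    // Rebuild the state by applying each stored event, one by one.
    static Flight replay(List<FlightEvent> events) {
        Flight flight = new Flight();
        for (FlightEvent e : events) {
            e.applyTo(flight);
        }
        return flight;
    }
}
```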

CQRS + Event Sourcing

Both patterns are frequently grouped together. Applying Event Sourcing on top of CQRS means persisting each event on the write part of our application. Then the read part is derived from the sequence of events.

In my opinion, Event Sourcing is not required when we implement CQRS. Yet, the opposite is not necessarily true.

Indeed, for most use cases, CQRS is required when we implement Event Sourcing because we may want to retrieve a state in O(1) without having to recompute it from n different events. One exception is the use case of a simple audit log. Here, we don't need to manage views (nor states) as we are only interested in retrieving a sequence of logs.

Domain Driven Design

Domain Driven Design (DDD) is an approach to tackle software complexity related to domain models. It was introduced in 2004 by Eric Evans in the Domain-Driven Design: Tackling Complexity in the Heart of Software book.

We will not introduce all the different concepts here; if you are not familiar with DDD, I would strongly recommend taking a look at the book. We are just going to introduce the concepts which are useful in the context of a CQRS/Event Sourcing application.

The first concept brought by DDD is the aggregate. An aggregate is a cluster of domain objects which are considered as one unit with regard to data changes. A transaction within an aggregate must remain atomic.

Meanwhile, an aggregate enforces its own data consistency/integrity using invariants. An invariant is simply a rule which must remain true regardless of the changes. For example, a STAR (Standard Terminal Arrival Route, basically a predefined route before landing) is always linked to one given airport. An invariant must enforce that a destination airport cannot be modified without changing the STAR and that this STAR is valid for this airport.
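The STAR invariant above could be checked with something like the following sketch. Note that the airport codes, STAR names and the validity table are invented for the example, not real ATM data; a real aggregate would hold this rule internally and reject any command that violates it.

```java
import java.util.Map;
import java.util.Set;

class StarInvariant {
    // Hypothetical mapping: which STARs are published for which airport.
    private static final Map<String, Set<String>> STARS_BY_AIRPORT = Map.of(
            "EGLL", Set.of("BNN1A", "OCK2B"),
            "LFPG", Set.of("MOPAR5", "LORNI7"));

    // The invariant: true only if the STAR exists for the destination airport.
    static boolean holds(String destinationAirport, String star) {
        return STARS_BY_AIRPORT
                .getOrDefault(destinationAirport, Set.of())
                .contains(star);
    }
}
```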

In addition, the object acting as a facade for an aggregate (handling the inputs and delegating the business logic to the sub-objects) is called the aggregate root.

Concerning the objects composing an aggregate, we need to distinguish entities from value objects. An entity is an object with an identity; it is not defined by its attributes. A person will have a different age over time but he/she will remain the same person. On the other hand, a value object is solely defined by its attributes. An address with a different city is a different address. The former is mutable whereas the latter is immutable. Furthermore, an entity can have its own life cycle. For example, a flight is first ready for departure, then airborne (flying) and finally landed.

In the model definition, an entity should be as simple as possible and focused on its identity and its life cycle. In the context of a CQRS/Event Sourcing application, entities are a key element because most of the time the changes made within an aggregate depend on their life cycles. It is crucial, for example, to make sure each entity implements a function to determine whether it equals another entity instance. It can be done by comparing an identifier, or a set of related attributes guaranteeing an identity.
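Identity-based equality can be sketched as follows. The class is illustrative (the GUFI name anticipates the flight identifier discussed later in this post): two entities are equal if they carry the same identifier, regardless of their mutable life-cycle attributes.

```java
import java.util.Objects;

class FlightEntity {
    private final String gufi;   // identity: never changes
    private String status;       // mutable life-cycle attribute

    FlightEntity(String gufi, String status) {
        this.gufi = gufi;
        this.status = status;
    }

    void setStatus(String status) { this.status = status; }

    // Equality is based on identity only, never on the mutable state.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof FlightEntity)) return false;
        return gufi.equals(((FlightEntity) o).gufi);
    }

    @Override
    public int hashCode() {
        return Objects.hash(gufi);
    }
}
```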

Now that we are aware of the concept of entity, let's come back to the invariants. To define them, we used a language inspired by the BDD (Behavior-Driven Development) format:

Given [entity] at state [state]
When [event] occurs
We shall [rules]

This format proved very efficient, mainly because it was easily understandable by business people.

Last but not least, DDD also brings the concept of bounded context. Basically, instead of managing one large complex model, we can split it into different contexts with explicit boundaries. I already mentioned this concept in my post Why is a Canonical Data Model an Anti Pattern.

We can apply the concept of bounded context when we have to design a view. As mentioned, a view can be either specific to a consumer (because we need to achieve low latency or because of something else) or common to several consumers.

In the latter case, we have to think about the data model exposed. Is it a global and shared model for the whole company or is it something made in a particular context like one given functional domain?

If it’s a shared model, we need to bear in mind the impacts on the consumers in case of a change. This could be mitigated by applying a service layer on top of the views, but I’m personally in favor of directly contextualizing the views. In case of a model change, for example, we could keep the original view exposing the previous model and create another view to expose the new model.

Commands vs Events

In an Event Sourcing architecture, it is important to distinguish commands from events. A command represents an intention (modeled with the present tense like CreateCustomer) while an event represents a fact, something that already happened (modeled with the past tense like CustomerUpdated).

As a concrete example from my project, an event may be the reception of a radar track indicating a current airplane position. The system can hardly refuse such an event as it is a known fact which has already happened (exactly when it occurred may depend on various factors like latency, etc.).

On the other side, a flight controller who would like to modify a flight trajectory issues a command. It is a user intention and, before being considered a fact, it must be validated by our application.

Most of the time, a command is designed as a synchronous interaction and an event as an asynchronous one. Not in all cases though.

It is also important to keep in mind the concept of data ownership. Let’s imagine a simple interaction between two systems A and B exchanging customer data. If A produces an asynchronous CustomerUpdated message caught by B, and B is considered the owner of the customer object (at the current stage of the customer lifecycle), it may be entitled to refuse the change. Even though the message looks like a domain event, in the end it’s just a plain old command for the B system.

Implementation

The design was based on the Axon Framework. I will not mention this framework again as this post is technology agnostic. Yet, if you are implementing an application in a Java environment, I would strongly recommend taking a look at it. In my opinion, Axon Framework is really great for implementing a CQRS/Event Sourcing application.

Let’s see the internal application design:

Application design

In a nutshell, the application receives commands and publishes internal events. Those events are persisted in an event store and published to handlers which are themselves responsible for updating the views. We may also decide to implement a service layer (called read handler) on top of the views.

Now, let’s see in detail the different scenarios.

Aggregate creation

The command handler receives a CreateFlight command and checks in the domain repository whether an instance already exists. This domain repository manages aggregate instances. It checks first in a cache and, if the object is not present, it checks the event store. The event store is a database that persists a sequence of events. We will see later on what, in my opinion, makes a good event store. In this case, the event store is still empty so the repository doesn’t return anything.

The command handler is responsible for triggering the invariants. In case of a failure, we can synchronously throw back an exception indicating the business problem(s). Otherwise, the command handler will then publish one or several events to the event bus. The number of events depends on the use cases and the internal data model granularity. In our scenario, we will assume the publication of a single FlightCreated event.

The first component triggered on this event is the domain handler. This component is responsible for updating the domain aggregates depending on the logic implemented. In general, the logic is delegated to the aggregate root (acting as a facade but which may also delegate the underlying logic to the subdomain objects). Bear in mind, the aggregate must always remain consistent and must also enforce data integrity by validating the invariants.

If the handler succeeds (no business error raised), the event is persisted in the event store and the cache is updated with the latest aggregate instance.
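The creation flow above can be condensed into a small sketch. It is deliberately simplified (a single-map in-memory event store, string-encoded events, invented names), but it shows the two key steps: the invariant check that may reject the command synchronously, and the append to the event store on success.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CreationFlow {
    // Toy event store: one list of events per aggregate instance.
    private final Map<String, List<String>> eventStore = new HashMap<>();

    // Handles CreateFlight: rejects duplicates (invariant failure is
    // reported synchronously), otherwise appends a FlightCreated event.
    String handleCreateFlight(String flightId) {
        if (eventStore.containsKey(flightId)) {
            throw new IllegalStateException("Flight already exists: " + flightId);
        }
        String event = "FlightCreated:" + flightId;
        eventStore.computeIfAbsent(flightId, k -> new ArrayList<>()).add(event);
        return event;
    }

    List<String> streamOf(String flightId) {
        return eventStore.getOrDefault(flightId, List.of());
    }
}
```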

Then, the view handlers are triggered to update their corresponding views. Just like in a plain old publish-subscribe pattern, a view can subscribe only to the events it is interested in. Maybe in our case, view 2 is the only one interested in the FlightCreated event.

Aggregate update

The second scenario is the update of an existing aggregate. Upon UpdateFlight command reception, the command handler asks the repository to return the latest aggregate instance (if any).

If the instance is cached, it is not required to interact with the event store. Otherwise, the repository will trigger the so-called rehydration process.

This process is a way to compute the current state of an aggregate instance according to the sequence of events stored. Each event retrieved in the event store (let’s say FlightCreated, DepartureUpdated and ArrivalUpdated) is published in the event bus. The first domain handler triggered with FlightCreated instantiates a new aggregate (creates a new object instance in memory based on the information coming from the event itself). Then the other domain handlers (triggered with DepartureUpdated and ArrivalUpdated events) will update the aggregate instance freshly created. In the end, we are able to compute the state according to the events stored.

Once the state is computed, the object instance is put in the cache and returned to the command handler. Then, the rest of the process is the same as in the aggregate creation scenario.

One thing to add regarding the rehydration process: what if an aggregate is not cached and we have 1000 events stored for one particular aggregate instance? Obviously, it would take a very long time to compute its state. There is a known mitigation called the snapshot.

We can decide to persist the current state of the aggregate every n events as a snapshot. This snapshot would also contain the position in the event store. Then, the rehydration process would simply start with the latest snapshot and continue from the position indicated. A snapshot could also be created based on other policy types (if the rehydration time exceeds a certain threshold, etc.).
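Snapshot-based rehydration can be sketched as follows. The state here is a toy counter (each event increments it by its value) so the mechanics stay visible: the snapshot carries both a state and the position of the next event to apply, and replay resumes from that position instead of the beginning.

```java
import java.util.List;

class Snapshot {
    final int position;   // index of the next event to apply
    final int state;      // toy state: a running sum of event values

    Snapshot(int position, int state) {
        this.position = position;
        this.state = state;
    }
}

class SnapshotRehydration {
    // Start from the snapshot's state and replay only the events
    // appended after the snapshot was taken.
    static int rehydrate(Snapshot snapshot, List<Integer> events) {
        int state = snapshot.state;
        for (int i = snapshot.position; i < events.size(); i++) {
            state += events.get(i);
        }
        return state;
    }
}
```

Rehydrating from a snapshot must, of course, yield the same state as a full replay from position zero; the snapshot only shortens the work.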

How to handle Events?

I would like to come back to the distinction we have made between commands and events. First, it’s worth distinguishing between internal and external events. An external event is produced by another application whereas an internal one is produced by our application (based on an external command).

We had an interesting debate on how to technically manage external events arriving in our application. I mean a real event in the sense that it is something which already happened in the past (like a radar track).

There are indeed two possible approaches:

  • The first approach is to consider an event as a command. This means we have to pass first through a command handler, validate the invariants and then produce an internal event.
  • The second approach is to bypass the command handler and directly persist the event in the event store. After all, if we speak about a real event, it’s actually pretty useless to validate the invariants, etc. Yet, it’s still important to check the event syntax to make sure we will not pollute the event store.

If we go with the second option, it may be interesting to implement rules during the aggregate rehydration.

Let’s take the example of a radar track publishing flight positions. If the producer cannot guarantee the ordering of the messages, we can also persist a timestamp (generated by the producer) and compute the state this way:

if event.date > latestEventDate {
  // Compute the state
  latestEventDate = event.date
} else {
  // Discard the event
}

This rule would ensure the state is only based on the latest event produced. It means persisting an event does not necessarily mean impacting the current state.

In the first approach, such a rule would be implemented before persisting the event.

Event model

Is it necessary to create one single model for the events persisted in an event store? In my opinion, the answer is no (most of the time at least).

First, because we may want to persist different model versions over time. In this case, we have to implement a strategy to map an event from one model version to another.
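Such a mapping strategy is often called an upcaster: old event versions are translated to the latest one at read time, so the stored history never has to be rewritten. Here is a hedged sketch where events are modeled as plain maps and the field names and versions are invented for the example (a hypothetical v1 with a single "route" field, split into departure and arrival in v2).

```java
import java.util.Map;

class EventUpcaster {
    // Translate a v1 event into the v2 shape; pass through anything newer.
    static Map<String, String> upcast(Map<String, String> event) {
        if (!"1".equals(event.get("version"))) {
            return event; // already the latest version
        }
        String[] parts = event.get("route").split("-");
        return Map.of(
                "version", "2",
                "departure", parts[0],
                "arrival", parts[1]);
    }
}
```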

There is another benefit I would like to illustrate with a concrete example. Let’s consider an application receiving events from a system A and a system B. Both systems are publishing flight events based on their own data models. If we create a common data model C, we need to translate A to C and B to C before persisting the events. Yet, at some point in the project, we are only interested in some of the information from A and B. It means C is just a subset of A and B.

But what if, later on, we need to make some evolutions to our application and manage additional elements from A and B? Because the events are persisted using the C format, those elements are simply lost. On the other hand, if we had decided to persist the A and B formats, we could simply have made some evolutions to the command handler to manage those elements.

Eventual consistency

Theory

Eventual consistency is a concept brought by CQRS (most of the time). It is important to understand its consequences and impacts.

First, it’s worth saying there are different consistency levels.

Eventual consistency is a model where we can ensure data will be replicated (from the write to the read part of a CQRS application). The problem is we can’t exactly guarantee when. It is going to be impacted by various factors like the overall throughput, the network latency, etc. This is the weakest form of consistency but it offers the lowest latency.

Applying eventual consistency on a CQRS application means at some point the write part may be desynchronized from the read part.

At the opposite end, we find the strong consistency model. Unless we are using the same database for managing reads and writes, or we sold our soul to the devil by using two-phase commit, we should not be able to reach this consistency level in a distributed system.

The closest implementation, in case we have two different databases, is to manage everything within a single thread. This thread would be in charge of persisting data to the write database and to the read database(s). A thread can also be dedicated to one single aggregate instance and manage incoming commands sequentially. Yet, what would be the impacts in case of a transient error while synchronizing a view? Do we need to compensate the other views and the write part of the CQRS application? Do we need to implement a retry-on-error loop? Do we need to apply the circuit breaker pattern by pausing the command handler to stop new incoming events? It’s important to tackle the transient errors which are obviously going to occur (anything that can go wrong will go wrong).

Between both consistency models (eventual and strong consistency), we can find many different models: causal consistency, sequential consistency, etc. As an example, the client monotonic consistency model guarantees strong consistency only per session (an application or a service instance). Therefore, implementing a CQRS application is not simply a choice between eventual and strong consistency.

My opinion is the following: as we can hardly guarantee strong consistency, let’s embrace eventual consistency as much as possible. Yet, the prerequisite is to understand precisely the impacts on the rest of the system.

Example

Let’s see one concrete example I faced on my project.

One of the challenges was to manage a unique identifier for each flight. We had to deal with events coming from external systems (external to the company) where the identifier was not shared. For one channel the identifier was a composite (departure airport + departure time + aircraft identifier + arrival airport) while the other channel was sending a unique identifier per flight (but not known by the first channel). The goal was to manage our own unique identifier (called GUFI for Globally Unique Flight Identifier) and to make sure each event was associated with the correct GUFI.

The simplest solution is to make each incoming event trigger a lookup in a particular view of our application to retrieve the corresponding GUFI. But what if this view is eventually consistent? In the worst case, we may have events related to the same flight but stored with different GUFIs (which, believe me, was a problem).

One solution may be to delegate the management of this GUFI to another, strongly consistent service.

Another solution was provided by Greg Young during a Q&A session. We can implement a sort of buffer containing only the n last events handled by our application. If the view does not contain the data we are looking for, we check in this buffer to make sure it was not received just before. The bigger n is, the better the chances to mitigate the inconsistency window between the write and the read side.

This buffer can either be distributed using a solution like Hazelcast, Redis, etc. or it can be local to the application instance. In the latter case, we may have to implement a sharding mechanism to always route the events related to the same object to the same application instance, using a hashing function for example (preferably a consistent hashing function to scale out easily).
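A local version of this last-n buffer can be sketched with a bounded, insertion-ordered map that evicts its oldest entry once full. The flight keys and GUFI values are illustrative; in the real lookup flow, the eventually consistent view is consulted first and this buffer acts as the fallback.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RecentEventBuffer {
    private final Map<String, String> buffer;

    RecentEventBuffer(int capacity) {
        // LinkedHashMap with removeEldestEntry gives a simple FIFO bound.
        this.buffer = new LinkedHashMap<>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > capacity;
            }
        };
    }

    // Called for every event handled by the application.
    void record(String flightKey, String gufi) {
        buffer.put(flightKey, gufi);
    }

    // Fallback lookup when the eventually consistent view has no match.
    String lookup(String flightKey) {
        return buffer.get(flightKey);
    }
}
```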

Concurrency management

I already created a post a few months ago to describe the benefits of using Event Sourcing to manage concurrent updates.

In a nutshell, having an event store may give us a smarter solution to deal with concurrent updates than a pessimistic or optimistic locking approach.

Applying the correct level of granularity in the data model is also key to a successful project.

Choosing an event store

We can use any sort of database to persist a sequence of events. Yet, the optimal solution is very often one built for Event Sourcing.

For example, isolating an aggregate instance is something which must be considered. Let’s imagine all the events are persisted in one single table. This table will keep growing over time, and during aggregate rehydration we will have to filter for the events related to one specific aggregate instance. The time to rehydrate an aggregate will depend on the total number of events persisted, even though some of them are not linked to the instance we are interested in. A good solution may be to have one table/bucket per aggregate instance to isolate the events. We will call this concept a stream. A stream is always linked to one aggregate instance (in most use cases).
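The stream concept, together with two of the requirements below (reading from a specific sequence number and the optimistic concurrency model), can be sketched with a toy in-memory store. The types are illustrative: one event list per stream, an append that fails when the caller's expected version no longer matches the stream length, and a read that can start at a given position.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StreamedEventStore {
    // One stream (list of events) per aggregate instance.
    private final Map<String, List<String>> streams = new HashMap<>();

    // Optimistic concurrency: the append is rejected if another writer
    // has already advanced the stream past the expected version.
    void append(String streamId, int expectedVersion, String event) {
        List<String> stream = streams.computeIfAbsent(streamId, k -> new ArrayList<>());
        if (stream.size() != expectedVersion) {
            throw new IllegalStateException(
                    "Concurrent write on " + streamId + ": expected version "
                            + expectedVersion + " but stream is at " + stream.size());
        }
        stream.add(event);
    }

    // Read back events in written order, from a given sequence number
    // (which is what snapshot-based rehydration needs).
    List<String> readFrom(String streamId, int fromVersion) {
        List<String> stream = streams.getOrDefault(streamId, List.of());
        return stream.subList(fromVersion, stream.size());
    }
}
```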

Here are the requirements we have considered to choose an event store:

Write:

  • Constant write latency: regardless of a stream size, the latency to persist an event must remain constant
  • Atomicity: multiple events can be appended in a single transaction
  • TTL management: automatically discard events based on their creation date
  • Schemaless: capacity to store multiple event types and versions

Read:

  • Read back events in the written order
  • Read from a specific sequence number (because of the snapshots)
  • Constant read performance in a given stream, regardless of the other streams
  • HMI
  • Caching management

Concurrency:

  • Optimistic concurrency model
  • Idempotence management

Product monitoring

Solution support

Security:

  • Encryption (transport)
  • Authentication
  • Authorization management

Scaling

Backup

As every context is unique, I’m pretty sure you will have your own requirements, but at least this might be a starting point.


Conclusion

There’s no magic behind CQRS and Event Sourcing. Before starting your journey, it is crucial to understand the many impacts of these two patterns. Otherwise, it is incredibly easy to create a complete mess on both the technical and the functional level.

Yet, once you get a precise idea of the constraints and the drawbacks, CQRS and/or Event Sourcing can be a serious solution to many problems.
