ES/CQRS the Akka way

Krunoslav Uzelac
Jun 23 · 9 min read

Requirements

Create a note service application that will one day be able to scale horizontally while remaining stateful. Clients will be able to create/read/update/delete notes via an HTTP API.

Solution design

Several trends in computing have surfaced in recent years: big data, containers, serverless applications, microservices, and event-driven architecture (EDA). Their popularity has grown because companies now know they can move faster and deliver scalable solutions that are much more manageable than monolithic applications. To accomplish these objectives, several patterns have emerged.

Command/Query Responsibility Segregation (CQRS) creates a logical separation between operations that mutate data and operations that merely retrieve it. Because the load characteristics of reading and updating data are often different, this practice can have a profound impact on how well your application runs.

In addition, Event Sourcing (ES) can help your application collect data that may not be considered valuable at the time of design but may be very valuable later. Event sourcing is a way of modelling your data as the result of a sequence of events rather than only keeping track of current state.

“It’s like your bank account. Current balance can be recreated by replaying all of your transactions.”

Command Query Responsibility Segregation and Event Sourcing are both advanced development concepts. Like many advanced concepts, their purpose can be difficult to get your head around at first, but once you understand it, you will have a good grasp of what is going on. To be fair, we also need to argue the other side, with some drawbacks mentioned in this blog.

Why we love it

Having an audit trail of changes which we can always replay and reuse

CQRS gives us a lot more flexibility on the query side than the usual approaches

Allows us to build horizontally scalable applications

Technology

Finding the right technology that natively supports and embraces ES/CQRS is not an easy task. There are plenty of libraries and frameworks out there, like Axon, Eventuate, Aecor, Lagom and Akka, to name a few. They are all different flavours of the same pattern. We chose Akka because its actor model fits very nicely with ES and it's fairly low level, so we have plenty of flexibility. Watch out, with great power comes great responsibility!

Storage

To support fast and scalable writes, we're using Apache Cassandra as our event store. Cassandra is from the family of NoSQL databases and comes with a couple of guarantees: high throughput, resiliency and horizontal scalability, as seen in these benchmarks. On the query side we can use a more common relational database like Postgres.

Deep dive into the solution

Let’s see what the architecture looks like in Diagram 1. We’ll split it into 3 main areas and then go into the details:

  1. Command side
  2. Query side synchronisation mechanism
  3. Query side

1. Command side — Reacting on incoming commands

We modelled our system using DDD (Domain-Driven Design) as a single aggregate. In our simplified case that will be the note aggregate. Aggregates are a pattern through which we can treat our list of notes as a single object.

In Akka's actor model we'll keep the list of notes in a single actor called NoteAggregate. If we used plain actors, our state would be lost after an application redeploy because the state (the list of notes) is kept in memory only. That pushes us in the direction of needing to preserve the state somehow.

The ideal scenario is that upon arrival of a new command, CreateNote, we validate it and persist an event (NoteCreated) which represents the change of our state. After we have persisted the event into the event store, which is Cassandra, we need to decide whether we also need this event in our aggregate's in-memory state, or whether it's good enough that we just persisted it into the event store. It is imperative to keep it in memory in order to validate new commands that come to the aggregate.
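
To make this concrete, here is a minimal sketch of how the commands, events and in-memory state could be modelled. The names and fields are illustrative, not necessarily the exact model from the linked repository:

```scala
// Commands express intent and may still be rejected; events record facts that already happened.
sealed trait NoteCommand
final case class CreateNote(id: String, subject: String, content: String) extends NoteCommand
final case class UpdateNote(id: String, subject: String, content: String) extends NoteCommand
final case class DeleteNote(id: String) extends NoteCommand

sealed trait NoteEvent
final case class NoteCreated(id: String, subject: String, content: String) extends NoteEvent
final case class NoteUpdated(id: String, subject: String, content: String) extends NoteEvent
final case class NoteDeleted(id: String) extends NoteEvent

// The aggregate's in-memory state: everything it needs in order to validate the next command.
final case class Note(id: String, subject: String, content: String)

final case class NotesState(notes: Map[String, Note] = Map.empty) {
  def updated(event: NoteEvent): NotesState = event match {
    case NoteCreated(id, subject, content) => copy(notes = notes + (id -> Note(id, subject, content)))
    case NoteUpdated(id, subject, content) => copy(notes = notes + (id -> Note(id, subject, content)))
    case NoteDeleted(id)                   => copy(notes = notes - id)
  }
}
```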

After a while we will notice that the in-memory state is becoming too big with this version of the application, but there is a way to easily shard the data using Akka's cluster sharding. In the next blog post we'll make this application cluster ready.

When the aggregate gets initialised it will replay its previous events from the event store. New commands will wait in the actor mailbox until all the previous events are handled and the in-memory state is up to date. Let's say that we get a new requirement which says that in our note service the subject of a note has to be unique. We would then keep a list of subjects in our aggregate's in-memory state, which we'll use to validate the uniqueness of newly arrived CreateNote/UpdateNote commands. This could be further optimised to keep just a hash of the subjects in order to minimise memory consumption.

So what happens when the application restarts? We would need to replay all the events that we had persisted before and rebuild the state the same way we built it when we were receiving the commands.

Good thing we have an awesome library like Akka Persistence, which handles the persistence part within an actor. We make NoteAggregate extend PersistentActor instead of the regular Actor trait and the magic begins. There are 3 things we need to define (a minimal sketch follows the list):

  1. PersistenceId. That’s a key which differentiates events of one aggregate from others. Our persistenceId will be NoteAggregate.
  2. receiveRecover is a function that will be called during the recovery of the state upon initialisation. All the events that this aggregate saved will be replayed one by one. That way we can rebuild its previous state. Receiving a RecoveryCompleted event marks the end of the replay process.
  3. receiveCommand is the function that takes over after receiveRecover is done. All the new commands are handled here.
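
Putting those three pieces together, a minimal NoteAggregate could look like the sketch below. It reuses the illustrative case classes from earlier; the uniqueness check and the reply messages are simplified assumptions, not the exact code from the repository:

```scala
import akka.actor.Props
import akka.persistence.{PersistentActor, RecoveryCompleted}

class NoteAggregate extends PersistentActor {
  // 1. A stable id: all events of this aggregate are stored and replayed under this key.
  override val persistenceId: String = "NoteAggregate"

  private var state = NotesState()

  // 2. Called on (re)start: every previously persisted event is replayed here to rebuild the state.
  override def receiveRecover: Receive = {
    case event: NoteEvent  => state = state.updated(event)
    case RecoveryCompleted => // replay finished, receiveCommand takes over from here
  }

  // 3. Called for every new command once recovery is done.
  override def receiveCommand: Receive = {
    case cmd: CreateNote if state.notes.values.exists(_.subject == cmd.subject) =>
      sender() ! "rejected: subject must be unique" // validated against the in-memory state
    case cmd: CreateNote =>
      persist(NoteCreated(cmd.id, cmd.subject, cmd.content)) { event =>
        state = state.updated(event) // apply only after the event is safely in the event store
        sender() ! "accepted"
      }
  }
}

object NoteAggregate {
  def props: Props = Props(new NoteAggregate)
}
```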

2. Command side - Query side synchronisation mechanism

Synchronising the event store and the view store/read-side database seems like a fairly straightforward task, but doing it efficiently requires some additional work. In order to keep the view data store updated as efficiently and quickly as possible, we should try to avoid re-reading all the events from the store every time we have to refresh the view. Replaying 2 million events on every restart of the application is simply not good enough. So how do we do it more efficiently? Saving the offset of the last read event solves this issue. We just need to build a mechanism which can query the event store and start from a specified offset.

Akka Persistence gives us the functionality to replay the events from the event store even when we're not in the aggregate. During the boot of our application we read the last saved offset, and using this information we can start the eventsByPersistenceId stream source. This allows us to read all the events that came after the specified offset and to continuously replay new events as they get saved during the life of the application. It's up to us to decide what we are going to do with every replayed event. Usually we will remap the event to our read-side model and persist it into the database. After the event is persisted we need to persist its offset as well, to avoid replaying the same event multiple times.
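
A rough sketch of such a view builder, using the eventsByPersistenceId source from Akka Persistence Query with the Cassandra read journal. The view-store helpers (readLastSequenceNr, upsertNoteRow, saveSequenceNr) are hypothetical stubs standing in for real Postgres access:

```scala
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

object NoteViewBuilder extends App {
  implicit val system: ActorSystem = ActorSystem("notes")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Hypothetical view-store helpers, stubbed here; a real implementation would talk to Postgres.
  // The upsert keeps the projection idempotent, the offset table remembers how far we have read.
  def readLastSequenceNr(consumerId: String): Future[Long] = Future.successful(0L)
  def upsertNoteRow(event: Any): Future[Unit] = Future.successful(())
  def saveSequenceNr(consumerId: String, seqNr: Long): Future[Unit] = Future.successful(())

  val queries = PersistenceQuery(system)
    .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  val consumerId = "note-view-builder"

  readLastSequenceNr(consumerId).foreach { lastSeqNr =>
    queries
      // Live stream: replays everything after the saved offset, then keeps emitting new events.
      .eventsByPersistenceId("NoteAggregate", lastSeqNr + 1, Long.MaxValue)
      .mapAsync(parallelism = 1) { env =>
        for {
          _ <- upsertNoteRow(env.event)                   // 1. write/overwrite the read-side row
          _ <- saveSequenceNr(consumerId, env.sequenceNr) // 2. only then move the offset forward
        } yield ()
      }
      .runWith(Sink.ignore)
  }
}
```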

Our view schema should be idempotent. Why? Because it can always happen that we wrote the event to the view and, just before storing the offset, our application crashed. In that case we will use the previously saved offset and reread the same event again. If we have actions like incrementing a counter, that could be a big issue.

With this approach we can have multiple consumers of the same event stream. Each consumer will have a different id which allows it to keep its own offset. One beautiful thing about this is that each stream consumes the events at its own pace. Different consumers usually have different capabilities for consuming data.

We could, for example, have a stream which stores the events into the database and another instance of the stream which publishes these events into Kafka. If for whatever reason one of the consumers cannot handle data (it's down, disconnected, hit a bug…), the offset for that consumer won't move on, while the other stream continues normally. When the issue is resolved the stream will continue where it left off and no data is lost. The only problem we see here is stale data until the consumer catches up on all the events.
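
Continuing inside the NoteViewBuilder object from the previous sketch, this is roughly what running two such consumers could look like; publishToKafka is a hypothetical stub for a real Kafka producer:

```scala
// The same stream, parameterised by consumer id, so every consumer keeps its own offset
// and runs at its own pace; a slow or failed consumer never blocks the others.
def runProjection(consumerId: String)(handle: Any => Future[Unit]): Unit =
  readLastSequenceNr(consumerId).foreach { lastSeqNr =>
    queries
      .eventsByPersistenceId("NoteAggregate", lastSeqNr + 1, Long.MaxValue)
      .mapAsync(1)(env => handle(env.event).flatMap(_ => saveSequenceNr(consumerId, env.sequenceNr)))
      .runWith(Sink.ignore)
  }

// Hypothetical Kafka publisher, stubbed for the sketch.
def publishToKafka(event: Any): Future[Unit] = Future.successful(())

runProjection("note-view-builder")(upsertNoteRow)     // builds the Postgres view
runProjection("note-kafka-publisher")(publishToKafka) // feeds the same events into Kafka
```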

3. Query side — Processing of incoming queries

Usually the query/read side is more old school, fetching information directly from a database or cache. One big difference compared to traditional systems is the way the data is organised. Using the event store as the source of truth, the query side builds its state/schema so that it is as performant as possible in responding to queries. Data structures and table design are primarily optimised for reading, as writing only happens from the view builder mechanism.

If a relational database isn’t a good fit we can easily move to a more appropriate one. For example, if we need to provide facets for searching with counts of items per facet, Elasticsearch would potentially be a better fit than a relational database.
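
As a rough sketch of how thin the query side can be, here is a minimal Akka HTTP endpoint that only talks to the view store; fetchNotesAsJson is a hypothetical stand-in for a real query against the Postgres view tables:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

import scala.concurrent.Future

object NoteQueryApi extends App {
  implicit val system: ActorSystem = ActorSystem("notes-query")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Hypothetical read-side lookup: in a real service this would be a query against the view tables.
  def fetchNotesAsJson(): Future[String] =
    Future.successful("""[{"id":"1","subject":"hello"}]""")

  // The query side never touches the aggregate or the event store; it only reads the view store.
  val route =
    path("notes") {
      get {
        complete(fetchNotesAsJson())
      }
    }

  Http().bindAndHandle(route, "0.0.0.0", 8080)
}
```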

Our top 5 pieces of advice:

1. No sending events directly from the aggregate to the view model, read them from the event store

Always keep in mind that your system can stop or crash at any time for a range of reasons, some of them completely out of your control: datacenter shutdowns, hardware failures, etc. If we send the event manually at the same time as we persist it, to be “faster and more efficient”, we can end up in a situation where we store the event into the event store but the application crashes before it manages to save the event into the view store. That means your application is now in an inconsistent state which is hard to detect and resolve.

2. No reading of view model from command side

It’s tempting to use the view store directly from the command side, like in the example we mentioned with the validation rule that a note subject has to be unique. There are two main drawbacks straight away.

2.1. Coupling of command and query side.

2.2. Our view side is eventually consistent, which means that when a second note with an identical name comes in, it can happen that there is already a note with that name in the event store but it hasn’t yet been replayed into the view store. Thus, you will end up with two notes with the same name. Only the aggregate can offer you a consistent view of your actual state.

The simplest way to avoid this issue is to split the application into two parts, deployed as separate components.

3. Have a way to “redo” the view model completely from event store

The view gets changed much more often than the command side: more details need to be shown, a new index needs to be added or removed, the design changes… To make such changes less painful, you can simply drop the whole view and replay the events from the event store again.
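
Continuing the NoteViewBuilder sketch from earlier, the rebuild itself can be as small as this; dropAndRecreateViewTables is a hypothetical stand-in for whatever DDL your view database needs:

```scala
// Hypothetical DDL helper: drops and recreates the read-side tables.
def dropAndRecreateViewTables(): Future[Unit] = Future.successful(())

// Wipe the view and reset the consumer offset to 0; on the next start the projection
// replays every event from the event store into the fresh schema.
def rebuildNoteView(): Future[Unit] =
  for {
    _ <- dropAndRecreateViewTables()
    _ <- saveSequenceNr("note-view-builder", 0L)
  } yield ()
```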

4. Eventual consistency on View — what it means for UI

If you’ve worked with eventually consistent systems before, you probably know what kind of problems we’re talking about. A user creates a new note, refreshes the page, but the note isn’t there. What?! Eh, it wasn’t yet replayed to the view store. There are some ways of mitigating these issues and we’ll write about them in future blog posts.

5. Schema evolution before going to production

Requirements and applications change over time. Instead of trying to fight it, we should accept that and find patterns that embrace those changes. The events that we store evolve over time as well, so we need a way to support that. For example, our note now needs to store the color of the note as well. The simplest solution would be to add a new field, but what about all the earlier events that didn't have a color field? There has to be a way to evolve from v1 of the event to v2. There will be future blog posts completely dedicated to this subject.
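
One way Akka supports this is through event adapters, which can upgrade old events on the fly as they are read back from the journal. A minimal, illustrative sketch (the adapter still has to be registered in the journal configuration, and the event names here are assumptions):

```scala
import akka.persistence.journal.{EventAdapter, EventSeq}

// Illustrative event versions: v1 had no color, v2 adds one.
final case class NoteCreatedV1(id: String, subject: String, content: String)
final case class NoteCreatedV2(id: String, subject: String, content: String, color: String)

class NoteEventAdapter extends EventAdapter {
  override def manifest(event: Any): String = "" // no manifest needed for this sketch

  // Writing: always persist the newest version of the event.
  override def toJournal(event: Any): Any = event

  // Reading: upgrade v1 events during replay, so the rest of the code only ever sees v2.
  override def fromJournal(event: Any, manifest: String): EventSeq = event match {
    case e: NoteCreatedV1 => EventSeq.single(NoteCreatedV2(e.id, e.subject, e.content, color = "white"))
    case other            => EventSeq.single(other)
  }
}
```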

Summary

We’ve gone through a simplified example end to end using Event Sourcing and CQRS in order to demonstrate how clean and easy this pattern can be.

The code, containing integration tests with embedded Cassandra and an H2 DB, is available at https://github.com/ingenuiq/es-cqrs-akka-full-example. Any suggestions or feedback are more than welcome.

Thank you for sticking with us till the end, and be ready! New content is on the way!