Hello CQRS + ES !

Jay Rajani
Blue Harvest Tech Blog
6 min readApr 12, 2019

In my previous article CQRS ❤️ Event Sourcing, I explained the terminology about CQRS and Event Sourcing. However, it did not include any practical example. I know I took a bit of time to write another but let’s start implementing one with a simple domain.

We will develop an API to create a conference and sell the tickets for the conference. To create the conference, the conference creator has to provide the number of seats available to be booked. It is not allowed to create the conference without the number of available seats. Once the conference is created, a user can buy a ticket to obtain one or more seats. We cannot overbook the conference. That’s it; quite a simple domain.

If I break the entire exercise in four steps, it would look like:

  1. Define the aggregates & command model
  2. Make projections for consolidated view
  3. Define the query model
  4. Create the API: putting all the pieces together

I will use Axon framework, Spring boot, JPA and Kotlin for programming and a bit of domain driven design for modelling. If you like to download the source code first and use your favourite IDE while reading, you can download it from this public repo.

The Command Model

An aggregate is an entity or a collection of entities that can be grouped together. It is the main building block of a domain driven design. It requires thorough understanding of the domain to define the aggregates properly. For simplicity, we will create two entities in our application: Conference and Ticket. As we need a conference before selling the ticket, the conference becomes the Aggregate Root.

But remember Uncle Bob!! Before writing any production code, we have to write the test. I love the three TDD principles. If it was a live coding session, I could have switched between Red, Green and Blue.

I have two tests scenarios. If a conference is created with x number of available seats, I expect a ConferenceCreatedEvent. But If we try to create the conference without available seats, it would fail. The actual business validation can be even complex.

Tests for Conference Aggregate

Axon Framework provides a test fixture to perform “given-when-then” testing. Typically, it looks like:

  • Given certain events occurred in the past
  • When executing this command
  • Expect these events to be published

As in TDD, we target one class,

The fixture is meant to test only one aggregate.

Hence all the commands are meant to target the aggregate under the test.

Now we will create ConferenceAggregate. An aggregate holds a state. From the implementation point of view, it has two responsibilities: Business Validation and State change. We want the state change to be persisted as a series of events, we will write EventSourcingHandler.

Conference Aggregate

Every aggregate must have an identifier, annotated with @AggregateIdentifier. All the write and read operations on this aggregate have to be carried out with commands and queries respectively. Hence, we will create a few POJOs. I have used Kotlin as I can put all the POJOs related to command in one file.

With CQRS, only the command model can change the state.

The Command Model — Conference Aggregate

In order to inform the application that this command belongs to which specific conference, we have a property that contains the identifier of the target aggregate, annotated with @TargetAggregateIdentifier.

Now my simple Conference aggregate is ready with the tests. But we only created the conference and it is not enough. We also need to sell tickets. So, it is time to write our second entity.

Wait a minute, we are also following DDD principles here. Ticket can be a separate Aggregate but if we delete Conference, Tickets has no meaning. We can make Ticket a member of Conference Aggregate.

The member entity is only allowed to be accessed via the Aggregate Root

First, we will update our tests.

ConferenceTest with Tickets

Please note that we should have an existing conference to test tickets. Hence, I have an existing event in given conditions.

We can go ahead updating the aggregate now.

Conference Aggregate with Tickets

We have one more command and event defined as following.

The Command Model — Ticket

and the ticket entity looks like

Ticket Entity

Our command model looks complete now.

The Projections

So far, we created the aggregate and stored the state as a sequence of events. However, when we have to read them, we want a consolidated view of the actual state. That can be achieved by creating the projections. In our case, the projection is a table in H2 database which we will update whenever an event is triggered. We will write the event handlers which will listen to the events.

  1. Create the classes for projections: JPA Entities
  2. Create the repositories
  3. Write the event handlers
  4. Configure the event handlers

I have defined the projections in Kotlin as follows.

Entities for projections

They are both JPA entities. I did not make any association from Conference to Ticket in the projections just to keep the code a bit simple. The next step is to generate JpaRepositories for both the JPA entities. They are simple interfaces and hence can skip the code snippet. We will have to write two event handlers, one for the create conference event and second for the sell ticket event.

So far, we defined the aggregate, defined our command model and also projected them in the database. The missing part is the read operations.

The Query Model

An aggregate is a POJO, but as per CQRS principles, the state should not be exposed via the accessor methods. We will create a separate model to read the data.

For these queries, we will also need handlers. These handlers are methods like EventHandlers but annotated with @QueryHandler. Axon framework will automatically identify them.

The Query Handlers

Putting all the pieces together

Until now we were using the test fixture to test, but we have to also expose the APIs. Once we have created our endpoint, we can use Query Gateway and Command Gateway provided by Axon. We can inject them directly as they will be created by auto configuration.

I have used Axon server which provides smart routing of events, commands and queries in a distributed environment. Axon server is distributed as a jar. Axon offers both community and enterprise edition. You can download the community edition from Axon site. You can start Axon server by using following command:

java -jar axonserver-4.0.jar

In order to connect to Axon Server, we only need to include Axon auto configure dependency and a property in application.properties.

axon.axonserver.servers=${AXON_SERVER_HOST}

Once we have the Axon Server running, we can go ahead running the application as a spring boot application using

mvn spring-boot:run

You can also open the Axon server dashboard to take a look. It has an inbuilt event store to store all the events triggered. Axon server offers a lot of features to make the life easier, you can get a glance from these screenshots.

This was the simplest application, so we did not put more emphasis on other crucial aspects like transaction management. But don’t worry ! We will take those examples as well. Until then, enjoy playing with event sourcing and CQRS. Let me know by replying with a comment what your first impression about CQRS + ES is.

--

--