Demonstrating Event Sourcing and CQRS

Yitian Zou
Published in The Startup
10 min read · Dec 31, 2019

In the summer of 2017, I interned at the SAP Tools Team in Palo Alto, CA. My project there was to build a meeting feedback application on top of Microsoft Azure, using the concepts of event sourcing and CQRS. Here I walk through the technical details of that project.

The Concept

Every day, SAP employees hustle in and out of many meetings. Doing a quick calculation — with 85,000 employees, each spending 2 hours a day in meetings over roughly 250 working days, we get 85,000 × 2 × 250 = 42,500,000 person-hours per year. This number may be a rough estimate, but it gives us an idea of how much of an investment meetings are for large companies.

Given the magnitude of this investment, it’s worth investigating whether these meeting hours are well spent. While getting an objective measure like ROI (Return on Investment) may be difficult in a large organization, we can get a subjective measure. The SAP Tools Team recently released an application that gathers participants’ feedback on a 5-star scale. This aggregated data is then shared with both the meeting organizers and the company.

Workflow for organizer capturing meeting feedback from attendees.

Event Sourcing and CQRS

In a classic CRUD (Create, Read, Update, Delete) model, we only persist the current state — no information about how we arrived at the state is kept. This can be a problem in domains where we want to keep a log of every transaction in history. Also, the data layer in the CRUD model is responsible for both reading data from store and updating the data. In a read/write-heavy application, we could face problems with data contention and conflicts caused by concurrent updates.

Unlike in CRUD, where the same entity reads and writes data, CQRS (Command Query Responsibility Segregation) introduces a separation of concerns, using different data models for reading and writing. Here, we have a clear distinction between commands, which change state, and queries, which return data.

While commands express an intent to change the state, events represent changes that have already occurred. In event sourcing, instead of keeping track of the current state, we keep an ordered log of all the events that have happened. Events are immutable objects that can only be appended to an event stream, where other services can consume them as soon as they are persisted. With this model, we can start from an initial state and “replay” through the log, applying each change in sequence until we get to the present state.
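As a minimal illustration of replaying a log, consider a hypothetical bank-account domain (not the meeting-feedback types introduced later in this post):

```fsharp
// Hypothetical bank-account domain: the balance is never stored directly;
// it is derived on demand by folding over the immutable event log.
type AccountEvent =
    | Deposited of int
    | Withdrawn of int

// Apply a single event to a state (here, just the running balance).
let apply (balance : int) (event : AccountEvent) : int =
    match event with
    | Deposited amount -> balance + amount
    | Withdrawn amount -> balance - amount

// Replaying the log from the initial state recreates the current state.
let history = [ Deposited 100; Withdrawn 30; Deposited 5 ]
let balance = List.fold apply 0 history   // 75
```

Appending a new event and folding again yields the next state, which is exactly the pattern the meeting-feedback application uses below.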

Comparison of CRUD and CQRS Event Sourcing.

Let’s see how event sourcing and CQRS work together. Incoming commands are transformed into events by a handler that deals with permissions and validations, and the resulting events are persisted to a data store. Since the store only appends immutable events, we avoid the update conflicts that occur in the CRUD model when multiple actors try to concurrently modify the state. With a centralized log that serves as our immutable source of truth, other services and dependencies can read and write at their own pace, allowing the production and consumption of events to be decoupled. Now the services reading from and listening to the event stream can be optimized independently. Also, failures or changes in latency in one part of the application don’t affect the rest of the application. Since the logic for listening to the event stream is implemented in each service, we can insert or replace services and maintain modularity. With event sourcing and CQRS implemented, we can recreate the state of the application at any point in history; in a domain where it makes sense to keep a log of transactions, this can be very useful for testing and reproducing bugs.

Event sourcing and functional programming

In order to implement event sourcing, we need to leverage the key ideas of immutability and composability. As a functional programming language, F# fits in well with this event sourcing architecture, as it emphasizes functions free of side effects. Instead of keeping track of ever-changing global variables, we compose together pure function calls to recreate the state. Since functions depend only on input arguments and not on context, an entire class of bugs involving unexpected mutations in data disappears. Functions can also be replaced and rewritten easily, leading to better modularization. Because F# expects immutable bindings, there is no concept of a special “null” value. As we’ll see later, there are ways to wrap nullable values so that their behavior is predictable.
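For instance, a lookup that may fail returns an Option rather than null; the names here are illustrative, not taken from the project:

```fsharp
// A vote lookup that may fail returns an Option instead of null.
let tryFindRating (votes : (string * int) list) (attendee : string) : int option =
    votes
    |> List.tryFind (fun (name, _) -> name = attendee)
    |> Option.map snd

let votes = [ ("alice", 4); ("bob", 5) ]

// Pattern matching forces both the Some and None cases to be handled
// explicitly before the value can be used.
let describe (attendee : string) : string =
    match tryFindRating votes attendee with
    | Some rating -> sprintf "%s rated %d stars" attendee rating
    | None -> sprintf "%s has not voted" attendee
```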

Microsoft Azure Functions

To abstract away the complexities of managing the infrastructure, we chose to implement our application on the serverless architecture of Azure Functions. With the runtime and scaling fully managed by a third-party service, we can focus on writing the functions. Azure also provides bindings to service buses, data stores, HTTP triggers, and other common ways of connecting with input and output data.

The Implementation: Building Blocks

First, we’ll introduce the underlying state machine of our application. From an initial state, the state moves to the Requested state upon a Requested event. Here votes are collected until everyone has voted or the request has timed out, upon which the state shifts to Collected.

State machine of events.

The state also keeps track of all the necessary meeting information and voting data. Besides state, our implementation uses two other core types: commands and events. As previously mentioned, commands are present-tense verbs expressing intent, while events are past-tense records of changes that have already happened. All of our types make use of F#’s discriminated unions, which enforce that a value is exactly one of a set of distinct cases. For example, an event must be one of the following: Requested, Voted, Abstained, or TimedOut.

type Event =
    | Requested of organizer : string * attendees : string list * subj : string * sTime : DateTime * eTime : DateTime
    | Voted of attendee : string * rating : int
    | Abstained of attendee : string
    | TimedOut
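The Commd and State types are not reproduced in the post; here is a plausible sketch, inferred from the Event cases above and the handlers below (the field and case names are my reconstruction, not the project’s actual code):

```fsharp
open System

// Meeting data carried through the state machine.
type Request =
    { organizer : string
      attendees : string list
      subj : string
      sTime : DateTime
      eTime : DateTime
      feedback : (string * int) list   // (attendee, rating) pairs collected so far
      refusal : string list }          // attendees who abstained

// States from the state machine diagram.
type State =
    | Initial
    | Requested of Request
    | Collected of Request

// Commands: present-tense expressions of intent, one per event.
type Commd =
    | Request of organizer : string * attendees : string list * subj : string * sTime : DateTime * eTime : DateTime
    | Vote of attendee : string * rating : int
    | Abstain of attendee : string
    | TimeOut
```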

The exec function (State -> Commd -> Choice<Event, string>) validates commands before transforming them into events. In order to perform validations, we make use of the Choice type, where Choice1Of2 indicates a valid event and Choice2Of2 indicates an error, which can be annotated with a message. With pattern matching, all the possible cases of states and commands are matched to a different expression block (not all cases are shown).

let exec (state : State) (commd : Commd) : Choice<Event, string> =
    match (state, commd) with
    | (State.Requested request, Vote (attendee, rating)) ->
        match request.attendees.Contains attendee with
        | false -> Choice2Of2 "Voter not in list of attendees"
        | true ->
            match (voters request.feedback request.refusal).Contains attendee with
            | false -> Choice1Of2 (Voted (attendee, rating))
            | true -> Choice2Of2 "Attendee has already voted"

Now that we have a valid event, we can propagate the state change through the appl function (State -> Event -> State), which takes in a (state, event) pair and outputs what the successor state should be, according to our previous state machine diagram. To deal with attendees who didn’t attend the meeting, there is also an Abstained event — once everyone has either voted or abstained, or a timeout occurs, the state moves to Collected. We also keep track of our list of feedback received in the state.

let appl (state : State) (event : Event) : State =
    match (state, event) with
    | (State.Requested request, Abstained attendee) ->
        match ((attendee :: request.refusal).Length + request.feedback.Length) = request.attendees.Length with
        | true ->
            State.Collected { request with refusal = attendee :: request.refusal }
        | false ->
            State.Requested { request with refusal = attendee :: request.refusal }

Tying everything together is the play function — given a history of all the past events and an incoming command, we want to recreate the current state and append the new event (if valid) onto the event stream. Key to this function is the List.fold function, which composes together a series of state changes. It applies the first event in the history to the initial state, then applies the second event to the resulting state, and so on. This produces an elegantly composed series of state changes that eventually gives us the current state. Now we can do some pattern matching to execute the command into an event and make the necessary changes to the event stream.

let play (init : 'S, exec : 'S -> 'C -> Choice<'E, 'F>, appl : 'S -> 'E -> 'S)
         (history : 'E list) (commd : 'C) : 'E list * Choice<'S * 'E, 'F> =
    let state = List.fold appl init history
    match exec state commd with
    | Choice1Of2 event ->
        history @ [event], Choice1Of2 (appl state event, event)
    | Choice2Of2 f ->
        history, Choice2Of2 f

We also need downstream event handlers that respond accordingly to specific events. Upon the broadcast of a <State, Event> object by the service bus, the following function detects if the state has shifted to Collected, and sends an email to the organizer with the collected voting information.

let Run (res : Res<State, Event>, template : string, mail : ICollector<Mail>, log : TraceWriter) =
    match (res.state, res.event) with
    | State.Collected request, Event.Voted _
    | State.Collected request, Event.Abstained _
    | State.Collected request, Event.TimedOut ->
        mail.Add (makeMail request.organizer template)
    | _ -> ()

The Implementation: Core Functionality

With the business logic in place, let’s connect it to the other modules we need.

High-level view of all application modules.

The starting point of our application is the Timer EWS, which is automatically called every 5 minutes. It connects to the calendar through Microsoft Graph API and searches for meetings that have just ended. For each meeting where feedback was requested, a new command is issued and placed onto the command queue (Ct). This part is accomplished with an Azure Service Bus, which uses a publish-and-subscribe mechanism to alert subscribers of newly raised commands on the queue. Using Microsoft Azure functions, connecting to the Service Bus is a simple matter of adding an output binding:

"type": "serviceBus","name": "commd","queueName": "meeting-request","connection": "EVENT-QUEUE","accessRights_": "Manage","direction": "out"

Commands raised on the queue are received by the play function, which retrieves the old event stream from the Cosmos DB, recreates the current state, and executes the command to produce a new event stream, which it writes back to the store. In addition to persisting the new event stream, we also want to raise the new event so that listeners are notified. This time, since we want to support multiple subscribers, we broadcast onto an event topic instead of a queue.

There are two event handlers that handle sending emails upon the appropriate events. Upon a Requested event, the email handler sends an email to all attendees containing the function URLs for voting 1 through 5 stars, including an option for Didn’t Attend. The vote function is set up with an HTTP trigger so that it adds the correct command to the queue for each vote coming in. Here again, the vote commands are processed by the play function and the event store is updated.

Even if not everybody responds with feedback, we need to close a meeting feedback instance eventually. To implement time out, we need a query model so that we can check for meetings that are past their voting deadline. Using an Azure Storage Table, we can store just the fields that we need for this purpose, namely the meeting ID and deadline. This way, the Timer Timeout function can easily query for meetings where the deadline is in the past, and raise a TimeOut command on the queue.
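A sketch of that timeout check, with an in-memory list standing in for the Azure Storage Table (the row shape and names are assumptions, not the project’s actual schema):

```fsharp
open System

// Query-model row: only the fields the timeout check needs.
type MeetingRow = { meetingId : string; deadline : DateTime }

// Called on a timer: each meeting whose voting deadline has passed should
// get a TimeOut command raised on the command queue.
let findExpired (now : DateTime) (rows : MeetingRow list) : MeetingRow list =
    rows |> List.filter (fun row -> row.deadline < now)
```

Keeping this narrow read model separate from the event store means the timer only scans a small table instead of replaying event streams.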

As it stands, the application only returns the number of votes and the average rating to the organizer. But we might also want richer information, such as how a meeting’s rating compares with other meetings across the company. However, the query model we set up for timeouts isn’t sufficient for this kind of analytics. Instead of consolidating everything into one overly hefty data store, we can implement a separate analytics model. Much like the read model, it consumes events and updates its own database. Because these models have different needs and can be optimized differently, it follows CQRS philosophy to keep the data available in different formats in different data stores.

Conclusion

The application of meeting feedback lends itself naturally to the event sourcing and CQRS architecture. For the SAP Tools Team, the concepts of keeping a history and decoupling requests from responses proved an interesting alternative to CRUD, though one that took some time to understand. Discriminated unions and pattern matching in F# were very elegant constructs, although thinking functionally about problems that would typically be solved with if-else and loop logic was a struggle. We were pleasantly surprised to find that once compiler errors were resolved, the code had a very high likelihood of working.

Using small modular functions on Azure meant they were easy to test and implement individually, though that came with the extra burden of tracking many moving parts; it was easy to forget which functions were connected to which bindings. That said, the bindings themselves were very easy to work with: since they only require adding some JSON, it wasn’t necessary to have a deep understanding of how service buses and other services were implemented. The event stream also allowed for straightforward testing, which amounted to checking that the correct event was being appended.

Here I conclude my first Medium blog post. Happy New Year, and thanks to Dominik Tornow for being my first (and best) technical mentor!
