In this post, I present a project structured using the Event Sourcing and CQRS patterns and written in TypeScript.
Why use CQRS/Event Sourcing?
The project I was building was a “Twitter-like” service where, instead of sending tweets, users would rate websites using a Chrome extension.
In terms of database, I wanted to support the following queries:
- list all ratings per link
- list all ratings per user
- get (aggregated) user feed per user (since users can follow each other)
I soon realized that what I needed wasn’t a single database model, but the ability to use multiple tools for different jobs.
Even though supporting these queries is not easy, when looking from a business domain point of view, the app itself is simple: Users can rate links and follow each other. Everything else happens as a result of those actions.
In event sourcing, that information is all you need.
To illustrate the point, here is a list of event types I used:
Each of those events contains additional data like linkTitle, but they are still very easy to reason about.
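As an illustration only, the events named in this post could be modelled as a TypeScript discriminated union. Only the type names (LINK_RATED, USER_FOLLOWED, USER_UNFOLLOWED) and the linkTitle, linkUrl and userId fields come from the post; the remaining field names (`rating`, `followedUserId`) and the flat payload shape are assumptions.

```typescript
// Hypothetical event shapes: only the `type` values and the
// linkTitle/linkUrl/userId fields are taken from the post.
type LinkRated = {
  type: 'LINK_RATED'
  userId: string
  linkUrl: string
  linkTitle: string
  rating: number // assumed field
}
type UserFollowed = { type: 'USER_FOLLOWED'; userId: string; followedUserId: string }
type UserUnfollowed = { type: 'USER_UNFOLLOWED'; userId: string; followedUserId: string }

type AppEvent = LinkRated | UserFollowed | UserUnfollowed

// Switching on `type` narrows each branch to the matching event shape,
// so every event stays easy to reason about in isolation.
function describeEvent(e: AppEvent): string {
  switch (e.type) {
    case 'LINK_RATED':
      return `${e.userId} rated "${e.linkTitle}" with ${e.rating}`
    case 'USER_FOLLOWED':
      return `${e.userId} followed ${e.followedUserId}`
    case 'USER_UNFOLLOWED':
      return `${e.userId} unfollowed ${e.followedUserId}`
  }
}
```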
Events about ratings are saved using linkId as a partition key (generated from linkUrl). Since only a few events should be saved for each linkId, the “listing all ratings per user” query can easily be supported by “reducing” those events.
For aggregated user feeds, I’m using an event handler that listens to “LINK_RATED”, “USER_FOLLOWED” and “USER_UNFOLLOWED” events and then saves the appropriate data using the “getStream” API.
For “listing all ratings per link”, a second event handler updates a DynamoDB table with linkUrl as a partition key.
If later on, I change my mind and realize that a different kind of database or a SAAS service is more suitable, I can easily make the switch by leveraging events in a different way, at any point in time.
The project architecture is divided into two parts: command and query (CQRS).
The command side is invoked every time something needs to change in the app (like rating a website or updating/deleting a previously rated one). It is responsible for validating business rules and saving results in the form of events (LINK_RATED, USER_FOLLOWED etc.).
The query side reacts to those events and updates a database which is then used to support various queries.
Even though events are often stored in a relational database, I found it easier to reason about them by using concepts of a NoSQL model.
For example, in MongoDB, documents are objects stored with an _id field that is used as a primary key. In a similar way, events can be stored in the event store, but, instead of an object, you can think of a document as an array of events.
And just as there are countless ways of deciding what constitutes a document in MongoDB, there are countless ways of deciding how those arrays of events should be grouped. It all depends on how you decide to “design aggregate boundaries”.
For this project, events are categorized in the following manner:
But before saving event(s) in a database, some kind of validation usually must be performed, and in most cases that can only be done by checking conditions against previously stored data.
For example, in order to save a “USER_FOLLOWED” event, there is a condition that the same user cannot be followed twice. To uphold this, I’m checking if the user’s id is listed in an array of currently followed users:
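A minimal sketch of such a check, assuming a hypothetical state shape with a `following` array (the names `FollowingState` and `assertCanFollow` are illustrative, not taken from the project):

```typescript
// Hypothetical state shape: the ids of the users currently followed.
type FollowingState = { following: string[] }

// Enforces the business rule from the text: the same user cannot be followed twice.
function assertCanFollow(state: FollowingState, userIdToFollow: string): void {
  if (state.following.indexOf(userIdToFollow) !== -1) {
    throw new Error(`User ${userIdToFollow} is already followed`)
  }
}
```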
However, since this kind of array is not stored anywhere, it must first be created.
This is done by retrieving all events for a certain user (A5 in Fig. 3) and then passing them into a “reducer” where, in the case of a “USER_FOLLOWED” event, a userId is added to an array, and in the case of “USER_UNFOLLOWED”, it is removed:
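The reducer described above could look roughly like this. It is a sketch under assumptions: the event field `followedUserId` and the names `followReducer` and `FollowState` are hypothetical.

```typescript
// Hypothetical event and state shapes for the follow/unfollow history of one user.
type FollowEvent =
  | { type: 'USER_FOLLOWED'; followedUserId: string }
  | { type: 'USER_UNFOLLOWED'; followedUserId: string }

type FollowState = { following: string[] }

const initialFollowState: FollowState = { following: [] }

// Pure function: takes the previous state and one event, returns the next state.
function followReducer(state: FollowState, e: FollowEvent): FollowState {
  switch (e.type) {
    case 'USER_FOLLOWED':
      return { following: state.following.concat(e.followedUserId) }
    case 'USER_UNFOLLOWED':
      return { following: state.following.filter(id => id !== e.followedUserId) }
  }
}

// Rebuilding the current state is a fold over the stored history.
const followHistory: FollowEvent[] = [
  { type: 'USER_FOLLOWED', followedUserId: 'bob' },
  { type: 'USER_FOLLOWED', followedUserId: 'carol' },
  { type: 'USER_UNFOLLOWED', followedUserId: 'bob' },
]
const currentFollowState = followHistory.reduce(followReducer, initialFollowState)
```

Because the reducer is pure, the same history always produces the same array, which is what makes the validation step repeatable.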
After all business rules are satisfied, all that is left is saving event(s) in the event store (A6 in Fig. 3):
Optimistic Concurrency Control
Apart from specifying event data and a streamId, as you can see, I’ve also included the expectedVersion property. This is a form of optimistic concurrency control and, in this case, it prevents multiple events from being saved for the same aggregate at the same time.
For example, imagine that a followUser command is invoked twice simultaneously. Since the identical array would probably be created in both cases (from a history of, say, 10 events), both commands would reach the point of saving an identical result:
The problem is, if in both cases, a “USER_FOLLOWED” event is stored, it will break the rule of not following the same user multiple times.
However, because expectedVersion: 10 is specified, only one of these commands will succeed: by the time the second one is executed, 11 events will already be stored in the database.
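The save call and the version check described above can be sketched with a small in-memory store. This is an assumption-laden illustration, not the project's event store: `createInMemoryEventStore` is a hypothetical helper, and the version is simply taken to be the number of events already stored for the stream.

```typescript
type FollowedEvent = { type: 'USER_FOLLOWED'; followedUserId: string }
type SaveParams = { streamId: string; expectedVersion: number; events: FollowedEvent[] }

// Hypothetical in-memory store; version = number of events stored for a stream.
function createInMemoryEventStore() {
  const streams: { [streamId: string]: FollowedEvent[] } = {}
  return {
    save(params: SaveParams): void {
      const stored = streams[params.streamId] || []
      // Optimistic concurrency check: reject the write if the stream has moved on.
      if (stored.length !== params.expectedVersion) {
        throw new Error(
          `Version conflict: expected ${params.expectedVersion}, found ${stored.length}`
        )
      }
      streams[params.streamId] = stored.concat(params.events)
    },
    getById(streamId: string): FollowedEvent[] {
      return streams[streamId] || []
    },
  }
}

// Seed a history of 10 events, then run the same command twice.
const demoStore = createInMemoryEventStore()
const seedEvents: FollowedEvent[] = []
for (let i = 0; i < 10; i++) {
  seedEvents.push({ type: 'USER_FOLLOWED', followedUserId: 'user-' + i })
}
demoStore.save({ streamId: 'alice', expectedVersion: 0, events: seedEvents })

const followBob: SaveParams = {
  streamId: 'alice',
  expectedVersion: 10,
  events: [{ type: 'USER_FOLLOWED', followedUserId: 'bob' }],
}
demoStore.save(followBob) // first command succeeds, the stream now holds 11 events

let secondCommandRejected = false
try {
  demoStore.save(followBob) // same expectedVersion, but the stream has moved on
} catch (err) {
  secondCommandRejected = true
}
```

The rejected command can then be retried from the start: it re-reads the history, sees that “bob” is already followed, and fails the business rule instead of storing a duplicate event.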
Because all events for a specific streamId (in this case, userId) are retrieved every time a command is invoked, you may be wondering what happens if a user has thousands of events stored.
In that case, I’m using snapshots.
The idea is to pass previously stored state into a reducer, and apply only new events which happened after the snapshot was created:
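As a rough sketch of that idea, assuming a snapshot is stored as a state plus the version it was taken at (the names `StateSnapshot` and `rehydrate` are hypothetical):

```typescript
type StateReducer<S, E> = (state: S, e: E) => S

// Hypothetical snapshot shape: the reduced state plus the version it covers.
type StateSnapshot<S> = { version: number; state: S }

// Starts from the snapshot state instead of an empty state and applies
// only the events stored after the snapshot was taken.
function rehydrate<S, E>(
  reducer: StateReducer<S, E>,
  snapshot: StateSnapshot<S>,
  eventsAfterSnapshot: E[]
): StateSnapshot<S> {
  return {
    version: snapshot.version + eventsAfterSnapshot.length,
    state: eventsAfterSnapshot.reduce((s, e) => reducer(s, e), snapshot.state),
  }
}

// Example with a follow/unfollow reducer over an array of user ids.
type FollowChange = { type: 'USER_FOLLOWED' | 'USER_UNFOLLOWED'; followedUserId: string }

const followingReducer: StateReducer<string[], FollowChange> = (following, e) =>
  e.type === 'USER_FOLLOWED'
    ? following.concat(e.followedUserId)
    : following.filter(id => id !== e.followedUserId)

const storedSnapshot: StateSnapshot<string[]> = { version: 1000, state: ['bob', 'carol'] }
const newerEvents: FollowChange[] = [
  { type: 'USER_UNFOLLOWED', followedUserId: 'bob' },
  { type: 'USER_FOLLOWED', followedUserId: 'dave' },
]
const restoredUser = rehydrate(followingReducer, storedSnapshot, newerEvents)
```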
But, this example is simplified. Snapshots are an optimization technique that comes with a price: versioning, inability to reuse events for additional reducers, updating snapshots for new events and additional requests to a database.
Here is how I’m actually using it (a snippet from another project):
Some of the complexity is hidden behind a getByIdUsingSnapshot() function where, for every 1000 events, a new snapshot is created in AWS S3. The snapshot itself (a JSON file) also includes a version, which is used to determine the offset for getting new events from the event store.
As you can see, snapshots are useful but also introduce additional complexity. It’s good to avoid them if retrieving all events for an aggregate is not too expensive (which it often isn’t).
Note: Sometimes, I’m using snapshots on the query side. This means that the same reducer is shared between a command and a query side.
Needless to say, one should be careful when doing this, but for me, it saved a lot of time, especially in the early stages when a model was evolving more rapidly. Later on, if it proves to be too expensive, it’s not hard to switch to a “proper” read model.
DynamoDB Event Store
I love DynamoDB because it’s fully managed and highly scalable, but it’s not often used as an event store. Reasons for that are issues related to consistency, ordering, and transactions.
In an example of invoking two commands simultaneously, I’ve already described how not maintaining consistency can result in invalid data stored in a database.
In DynamoDB, this can be solved with conditional writes. Since expectedVersion is a required property of the save() function, to store events in a database, there is a condition that the specified version must not already exist (for a specific aggregate):
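A sketch of what such a conditional write could look like, expressed as the parameter object of a DynamoDB put operation (the shape accepted by the AWS SDK DocumentClient). The function name `buildConditionalPut`, the item layout, and the choice of storing the new item under `expectedVersion + 1` are assumptions; the SDK call itself is left out so the block stays self-contained.

```typescript
type CommitToSave = {
  streamId: string
  expectedVersion: number
  events: { type: string }[]
}

// Builds put parameters that only succeed when no item with the same
// streamId (partition key) and version (sort key) exists yet.
function buildConditionalPut(tableName: string, commit: CommitToSave) {
  return {
    TableName: tableName,
    Item: {
      streamId: commit.streamId,
      version: commit.expectedVersion + 1, // assumed versioning scheme
      events: commit.events,
    },
    // The condition is evaluated against the existing item with this primary key.
    // If such an item exists, the write is rejected.
    ConditionExpression: 'attribute_not_exists(#version)',
    ExpressionAttributeNames: { '#version': 'version' },
  }
}

const putParams = buildConditionalPut('eventstore', {
  streamId: 'alice',
  expectedVersion: 10,
  events: [{ type: 'USER_FOLLOWED' }],
})
```

When the condition fails, DynamoDB reports a ConditionalCheckFailedException, which the application can translate into its own conflict error.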
When data is stored in DynamoDB, it’s replicated in three copies and a write request is acknowledged only after two copies are updated.
Because of that, there are two options in which items can be retrieved from a database: using a “strong consistency” or “eventual consistency”.
Since a version number is determined when events are retrieved from a database, with “eventually consistent reads” (the default and cheaper option), there is a possibility of getting outdated results. In that case, storing events can fail (with a 409 error code). This is often resolved by retrying the operation until it succeeds.
If, however, a “strong consistency” option is used, two out of three DynamoDB copies are needed to retrieve a reliable result and 409 errors should only occur in case of parallel requests on the same aggregate.
Since the “eventstore” table is created with streamId as a partition key and version as a sort key, the getById() function always returns accurately ordered events.
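For illustration, the read side of this could be expressed as DynamoDB query parameters like the ones below. `buildGetByIdQuery` is a hypothetical helper and not the project's getById() implementation; the parameter names (`KeyConditionExpression`, `ConsistentRead`, `ScanIndexForward`) are standard DynamoDB Query options.

```typescript
// Builds query parameters that read one stream in sort-key (version) order.
function buildGetByIdQuery(tableName: string, streamId: string, consistentRead: boolean) {
  return {
    TableName: tableName,
    KeyConditionExpression: '#streamId = :streamId',
    ExpressionAttributeNames: { '#streamId': 'streamId' },
    ExpressionAttributeValues: { ':streamId': streamId },
    // true requests a strongly consistent read, false an eventually consistent one
    ConsistentRead: consistentRead,
    // true returns items in ascending sort-key order, i.e. oldest version first
    ScanIndexForward: true,
  }
}

const strongReadQuery = buildGetByIdQuery('eventstore', 'alice', true)
```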
But due to its “NoSQL nature”, retrieving ordered events across all aggregates in DynamoDB is not as easy as in relational databases.
My first approach to solving this was using a global secondary index (GSI) and choosing a fixed property (like active:1) as a partition key and timestamp as the sort key. But that is an anti-pattern!
Even though I’m projecting only index keys, it will always use a single partition and therefore require a large throughput (high cost). Also, I’m depending on accurately storing timestamp values, which has its own problems (like sync issues between different services).
A second option is to manually store version in a separate item, table or even a different type of database every time a new event is added. This is possible with DynamoDB Streams since it “captures a time-ordered sequence of item-level modifications in a DynamoDB table and durably stores the information for up to 24 hours” (source).
Overall, even though it’s solvable, I think this is the biggest issue of using DynamoDB for an event store.
Update May 28, 2019: with upgrades like this one, using a single partition may not be such a bad thing. But I still recommend avoiding it if you can or, as in this case, storing only index keys to keep it as small as possible.
In most cases, that array contains a single event:
But there are times a transaction is completed by storing multiple events:
Storing data like this ensures that “all or none” of the events are saved for every command, but it also means that events must be “flattened” when retrieved.
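Flattening could be as simple as the following sketch, assuming each stored item carries an `events` array (the `CommitItem` shape and `flattenCommits` name are hypothetical):

```typescript
// Hypothetical stored item: one item per committed transaction.
type CommitItem<E> = { streamId: string; version: number; events: E[] }

// Concatenates the events of all commits, preserving commit order
// and the order of events inside each commit.
function flattenCommits<E>(commits: CommitItem<E>[]): E[] {
  return commits.reduce<E[]>((all, commit) => all.concat(commit.events), [])
}

const flattenedTypes = flattenCommits([
  { streamId: 'alice', version: 1, events: [{ type: 'USER_FOLLOWED' }] },
  { streamId: 'alice', version: 2, events: [{ type: 'LINK_RATED' }, { type: 'USER_UNFOLLOWED' }] },
]).map(e => e.type)
```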
Projections & Process Managers
After an event is stored in a database, that information must be propagated to event handlers, which are used in two ways:
- as projections — for updating additional databases or services like “getStream”
- as process managers (or “sagas”) — for side effects like sending emails or completing payments via 3rd party services
In both cases, it’s preferable to send events in a fault-tolerant and reliable way, in the same order in which they are stored.
If an error occurs in an event handler, it shouldn’t continue receiving new events until it’s resolved. Also, each event handler must be isolated, so an error in one handler doesn’t affect others.
For those reasons, every projection or process manager consumes events by using a FIFO (First-In-First-Out) message queue.
A message queue provides a buffer which temporarily stores messages sent by a “producer” and keeps them on the queue until a “consumer” retrieves and, in the end, deletes them. In a FIFO queue, the next message can be processed only after the previous one is deleted.
DynamoDB Streams to SQS
A lambda function which sends a message into an SQS queue is triggered when a new event is stored, using DynamoDB Streams.
Since it’s not advisable to use multiple lambdas connected to a DynamoDB Stream, a single lambda function forwards the event metadata into multiple SQS queues — one for each event handler (B1 in fig. 3).
Additionally, an array of events received from the stream (a committed transaction) must be broken down so that a separate SQS message is sent for each event in the array.
If an error occurs in any step of that process, DynamoDB Streams will retry sending the same table item until it succeeds. This opens up the possibility of sending duplicate messages, which is why I’m also using the deduplication option on each FIFO queue.
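The fan-out described above might be sketched as follows. Everything here is illustrative: `toFifoMessages`, the message body layout, the single `'events'` message group and the way the deduplication id is derived are assumptions, while `QueueUrl`, `MessageBody`, `MessageGroupId` and `MessageDeduplicationId` are the standard SQS send-message parameters for FIFO queues.

```typescript
type StreamCommit = { streamId: string; version: number; events: { type: string }[] }

type FifoMessage = {
  QueueUrl: string
  MessageBody: string
  MessageGroupId: string
  MessageDeduplicationId: string
}

// Produces one message per event and per queue (one queue per event handler).
function toFifoMessages(commit: StreamCommit, queueUrls: string[]): FifoMessage[] {
  const messages: FifoMessage[] = []
  queueUrls.forEach(queueUrl => {
    commit.events.forEach((committedEvent, index) => {
      messages.push({
        QueueUrl: queueUrl,
        // Only metadata is forwarded; a handler can load the full event by key.
        MessageBody: JSON.stringify({
          streamId: commit.streamId,
          version: commit.version,
          index,
          type: committedEvent.type,
        }),
        // Assumed: a single group per queue, so all events are processed in order.
        MessageGroupId: 'events',
        // Deterministic id: a retried stream record produces the same id,
        // so the FIFO queue can drop the duplicate.
        MessageDeduplicationId: `${commit.streamId}-${commit.version}-${index}`,
      })
    })
  })
  return messages
}

const fanOut = toFifoMessages(
  { streamId: 'alice', version: 3, events: [{ type: 'LINK_RATED' }, { type: 'USER_FOLLOWED' }] },
  ['https://example.com/feed.fifo', 'https://example.com/ratings.fifo']
)
```

Note that SQS only deduplicates within a limited time window (five minutes), so event handlers that cannot tolerate a repeated message still benefit from being idempotent.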
SQS to Event handlers
To get a message from an SQS queue, there must be an external service which polls it. B̶u̶t̶ ̶b̶e̶c̶a̶u̶s̶e̶ ̶L̶a̶m̶b̶d̶a̶ ̶f̶u̶n̶c̶t̶i̶o̶n̶s̶ ̶d̶o̶n̶’̶t̶ ̶s̶u̶p̶p̶o̶r̶t̶ ̶F̶I̶F̶O̶ ̶q̶u̶e̶u̶e̶s̶ ̶a̶s̶ ̶e̶v̶e̶n̶t̶ ̶s̶o̶u̶r̶c̶e̶s̶ ̶(̶a̶t̶ ̶t̶h̶e̶ ̶m̶o̶m̶e̶n̶t̶)̶,̶ ̶I̶’̶m̶ ̶u̶s̶i̶n̶g̶ ̶a̶n̶ ̶E̶C̶2̶ ̶i̶n̶s̶t̶a̶n̶c̶e̶.̶
T̶h̶a̶t̶ ̶i̶s̶ ̶t̶h̶e̶ ̶o̶n̶l̶y̶ ̶”̶n̶o̶n̶-̶s̶e̶r̶v̶e̶r̶l̶e̶s̶s̶”̶ ̶p̶a̶r̶t̶ ̶o̶f̶ ̶t̶h̶i̶s̶ ̶a̶r̶c̶h̶i̶t̶e̶c̶t̶u̶r̶e̶,̶ ̶b̶u̶t̶ ̶f̶o̶r̶t̶u̶n̶a̶t̶e̶l̶y̶,̶ ̶i̶t̶’̶s̶ ̶a̶ ̶”̶c̶h̶e̶a̶p̶”̶ ̶o̶n̶e̶.̶ ̶A̶ ̶s̶i̶n̶g̶l̶e̶ ̶m̶i̶c̶r̶o̶ ̶(̶o̶r̶ ̶e̶v̶e̶n̶ ̶n̶a̶n̶o̶)̶ ̶i̶n̶s̶t̶a̶n̶c̶e̶ ̶i̶s̶ ̶s̶u̶f̶f̶i̶c̶i̶e̶n̶t̶ ̶f̶o̶r̶ ̶a̶ ̶t̶a̶s̶k̶ ̶w̶h̶i̶c̶h̶ ̶c̶o̶n̶s̶i̶s̶t̶s̶ ̶o̶f̶ ̶g̶e̶t̶t̶i̶n̶g̶ ̶a̶ ̶m̶e̶s̶s̶a̶g̶e̶ ̶f̶r̶o̶m̶ ̶S̶Q̶S̶ ̶c̶o̶n̶t̶a̶i̶n̶i̶n̶g̶ ̶e̶v̶e̶n̶t̶ ̶i̶d̶ ̶(̶B̶2̶ ̶i̶n̶ ̶f̶i̶g̶.̶ ̶3̶)̶,̶ ̶r̶e̶t̶r̶i̶e̶v̶i̶n̶g̶ ̶a̶n̶ ̶e̶v̶e̶n̶t̶ ̶d̶a̶t̶a̶ ̶f̶r̶o̶m̶ ̶D̶y̶n̶a̶m̶o̶D̶B̶ ̶u̶s̶i̶n̶g̶ ̶t̶h̶a̶t̶ ̶i̶d̶ ̶a̶n̶d̶ ̶i̶n̶v̶o̶k̶i̶n̶g̶ ̶a̶ ̶L̶a̶m̶b̶d̶a̶ ̶f̶u̶n̶c̶t̶i̶o̶n̶ ̶(̶B̶3̶ ̶i̶n̶ ̶f̶i̶g̶.̶ ̶3̶)̶.̶
H̶o̶w̶e̶v̶e̶r̶,̶ ̶i̶n̶ ̶t̶h̶e̶ ̶c̶a̶s̶e̶ ̶o̶f̶ ̶a̶ ̶v̶e̶r̶y̶ ̶l̶a̶r̶g̶e̶ ̶n̶u̶m̶b̶e̶r̶ ̶o̶f̶ ̶e̶v̶e̶n̶t̶s̶ ̶o̶r̶ ̶e̶v̶e̶n̶t̶ ̶h̶a̶n̶d̶l̶e̶r̶s̶,̶ ̶d̶u̶e̶ ̶t̶o̶ ̶t̶h̶e̶ ̶n̶a̶t̶u̶r̶e̶ ̶o̶f̶ ̶m̶e̶s̶s̶a̶g̶e̶ ̶q̶u̶e̶u̶e̶s̶,̶ ̶a̶ ̶s̶e̶r̶v̶i̶c̶e̶ ̶l̶i̶k̶e̶ ̶t̶h̶i̶s̶ ̶c̶a̶n̶ ̶e̶a̶s̶i̶l̶y̶ ̶b̶e̶ ̶s̶c̶a̶l̶e̶d̶ ̶h̶o̶r̶i̶z̶o̶n̶t̶a̶l̶l̶y̶ ̶b̶y̶ ̶a̶d̶d̶i̶n̶g̶ ̶n̶e̶w̶ ̶i̶n̶s̶t̶a̶n̶c̶e̶s̶.̶
Update Nov 19, 2019: AWS Lambda now supports FIFO queues as an event source.
The code base of this project is organized using a simple rule: outer layers can depend on lower layers, but no code in the lower layer can depend on any code in the outer layer.
A domain model is at the center. It contains application business rules and event definitions which are used by outer layers.
It’s implemented with pure functions which are easy to test and compose.
Command handlers in the application layer can use a domain model and communicate with the outside world only by using an injected repository which implements the repository interface (so it’s easy to mock).
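To make the dependency direction concrete, here is a sketch of a command handler that only knows a repository interface. The names (`UserRepository`, `followUserHandler`) and shapes are hypothetical, and the repository is synchronous only to keep the example short; a DynamoDB-backed implementation would return promises.

```typescript
type UserFollowedEvent = { type: 'USER_FOLLOWED'; followedUserId: string }
type UserAggregate = { following: string[]; version: number }
type SaveRequest = { streamId: string; expectedVersion: number; events: UserFollowedEvent[] }

// The application layer depends on this interface, never on DynamoDB directly.
interface UserRepository {
  getById(userId: string): UserAggregate
  save(request: SaveRequest): void
}

// Command handler: loads state, checks the business rule, stores the resulting event.
function followUserHandler(
  repository: UserRepository,
  command: { userId: string; userIdToFollow: string }
): void {
  const user = repository.getById(command.userId)
  if (user.following.indexOf(command.userIdToFollow) !== -1) {
    throw new Error('User is already followed')
  }
  repository.save({
    streamId: command.userId,
    expectedVersion: user.version,
    events: [{ type: 'USER_FOLLOWED', followedUserId: command.userIdToFollow }],
  })
}

// In tests, the repository is replaced by a trivial in-memory mock.
const savedRequests: SaveRequest[] = []
const mockRepository: UserRepository = {
  getById: () => ({ following: ['bob'], version: 1 }),
  save: request => {
    savedRequests.push(request)
  },
}
followUserHandler(mockRepository, { userId: 'alice', userIdToFollow: 'carol' })
```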
The outermost, client layer is separated into three sectors: infrastructure, ports, and tests.
The infrastructure consists of:
- Implementation of repository interfaces (DynamoDB or in-memory).
- AWS Cognito authorizer and lambda triggers for authentication.
- Lambda handlers and node scripts required for event handlers (rebuildDatabase scripts and similar).
- Other implementation details like 3rd party libraries, frameworks, configuration data, helper functions etc.
Ports act as an “entry point” to the application. They accept requests from external agencies (e.g. REST or CLI) or event handlers and communicate with the infrastructure and application layer.
Because of their “special nature” tests have their own sector in the client layer. But, you can think of them as another port.
However, I use a separate “tests” directory only when there are a lot of use-cases defined in numerous files. In simpler applications like this one, I usually define tests next to the file being tested (no matter in which layer).
UI for this project is composed of two React apps:
- Web version created by Create React App, using Ant Design components and hosted in an AWS S3 bucket.
- Chrome extension created using Preact and bundled with Parcel. The main reason for using Preact was its small size, which I hoped would result in better UX (faster pop-up display). However, I don’t think this choice was justified, and I would be happier if I had chosen CRA instead.
You can check the entire source code of the project here.