The Quest for Serverless CQRS Part 2

Almost 2 years to the day, I finally have a second post in my well-intentioned series on Serverless CQRS. If you’re unfamiliar with CQRS or Event Sourced systems, please go read the first article before continuing.

In this post we’ll look at how I implemented an Event Store with DynamoDB that has:

  • Strong write consistency
  • Eventual read consistency
  • Strong in-aggregate event ordering
  • Loose global ordering (i.e. system clock dependent)

In follow-up posts, we’ll explore alternatives that can achieve strong global ordering, but they come with a significant increase in complexity.

But for today, let’s look at what we need out of an event store and how each function has been achieved. As we look through the code, we’ll note where the current solution has known limitations and how you could overcome them.

Click here to see the code in question

An Event Store MVP

When building an event store, there are three key functions it needs:

  1. Store events
  2. Get all events
  3. Get events for an aggregate

First, let’s explore how I configured the DynamoDB table:
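The repo has the real definition; a minimal sketch of an equivalent table using the AWS SDK for JavaScript v3 might look like this (the table and index names here are my assumptions):

```typescript
import { DynamoDBClient, CreateTableCommand } from "@aws-sdk/client-dynamodb";

const client = new DynamoDBClient({});

// A sketch of the table: events are keyed by aggregate, and a GSI
// re-keys them under a single TYPE partition, sorted by eventId (a KSUID).
await client.send(
  new CreateTableCommand({
    TableName: "event-store", // assumed name
    BillingMode: "PAY_PER_REQUEST",
    AttributeDefinitions: [
      { AttributeName: "aggregateId", AttributeType: "S" },
      { AttributeName: "aggregateVersion", AttributeType: "N" },
      { AttributeName: "TYPE", AttributeType: "S" },
      { AttributeName: "eventId", AttributeType: "S" },
    ],
    KeySchema: [
      { AttributeName: "aggregateId", KeyType: "HASH" }, // partition key
      { AttributeName: "aggregateVersion", KeyType: "RANGE" }, // sort key
    ],
    GlobalSecondaryIndexes: [
      {
        IndexName: "eventsByEventId", // assumed name
        KeySchema: [
          { AttributeName: "TYPE", KeyType: "HASH" },
          { AttributeName: "eventId", KeyType: "RANGE" },
        ],
        Projection: { ProjectionType: "ALL" },
      },
    ],
  })
);
```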

The Key Schema

The choice of aggregateId and aggregateVersion as the partition key and sort key enables us to store events correctly. We need to ensure that if there are concurrent writers to the same aggregate, the second write will fail, allowing the client to either retry the command or bubble the error up to the user. To see the mechanics of how this is achieved, see the Store Events section below.

The Global Secondary Index (GSI)

Due to the key schema choice, we have no efficient way of getting all events. Instead, we use a GSI to re-key our items so we can read events in order.

The secret sauce for this is the use of KSUIDs as the eventId. A KSUID is 20 bytes, beginning with a 32-bit timestamp, so its string encoding sorts lexicographically in creation order. Unfortunately, due to the reliance on system clocks, we only have a loose ordering guarantee here. If clock skew is observed on the write side, we will end up with events inserted out of order.
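As an illustration, generating a KSUID eventId with the ksuid npm package (one of several KSUID libraries) looks roughly like this:

```typescript
import KSUID from "ksuid";

// KSUIDs are 20 bytes: a 32-bit second-precision timestamp followed by
// 128 bits of randomness. The base62 string form sorts lexicographically
// in (rough) creation order, which is exactly what the GSI sort key needs.
const eventId = (await KSUID.random()).string;
console.log(eventId); // e.g. "0ujsswThIGTUYm2K8FjOOfXtY1K"
```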

Additionally, GSIs are eventually consistent, as index data is replicated asynchronously to separate partitions. However, this lag between the base table and the GSI has no material impact on our system.

The TYPE attribute is currently just the literal EVENT, as we need a partition key within which to sort.

The Hot Partition Problem

By using a single, fixed partition key for our GSI, we have contravened a DynamoDB design principle: the GSI is constrained to a single partition (roughly 3,000 RCUs and 1,000 WCUs), which means that under load the replication lag between our table and our index will grow.

To overcome this, you could extend the code to accept multiple known literals for TYPE, potentially EVENT_2, EVENT_3, EVENT_N. Assuming you segment writes equally between the literals, you would increase the throughput linearly with each new literal added. In exchange, you would now have to gather events from across all partitions when reading from the event store.
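A hypothetical sketch of that sharding scheme, with a shard function of my own invention:

```typescript
const SHARD_COUNT = 4;
const SHARDS = ["EVENT", "EVENT_2", "EVENT_3", "EVENT_4"];

// On write: pick a shard per event. Hashing the aggregateId keeps an
// aggregate's events in one shard; round-robin would also work.
function typeFor(aggregateId: string): string {
  let hash = 0;
  for (const ch of aggregateId) hash = (hash * 31 + ch.charCodeAt(0)) | 0;
  return SHARDS[Math.abs(hash) % SHARD_COUNT];
}

// On read: gather every shard's events and merge. KSUIDs sort
// lexicographically, so a plain sort restores the loose global order.
async function getAllEventsSharded(
  queryShard: (type: string) => Promise<{ eventId: string }[]>
) {
  const pages = await Promise.all(SHARDS.map(queryShard));
  return pages.flat().sort((a, b) => a.eventId.localeCompare(b.eventId));
}
```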

Get Events For An Aggregate

Getting events for an aggregate is trivial. We simply need to query the table for all items with a given partition key.
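In AWS SDK v3 terms, that query might be sketched like so (the table name is an assumption):

```typescript
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, QueryCommand } from "@aws-sdk/lib-dynamodb";

const client = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// One partition holds every event for the aggregate, and the sort key
// (aggregateVersion) returns them in write order.
async function getEventsForAggregate(aggregateId: string) {
  const result = await client.send(
    new QueryCommand({
      TableName: "event-store", // assumed name
      KeyConditionExpression: "aggregateId = :id",
      ExpressionAttributeValues: { ":id": aggregateId },
    })
  );
  return result.Items ?? [];
}
```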

Get All Events

Getting all events is currently implemented as a scan on the GSI. For production use, this should be extended to get events from a particular time offset, i.e. from a certain KSUID onwards.

Additionally, a single scan request can return at most 1 MB of data, so the code would need extending to follow the LastEvaluatedKey from each scan response in order to retrieve all events from a table of any real size.
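A paginated version of the scan, following LastEvaluatedKey until the index is exhausted, could look something like this:

```typescript
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, ScanCommand } from "@aws-sdk/lib-dynamodb";

const client = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Scan the GSI page by page until LastEvaluatedKey comes back empty,
// meaning the whole index has been read.
async function getAllEvents() {
  const items: Record<string, unknown>[] = [];
  let exclusiveStartKey: Record<string, unknown> | undefined;
  do {
    const page = await client.send(
      new ScanCommand({
        TableName: "event-store", // assumed name
        IndexName: "eventsByEventId", // assumed GSI name
        ExclusiveStartKey: exclusiveStartKey,
      })
    );
    items.push(...(page.Items ?? []));
    exclusiveStartKey = page.LastEvaluatedKey;
  } while (exclusiveStartKey);
  return items;
}
```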

Store Events

To store events in the table, we need to protect against concurrent writers. To achieve this, we need to be able to lock the aggregate version as we write.

This has been implemented as an optimistic lock using a DynamoDB item per aggregate to store the current max aggregate version. We then use a condition to ensure that the maximum aggregate version has not changed from when we initially handled the command and produced the events to be stored.

The first step takes the events produced by handling the command and computes the maximum aggregate version for each aggregate, as well as the number of events produced per aggregate. Combined, these enable the code to assert the required condition.
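A sketch of that computation, with assumed type and function names:

```typescript
// All names here are assumptions; the repo's types may differ.
interface Event {
  aggregateId: string;
  aggregateVersion: number;
  eventId: string; // KSUID
  payload: unknown;
}

interface AggregateWriteInfo {
  maxVersion: number;
  eventCount: number;
}

// Walk the produced events and record, per aggregate, the highest
// version and how many events were produced. Together these tell us
// what version the aggregate must still be at for the write to be safe.
function summariseByAggregate(events: Event[]): Map<string, AggregateWriteInfo> {
  const summary = new Map<string, AggregateWriteInfo>();
  for (const event of events) {
    const current = summary.get(event.aggregateId) ?? { maxVersion: 0, eventCount: 0 };
    summary.set(event.aggregateId, {
      maxVersion: Math.max(current.maxVersion, event.aggregateVersion),
      eventCount: current.eventCount + 1,
    });
  }
  return summary;
}
```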

From there we can now construct the first half of our transaction:
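Roughly, with the version item assumed to live at sort key 0, that half might look like this:

```typescript
import type { TransactWriteCommandInput } from "@aws-sdk/lib-dynamodb";

type TransactItems = NonNullable<TransactWriteCommandInput["TransactItems"]>;

// One conditional Update per aggregate, reusing AggregateWriteInfo from
// the sketch above. The version item only moves forward if no concurrent
// writer has bumped it since we handled the command: the expected value
// is the new max version minus the number of events we are about to append.
function buildLockUpdates(summary: Map<string, AggregateWriteInfo>): TransactItems {
  return [...summary.entries()].map(([aggregateId, info]) => {
    const expected = info.maxVersion - info.eventCount;
    return {
      Update: {
        TableName: "event-store", // assumed name
        Key: { aggregateId, aggregateVersion: 0 }, // assumed: version item at sort key 0
        UpdateExpression: "SET currentVersion = :newVersion",
        // Brand-new aggregates have no version item yet; existing ones
        // must still be at the version we saw when handling the command.
        ConditionExpression:
          expected === 0
            ? "attribute_not_exists(currentVersion)"
            : "currentVersion = :expectedVersion",
        ExpressionAttributeValues:
          expected === 0
            ? { ":newVersion": info.maxVersion }
            : { ":newVersion": info.maxVersion, ":expectedVersion": expected },
      },
    };
  });
}
```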

Then we can add the actual writing of the events to the store to our transaction and execute it.
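And a sketch of the second half, combining everything into a single TransactWriteItems call (DynamoDB caps transactions at 100 items):

```typescript
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, TransactWriteCommand } from "@aws-sdk/lib-dynamodb";

const client = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Combine the lock updates with a Put per event and execute atomically.
// If a concurrent writer won the race, the ConditionExpression on the
// version item fails and DynamoDB cancels the whole transaction, so no
// events are written. Event and the helpers come from the sketches above.
async function storeEvents(events: Event[]) {
  const summary = summariseByAggregate(events);
  const puts = events.map((event) => ({
    Put: {
      TableName: "event-store", // assumed name
      Item: { ...event, TYPE: "EVENT" },
      // Belt and braces: never overwrite an existing event.
      ConditionExpression: "attribute_not_exists(aggregateId)",
    },
  }));
  await client.send(
    new TransactWriteCommand({
      TransactItems: [...buildLockUpdates(summary), ...puts],
    })
  );
}
```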

Through this mechanism we are able to uphold our guarantees about how concurrent writes are handled. We have explicit tests covering the different scenarios here.

KSUIDs, Simple But Incomplete

Thanks to KSUIDs, this is about the simplest DynamoDB event store design that still provides at least loose global ordering. To implement strong global ordering, we would need a single channel through which to pass events, ensuring there is only ever one writer to the store.

An approach we will cover in the next instalment, which is hopefully not another 2 years away.
