Event Sourcing + CQRS: from theory to AWS — part 3

Christian Paesante
11 min read · Dec 13, 2019

In the previous article we saw the general architecture that results from applying Event Sourcing and CQRS in a microservice system.

In this article we’ll see how to bring Event Sourcing and CQRS to AWS, exploiting managed services to reduce maintenance and provisioning effort.

For the most part, the architecture will remain unchanged, but there are some additional mechanisms to adopt, either because of AWS service limits or simply to recover from occasional failures.

Quick overview of the AWS services used

Event Store

Event Streams and Events

The Event Store will be implemented on DynamoDB, a NoSQL key-value database. Data is organized in tables. Writes are strongly consistent; reads can be eventually consistent or strongly consistent, as specified at query time.

On each table creation you have to define the “key schema”: you have to explicitly define which fields compose your key. The key uniquely identifies your item in the table. You can choose between a simple Primary Key (a partition key only) and a Composite Primary Key (a partition key plus a sort key).

DynamoDB uses the partition key’s value as input to an internal hash function. The output from the hash function determines the partition (physical storage internal to DynamoDB) in which the item will be stored. All items with the same partition key value are stored together, in sorted order by sort key value. — AWS Documentation

This means that we can efficiently query data within a partition if we know the partition key of the partition of interest.

After this brief description, let’s bring all of the Event Store features onto it!

Our Event Streams will be mapped to DynamoDB partitions. Each Event Stream Id will correspond to the partition key of the table. The items we’ll save in the table will be our events, so each event will be uniquely identified by the composite primary key “StreamId-EventId”:

  • StreamId: identifies the event’s stream. It’s a UUID string and it’s our partition key.
  • EventId: identifies the event within the stream. It’s an increasing integer and it’s our sort key.

Using an increasing integer as the EventId helps us keep events ordered within the same stream and query them in order.
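
As a minimal sketch (in Python with boto3, assuming a hypothetical table name and on-demand billing), the key schema could be declared like this:

import boto3

dynamodb = boto3.client('dynamodb')

# Hypothetical Event Store table: StreamId is the partition key, EventId the sort key
dynamodb.create_table(
    TableName='EventStore',
    KeySchema=[
        {'AttributeName': 'StreamId', 'KeyType': 'HASH'},   # partition key
        {'AttributeName': 'EventId', 'KeyType': 'RANGE'},   # sort key
    ],
    AttributeDefinitions=[
        {'AttributeName': 'StreamId', 'AttributeType': 'S'},  # uuid string
        {'AttributeName': 'EventId', 'AttributeType': 'N'},   # increasing integer
    ],
    BillingMode='PAY_PER_REQUEST',
)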

Whenever we receive a new request:

  1. we ask for all items under the partition of interest (hence we query for the events in the event stream)
  2. we recompute our aggregate
  3. run our business validations on it
  4. then write the corresponding new event.

We have to remember that we are treating our Event Store as our Single Source of Truth.

Event Streams Optimistic Locking

Between the read and the write, any other instance of our application can write to the same stream we are acting on, changing the state of the aggregate. If the aggregate has changed state, we must re-run our business validations before making the write. We need some Optimistic Locking on our stream to avoid blindly writing over the updated version of the stream, detecting any change that happened between the aggregate computation and the write.

How can we detect a change when we need to write? DynamoDB offers a feature called Conditional Writes: in the write request we specify a condition that must hold for the write to be committed. If the condition is not satisfied, the write request returns an error, and that’s how we detect the change.

Since we know the last EventId we queried, our condition is that there must not exist an item with StreamId equal to our stream of interest and with EventId greater than the last queried event id.

!(StreamId == sid && EventId > lastEventId)

In other words, we must not write if the stream already has a version beyond the one we expect.

If the condition is not satisfied, we recompute our aggregate, re-run our business validations and retry the write. We keep doing this until the write succeeds.
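
Here is a hedged sketch of this read–validate–write loop with boto3 (the table name and the rebuild_aggregate/build_new_event helpers are hypothetical). Since the new event’s key is StreamId plus lastEventId + 1, the condition can be expressed as attribute_not_exists on that key, which fails exactly when another writer has already appended that EventId:

import json
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client('dynamodb')
TABLE = 'EventStore'  # hypothetical table name

def append_event(stream_id, build_new_event):
    """Read the stream, validate, and append a new event with optimistic locking."""
    while True:
        # 1. Query all the events of the stream, in EventId order
        events, page_kwargs = [], {}
        while True:
            page = dynamodb.query(
                TableName=TABLE,
                KeyConditionExpression='StreamId = :sid',
                ExpressionAttributeValues={':sid': {'S': stream_id}},
                **page_kwargs)
            events += page['Items']
            if 'LastEvaluatedKey' not in page:
                break
            page_kwargs = {'ExclusiveStartKey': page['LastEvaluatedKey']}

        # 2-3. Recompute the aggregate and run the business validations
        aggregate = rebuild_aggregate(events)      # hypothetical helper
        new_event = build_new_event(aggregate)     # hypothetical: raises if validation fails

        # 4. Conditional write: fails if someone already wrote this EventId
        last_event_id = int(events[-1]['EventId']['N']) if events else 0
        try:
            dynamodb.put_item(
                TableName=TABLE,
                Item={
                    'StreamId': {'S': stream_id},
                    'EventId': {'N': str(last_event_id + 1)},
                    'Payload': {'S': json.dumps(new_event)},
                },
                ConditionExpression='attribute_not_exists(EventId)')
            return
        except ClientError as err:
            if err.response['Error']['Code'] != 'ConditionalCheckFailedException':
                raise
            # Someone else appended in the meantime: recompute and retry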

Events publication

DynamoDB offers the possibility to keep a log of the changes that happened to a table: DynamoDB Streams.

DynamoDB Streams captures a time-ordered sequence of item-level modifications in any DynamoDB table and stores this information in a log for up to 24 hours. Applications can access this log and view the data items as they appeared before and after they were modified, in near-real time.

DynamoDB Streams also integrates with Lambda triggers:

Amazon DynamoDB is integrated with AWS Lambda so that you can create triggers — pieces of code that automatically respond to events in DynamoDB Streams. With triggers, you can build applications that react to data modifications in DynamoDB tables. — Docs

With a Lambda function we can pick up changes from DynamoDB Streams as they come and send them to an SNS topic, in order to publish them to an arbitrary number of subscribers.
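
A minimal sketch of such a Lambda handler (the topic ARN environment variable is a hypothetical name, and the event is forwarded as-is in DynamoDB’s attribute-value format):

import json
import os
import boto3

sns = boto3.client('sns')
TOPIC_ARN = os.environ['EVENTS_TOPIC_ARN']  # hypothetical environment variable

def handler(event, context):
    """Triggered by the DynamoDB Stream of the Event Store table."""
    for record in event['Records']:
        if record['eventName'] != 'INSERT':
            continue  # only newly appended events are of interest
        new_image = record['dynamodb']['NewImage']  # requires NEW_IMAGE in the stream view type
        sns.publish(TopicArn=TOPIC_ARN, Message=json.dumps(new_image))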

To recap, our Event Store will be composed of:

The new event item added to the table (in red) is published through the DynamoDB Stream to a Lambda function, which publishes it to an SNS Topic, which delivers it to multiple consumers’ SQS Queues

Events Replay

Let’s introduce a new component: the Replayer. It is responsible for replaying the events, querying them from DynamoDB and pushing them into the destination SQS queue.

Querying such a large amount of data leads to big response sizes. DynamoDB paginates responses when they are bigger than 1MB. Hence, our Replayer must keep track of the already replayed events in a table somewhere.

But how does it query events? There are two ways. The first one is to Scan the entire table for all events and push them to the queue of the interested Projection or service.

Unfortunately, this operation is hard to parallelize, so the replay will be really slow. Moreover, it will consume the table’s operational read capacity units, affecting operational performance.

Another way exploits Global Secondary Indexes (GSIs). GSIs have their own capacity units, and querying them won’t affect operational performance. But how can we parallelize the process?

GSIs are useful when you have to query your items with a partition key and sort key different from the table’s ones. GSIs project your items into an index that you can query like a secondary table. This projection comes at the cost of the GSI’s write capacity units: if your GSI doesn’t have enough WCUs to keep pace with the writes to the table, it will start to lag behind.

Keep in mind that any partition in the GSI has the same WCU and RCU limits as a table partition! Make sure you are projecting to a large enough number of partitions.

DynamoDB supports your access patterns using the throughput that you provisioned as long as the traffic against a given partition does not exceed 3,000 RCUs or 1,000 WCUs. — AWS Documentation

What will we store in our GSI? Our GSI will project all our Event Streams into other streams! Have a look at the following picture:

Events coming from 4 Event Streams are mapped to 2 easy-to-discover ReplayStreams. This is done at write time. The service knows how many Replay Streams it’s using and it assigns the ReplayStreamId accordingly.

As you can see, each of our Event Streams will be mapped to a Replay Stream. A Replay Stream (RS) is a collection of events coming from different Event Streams. Each RS is identified by a ReplayStreamId, which is an increasing integer. In this way, knowing all the existing RSs is trivial (they are just the ones with ReplayStreamId between 1 and MaxReplayStreamId).
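
The write-time assignment can be as simple as hashing the StreamId modulo the current number of Replay Streams. A hypothetical sketch (the RS count would be read from configuration, and the resulting ReplayStreamId is stored as an extra attribute on each event item so the GSI can partition on it):

import hashlib

NUM_REPLAY_STREAMS = 16  # hypothetical value, stored somewhere configurable

def assign_replay_stream_id(stream_id: str) -> int:
    """Map an Event Stream to a Replay Stream id in 1..NUM_REPLAY_STREAMS."""
    digest = hashlib.md5(stream_id.encode('utf-8')).hexdigest()
    return int(digest, 16) % NUM_REPLAY_STREAMS + 1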

Events from the same Event Stream will be projected in order within their RS. A single RS will contain events coming from multiple Event Streams.

Since each RS is a different partition, we can split reads across multiple RSs concurrently. In this way, we can easily launch multiple copies of our Replayer, one for each RS. More RSs means more parallelized work and more capacity of the GSI to project under high loads!

The Replayer can be implemented with a Lambda function that performs the RS discovery and then starts n concurrent functions (one for each RS) that recursively push paginated events to the destination queue until they reach the end of their RS.
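
A hedged sketch of one of those per-RS functions, assuming a GSI named ReplayIndex partitioned by ReplayStreamId and a destination queue URL passed in (all names are hypothetical):

import json
import boto3

dynamodb = boto3.client('dynamodb')
sqs = boto3.client('sqs')

def replay_stream(replay_stream_id, queue_url, start_key=None):
    """Push the events of one Replay Stream to the destination queue, page by page."""
    page_kwargs = {'ExclusiveStartKey': start_key} if start_key else {}
    page = dynamodb.query(
        TableName='EventStore',                         # hypothetical table name
        IndexName='ReplayIndex',                        # hypothetical GSI name
        KeyConditionExpression='ReplayStreamId = :rs',
        ExpressionAttributeValues={':rs': {'N': str(replay_stream_id)}},
        **page_kwargs)

    for item in page['Items']:
        sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(item))

    # Recurse (e.g. by re-invoking the function) with the last evaluated key,
    # persisting it somewhere so that a failed RS can be resumed later
    if 'LastEvaluatedKey' in page:
        replay_stream(replay_stream_id, queue_url, page['LastEvaluatedKey'])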

One limitation of this approach is that if one of the recursive functions fails unexpectedly, the remaining events of its RS won’t be replayed, because the recursion stops.

Needless to say, this can be solved by implementing a recovery mechanism that checks whether any RS has been left behind during the replay, or a recovery mechanism in the consumer that detects missing events coming from a certain Event Stream of our provider.

As said before, each RS has an Id which is an increasing integer. That’s because whenever we see that our GSI is reaching the throughput limit for a partition (i.e. an RS), we should consider expanding the number of RSs.

The mapping of each item to a RS is done at write time. This means that whenever we are reaching that throughput limit, we can just change the number of RSs (stored somewhere) and the entire service will adapt to it.

RCU and WCU Provisioning

Provisioning resources in advance is generally wasteful. If your allocated resources are not well exploited, you’ll end up with an unnecessarily high bill. On the other hand, if your resources are too few, you’ll end up with your application being unavailable.

With DynamoDB you have 2 ways of dealing with this.

  1. On-Demand mode: you pay for what you consume, and DynamoDB adapts to the load accordingly. This is great for unknown/unpredictable traffic.
  2. Provisioned mode with Auto-Scaling: you pay for pre-allocated Capacity Units. If the table’s consumed capacity exceeds your target utilization for a specific length of time, Auto-Scaling takes care of increasing/decreasing the capacity units on your behalf. This is great for a load that grows with your business while maintaining cost predictability.

Exploiting one of the two helps you automate capacity management for DynamoDB, resulting in dynamic and more efficient resource allocation.
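
For the provisioned mode, target tracking is configured through Application Auto Scaling. A hedged sketch for the table’s read capacity (resource name and bounds are placeholders):

import boto3

autoscaling = boto3.client('application-autoscaling')

# Register the table's read capacity as a scalable target (placeholder bounds)
autoscaling.register_scalable_target(
    ServiceNamespace='dynamodb',
    ResourceId='table/EventStore',
    ScalableDimension='dynamodb:table:ReadCapacityUnits',
    MinCapacity=5,
    MaxCapacity=500,
)

# Keep consumed capacity around 70% of the provisioned one
autoscaling.put_scaling_policy(
    PolicyName='EventStoreReadScaling',
    ServiceNamespace='dynamodb',
    ResourceId='table/EventStore',
    ScalableDimension='dynamodb:table:ReadCapacityUnits',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 70.0,
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'DynamoDBReadCapacityUtilization',
        },
    },
)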

Limits of DynamoDB

  • Partitions (identified by a partition key) support up to 1,000 writes per second and 3,000 reads per second.
  • Maximum item size in DynamoDB is 400KB.
  • Nested attribute depth up to 32 levels.
  • Maximum of 20 Global Secondary Indexes per table (can be increased by contacting support).

For an exhaustive list see the Official Documentation.

Queue

We described our queue as a persistent queue which must guarantee at-least-once delivery. In-order delivery is not mandatory, but is recommended.

Turns out SQS queues are what we are looking for! There are two types of queues:

  • Standard: guarantees at-least-once delivery, can handle an “unlimited” number of requests per second and provides only best-effort ordering (“slightly out-of-order” delivery).
  • FIFO: guarantees in-order, exactly-once processing with a managed deduplication mechanism, at the expense of a limited throughput (300 requests per second, or 3,000 messages per second with batching).

Both persist messages for up to 14 days. Both rely on a visibility timeout: whenever a message is read, it is “hidden” by the queue for a period. If it’s not explicitly deleted within that period, the message will reappear in the queue.

This ensures that messages are not lost in case of failures while processing them, and that each message instance is not processed concurrently.
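
A minimal sketch of this consume-then-delete contract with boto3 (the queue URL is a placeholder): the message is deleted only after it has been processed successfully, otherwise it reappears once the visibility timeout expires. When the queue is consumed by a Lambda trigger, as later in this article, Lambda does this polling and deletion on our behalf.

import boto3

sqs = boto3.client('sqs')
QUEUE_URL = 'https://sqs.eu-west-1.amazonaws.com/123456789012/events-queue'  # placeholder

def poll_once(process):
    """Receive a batch of messages and delete each one only after successful processing."""
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,      # long polling
        VisibilityTimeout=30,    # messages stay hidden while we process them
    )
    for message in response.get('Messages', []):
        process(message['Body'])  # if this raises, the message will reappear later
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message['ReceiptHandle'])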

As you can see, choosing FIFO queues would require orchestrating multiple queues in case of high traffic, adding complexity to our architecture.

Since ordering is a problem handled by the Projector, we’ll choose the Standard queue.

Trigger

SQS offers the ability to set triggers on queues. This gives us the ability to run our Projector only when events are submitted to the queue, which is great if we plan to run the Projector as a serverless Lambda function.

Limits of SQS

  • Maximum message size is 256KB (lower than DynamoDB’s 400KB item/event size)
  • SQS can keep messages for up to 14 days. I think it’s quite a reasonable time, and in case messages expire, you can still replay the events. I would not see it as a limitation for our use case, but it’s still an actual limit.

More info in the Official Documentation.

Projector

Our Projector will be deployed as a serverless Lambda function. Resources are completely managed by Lambda, and we don’t need to be concerned with configuring infrastructure and ensuring it is running. On top of that, any update to the Projector is instantly deployed.

When linked to an SQS trigger, Lambda also manages the polling and the number of concurrent functions running in response to the rate of events coming into the queue, which takes away a lot of complexity and lets us concentrate on the projection logic of the code.

Order Control

We said that it is a task of our Projector to process events in order. This can be done by keeping, in a table, the last processed eventId for each streamId.

Whenever we receive an event, we check if the event’s id is equal to the last processed id + 1. If it is we process it and then we update the last processed eventId.

if (e.eventId > lastProcessedId + 1) {
  // Future event: leave it in the queue. When the visibility timeout
  // expires, the event will reappear and will be processed again.
  orderTable.increaseFutureEventsCount(e.streamId)
  ignoreEvent(e)
} else if (e.eventId < lastProcessedId + 1) {
  // Past event: we already processed it.
  deleteEventFromQueue(e)
} else { // e.eventId == lastProcessedId + 1
  // It is the event we are expecting!
  handleEvent(e)
  orderTable.increaseLastProcessedId(e.streamId)
  deleteEventFromQueue(e)
}

As you can see, these last two operations are not atomic. This means that an event can be processed more than once if our Projector fails between the two. This requires event processing to be designed to be idempotent.

In the general case (when there are no failures), events will be deduplicated as a side effect of our order control mechanism.

My recommendation is to use a DynamoDB table with the stream id as partition key. Since DynamoDB uses hashing to locate partitions, lookups of the last processed id are very efficient.

LastProcessedEventId keeps track of the last processed event. FutureEventsCount keeps track of how many times we encountered an event with EventId greater than LastProcessedEventId + 1, i.e. how many times we met an event that has to be processed in the future and not now.

This counter can be used to detect lost events: when it reaches some threshold, the expected event has probably been lost. The Projector can then ask our Event Store directly for the missing event. Every time the expected event is met and processed, the counter is reset back to 0.
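
Below is a hedged sketch of how the Projector, as an SQS-triggered Lambda, could implement this order control against such a table. The table, field and helper names (project_event, request_missing_events) are hypothetical, and a batch size of 1 is assumed so that raising for an out-of-order event simply makes that single message reappear after its visibility timeout.

import json
import boto3

dynamodb = boto3.resource('dynamodb')
order_table = dynamodb.Table('ProjectorOrderControl')  # hypothetical table name
FUTURE_EVENTS_THRESHOLD = 5                            # hypothetical threshold

def handler(event, context):
    """SQS-triggered Projector; assumes a batch size of 1 for simplicity."""
    e = json.loads(event['Records'][0]['body'])
    row = order_table.get_item(Key={'StreamId': e['streamId']}).get('Item', {})
    last_processed = int(row.get('LastProcessedEventId', 0))

    if e['eventId'] == last_processed + 1:
        project_event(e)                               # hypothetical projection logic
        order_table.put_item(Item={
            'StreamId': e['streamId'],
            'LastProcessedEventId': e['eventId'],
            'FutureEventsCount': 0,                    # reset the counter
        })
        return                                         # success: Lambda deletes the message
    if e['eventId'] <= last_processed:
        return                                         # already processed: drop the duplicate
    # Future event: count it and leave it in the queue for a later retry
    order_table.update_item(
        Key={'StreamId': e['streamId']},
        UpdateExpression='ADD FutureEventsCount :one',
        ExpressionAttributeValues={':one': 1})
    if int(row.get('FutureEventsCount', 0)) + 1 >= FUTURE_EVENTS_THRESHOLD:
        request_missing_events(e['streamId'], last_processed)  # hypothetical recovery call
    raise RuntimeError('out-of-order event, will retry after the visibility timeout')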

Inter-service communication

Inter-service communication will be supported by an SQS queue. It’s up to the application consuming events to implement an Order Control mechanism that can be identical to the one used by the Projector.

In the Order Control table we should also add an additional field used to determine whether the expected event has been lost. In that case, it’s important to ask the other service to send the missing events to our queue.

Note that the producer should not resend events to the SNS topic, to avoid sending them also to all the other services which didn’t request them. This keeps the extra chattiness confined to the two services involved.

Replaying Events

In case we are starting a new microservice which needs to consume old events from another, we need to send all old events to the new service’s queue.

It’s important to avoid sending old events to the provider’s SNS topic, otherwise we’ll increase the chattiness of a big part of the system, reducing its performance. This means that the provider’s Replayer must be told where to publish old events.

Conclusions

In this last article we saw how to bring our architecture to AWS. Remember that this can be done with almost any cloud provider, since the architecture was designed to be “cloud-agnostic”.

As you saw, with AWS we exploited fully managed services, helping us reduce the time spent provisioning resources and ensuring they are available. Moreover, most AWS services can self-adapt to load spikes and periodic load growth.
