Implementing event sourcing with Azure Functions

Roy Hoeymans · Published in tech.effectory · Aug 9, 2019

Event sourcing can be a very effective design pattern in large-scale distributed systems. With the rise of new cloud-based serverless technology, it is now easier to implement than ever before. In this blog post I will describe an implementation of the event sourcing pattern using Azure storage and Azure Functions.

I’ll also discuss some of the challenges of this approach, such as dealing with concurrency and creating projections from multiple event streams.

The event sourcing pattern

Before diving into the implementation, let’s talk about some terminology. If you’re not familiar with event sourcing, it is a pattern where rather than overwriting state in a database, state changes are stored as events in an event store. These events are then applied by a projector to create a projection — a view of the current state. Projections are optimized to group the data in the exact way your application needs it.

Event sourcing is usually combined with CQRS (Command Query Responsibility Segregation), a pattern that separates reads from writes: queries return data but do not alter state, while commands change state but do not return data. This separation makes it easier to understand how state changes in your system.

It also takes away a lot of complexity in the read and write operations of an API. Complicated database queries with multiple joins and data manipulation in a get call? Not anymore! Reading data that your app needs is as simple as retrieving an optimized projection. Writing is also more straightforward: just add a new event to your event store. The real magic of event sourcing is in the projectors that transform the events into a view.

While there are advantages to this pattern, there are also some considerations to be made before deciding to adopt it, mainly concerning eventual consistency. I won’t go into the pros and cons of event sourcing in this post since a lot has already been written on this subject. A great place to start is this article in the Microsoft Azure docs, and Google and YouTube can help you find more.

For now, let’s talk about implementation. To realize this pattern we need to create a few components:

  • An event store
  • A place to store projections
  • Projectors
  • Message queues to trigger projectors

Azure provides some excellent tools to create them.

Storing events and projections with Azure storage

An event store is simply a data store for all the events. This can be anything from a relational database to NoSQL, but it is recommended to use something that is optimized for storing and querying large amounts of non-relational data. Azure table storage fits this requirement perfectly.

If you’ve only worked with relational SQL databases before, be prepared to change how you think about storing data in tables.

Instead of defining the columns of a table beforehand, a storage table derives its structure from the inserted rows. Rows are organized by a partition key and indexed by a row key. Queries that filter on partition key and row key are fast; queries that don't are slow. Pick your partition key carefully to optimize query performance.

An obvious choice for the partition key would be the name of the event stream. A stream is a sequence of events in the event store that belong to a certain aggregate. For example, the event stream of a Project aggregate might contain Project.Created and Project.SettingsChanged events, and a User aggregate might contain User.Created and User.NameChanged events. Therefore, the partition key is the aggregate type concatenated with its id (e.g. project-1234, user-2).

A row key has to be unique within a partition, so it can be an event id or sequence number. You also need to store the event type and the event data; in our implementation we store the event data as a JSON object in a Payload column.

Here is what some rows in an event store could look like:
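
PartitionKey    RowKey   EventType                 Payload
project-1234    1        Project.Created           { "name": "My project" }
project-1234    2        Project.SettingsChanged   { "isPrivate": true }
user-2          1        User.Created              { "id": 2, "name": "David" }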

To be able to use table storage, you need to create an Azure storage account. This account also gives you access to blob storage and queues. We are using queues to trigger the projectors whenever a new event is added to the event store.

Blob storage can be used to store all kinds of unstructured data (documents, images, videos, raw data). This is where we will store the projections as JSON files.

Let’s say we have a user projector that reads events from a user event stream. The resulting projection will be a JSON file called “user-1234.json”, with the content looking like this:

{
  "user": {
    "id": 1234,
    "name": "David"
  },
  "latestProcessedEventId": 2
}

Now that we have our event store, a store for our projections, and message queues, let’s look into how Azure Functions are going to help us to create the projectors.

Creating projectors with Azure Functions

Azure Functions is Microsoft’s latest flagship in serverless technology. It allows you to upload a piece of code in the language of your choice to the cloud and define the triggers that will make it run. This piece of code is what we would call a function. Functions are meant to execute short-lived and stateless tasks, such as processing an order or resizing an uploaded image.

What makes Azure Functions so convenient is that the infrastructure is managed for you, so you only have to focus on your code. Functions also scale automatically with the number of requests, and if you choose a consumption plan, you pay only when your functions actually run.

A function can be triggered in several ways, including by an HTTP request, a Service Bus message, or a timer. It can also be triggered by an Azure storage queue, which is what we will use to trigger our projector functions.

For each type of projection that you want to create, you need to create an Azure Function that will act as a projector, and a storage queue that can trigger it.
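
To make this concrete, here is a minimal sketch of a queue-triggered projector function. The ProjectorTriggerMessage class is a stand-in that anticipates the trigger message format shown in the next section.

using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Threading.Tasks;

// Stand-in for the trigger message format described in the next section.
public class ProjectorTriggerMessage
{
    public string EventStreamId { get; set; }
    public long EventId { get; set; }
    public bool RebuildProjection { get; set; }
}

public static class UserProjectorFunction
{
    // Runs whenever a message appears on the trigger-user-projector queue.
    [FunctionName("UserProjector")]
    public static async Task Run(
        [QueueTrigger("trigger-user-projector")] string message,
        ILogger log)
    {
        var trigger = JsonConvert.DeserializeObject<ProjectorTriggerMessage>(message);
        log.LogInformation($"Projecting {trigger.EventStreamId} from event {trigger.EventId}");

        // Load the projection from blob storage, query new events from the
        // event store, and apply them (covered in the following sections).
    }
}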

Applying events

The job of a projector is to read events from a stream and apply them to update a projection to the latest state. The projector should be triggered by a queue message whenever a new event is added to an event stream.

Let’s say we have an Azure Function UserProjector that creates projections of users. A message on a storage queue trigger-user-projector might look something like this:

{
  "eventStreamId": "user-1",
  "eventId": 5,
  "rebuildProjection": false
}

This means that the projector needs to query events from the store with an event id > 5, and apply these events to update the user projection. When the rebuildProjection property is set to true, the event id is ignored and the projector will query all events on the stream to create the projection from scratch.
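
As a sketch, that query could look like this with the table storage SDK (Microsoft.Azure.Cosmos.Table, or the older Microsoft.WindowsAzure.Storage.Table). EventEntity, eventTable (a CloudTable pointing at the event store), and zero-padded row keys are assumptions of mine; the padding matters because row keys are compared as strings, not numbers.

// Entity shape for the event rows described earlier.
public class EventEntity : TableEntity
{
    public string EventType { get; set; }
    public string Payload { get; set; }
}

// Build a filter: same stream (partition key), row key greater than the
// trigger message's event id. "D19" zero-pads the id so that string
// comparison matches numeric order.
var filter = TableQuery.CombineFilters(
    TableQuery.GenerateFilterCondition(
        "PartitionKey", QueryComparisons.Equal, trigger.EventStreamId),
    TableOperators.And,
    TableQuery.GenerateFilterCondition(
        "RowKey", QueryComparisons.GreaterThan, trigger.EventId.ToString("D19")));

// Page through all matching events; table storage returns results in segments.
var query = new TableQuery<EventEntity>().Where(filter);
var events = new List<EventEntity>();
TableContinuationToken token = null;
do
{
    var segment = await eventTable.ExecuteQuerySegmentedAsync(query, token);
    events.AddRange(segment.Results);
    token = segment.ContinuationToken;
} while (token != null);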

Events need to be applied sequentially. This is typically done by iterating over the new events, with the event type determining how the projection's state changes. After all events have been applied, we also save the id of the latest processed event in the projection. This lets us check whether the projection is up to date with the event store, and it will come in handy when dealing with concurrency later.

Below you can find some simplified code to illustrate what applying events might look like:

using System.Collections.Generic;
using Newtonsoft.Json;

public class UserProjector
{
    [JsonProperty]
    public long LatestProcessedEventId { get; set; }

    [JsonProperty]
    public User User { get; set; }

    public void ApplyEvents(List<Event> events)
    {
        foreach (var @event in events)
        {
            switch (@event.EventType)
            {
                case "User.Created":
                    // A created event carries the full user object.
                    User = JsonConvert.DeserializeObject<User>(@event.Payload);
                    break;
                case "User.NameChanged":
                    // Copy only the changed field onto the current state.
                    var changed = JsonConvert.DeserializeObject<User>(@event.Payload);
                    User.Name = changed.Name;
                    break;
            }

            // Track how far this projection has caught up with the stream.
            // Row keys are strings in table storage, hence the parse.
            LatestProcessedEventId = long.Parse(@event.RowKey);
        }
    }
}

Dealing with multiple instances

As I mentioned earlier, Azure automatically spins up new instances of your functions as the number of requests increases. This is great for automatic scaling, but it also means that multiple projectors for the same event stream can be triggered in parallel. To prevent conflicts and concurrency problems, we can use the lease operation that blob storage provides.

A lease operation establishes a lock on a file for write and delete operations. When a projector is triggered, we make it try to acquire a lease on the projection file it wants to update. If it can acquire the lease, it will continue as usual. If the projection file is already leased by another process, the instance will stop executing.

After the projector instance that has acquired the lease is done updating the projection, it checks if new events with an event id higher than the one stored in the projection file exist on the stream. If this is the case, it adds a new trigger message to the queue to process those new events.
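
Here is a minimal sketch of that lease flow, assuming the Microsoft.Azure.Storage.Blob SDK. The container variable, the updatedProjectionJson string, and the 30-second lease duration are stand-ins.

// Try to take a lease on the projection file before touching it.
var blob = container.GetBlockBlobReference("user-1234.json");

string leaseId;
try
{
    // Blob leases last between 15 and 60 seconds (or can be infinite).
    leaseId = await blob.AcquireLeaseAsync(TimeSpan.FromSeconds(30), null);
}
catch (StorageException e) when (e.RequestInformation.HttpStatusCode == 409)
{
    // 409 Conflict: another projector instance holds the lease, so back off.
    return;
}

try
{
    // Every write must present the lease id while the lease is active.
    await blob.UploadTextAsync(updatedProjectionJson, Encoding.UTF8,
        AccessCondition.GenerateLeaseCondition(leaseId), null, null);
}
finally
{
    await blob.ReleaseLeaseAsync(AccessCondition.GenerateLeaseCondition(leaseId));
}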

Creating projections from multiple streams

In some cases you might want to create a projection that needs to apply events from multiple streams. A core concept of event sourcing is that the event store is your immutable source of truth. It is not recommended to let your projector read other projections, since they might be modified or not up to date.

One way to create projectors that read from multiple streams is to manage event stream pointers in the projection. A pointer keeps track of the latest processed event id for a stream.

Let’s say we want to create a projection containing a list of users in a project. The project stream is the base stream in this case, and you want to subscribe to user event streams once users are added to a project. This basically means adding a pointer in your projection when applying a Project.UserAdded event, like so:

"subscribedStreamEventPointers": {
  "user-1": 0
}

Whenever you process an event in that user stream, update the latest processed event id in the stream event pointer. You could also unsubscribe from streams when you don’t need information from that stream anymore, for example when applying a Project.UserRemoved event.
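
In code, that pointer bookkeeping could look something like the sketch below. The dictionary property and the payload shape are assumptions of mine; only the event types come from this post.

public class ProjectUsersProjection
{
    // Latest processed event id per subscribed user stream.
    public Dictionary<string, long> SubscribedStreamEventPointers { get; set; }
        = new Dictionary<string, long>();

    public void Apply(Event @event)
    {
        switch (@event.EventType)
        {
            case "Project.UserAdded":
                var added = JsonConvert.DeserializeObject<User>(@event.Payload);
                // Start following this user's stream from the beginning.
                SubscribedStreamEventPointers["user-" + added.Id] = 0;
                break;
            case "Project.UserRemoved":
                var removed = JsonConvert.DeserializeObject<User>(@event.Payload);
                // We no longer care about this user's events.
                SubscribedStreamEventPointers.Remove("user-" + removed.Id);
                break;
        }
    }
}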

If we have an event store that looks like this:
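
PartitionKey    RowKey   EventType            Payload
project-1234    1        Project.Created      { "name": "My project" }
project-1234    2        Project.UserAdded    { "id": 1 }
project-1234    3        Project.UserAdded    { "id": 2 }
user-1          1        User.Created         { "id": 1, "name": "Johnny" }
user-1          2        User.NameChanged     { "name": "John" }
user-2          1        User.Created         { "id": 2, "name": "David" }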

The projection ends up looking like this:

{
  "projectId": 1234,
  "baseStreamLatestProcessedEventId": 3,
  "subscribedStreamEventPointers": {
    "user-1": 2,
    "user-2": 1
  },
  "projectName": "My project",
  "users": [
    { "id": 1, "name": "John" },
    { "id": 2, "name": "David" }
  ]
}

Wrap-up

So there you have it: an example of an event sourcing implementation with Azure storage and Azure Functions. This is of course not the only way! There are many other tools at your disposal, such as Service Bus, Cosmos DB and Event Grid, each with their own strengths and limitations. Experiments and proofs of concept are a great way to see what works for your situation.

One thing I really liked about Azure storage is the storage emulator, which lets you run a local environment that emulates blob, queue, and table storage without an Azure subscription. You can point your Azure Functions at the emulator and develop everything locally, so the two services complement each other perfectly.
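
For example, pointing a function app at the emulator is just a matter of the storage connection string in local.settings.json (a minimal sketch):

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet"
  }
}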

Hopefully this blog post has given you some inspiration for your own event sourcing endeavor. Feel free to share any feedback or your own experiences in the comments!
