Scalable content feed using Event Sourcing and CQRS patterns

Łukasz Lalik
16 min read · Aug 2, 2018


Challenges of event-based microservices

While microservices architecture provides many advantages when it comes to scaling a project and an engineering department, it also introduces a whole new category of challenges. Many of them revolve around communication between microservices, and event-based communication is an especially interesting topic.

Feed feature

Figure 1. Brainly main page

To better describe these problems and their solutions, I'll use Brainly's questions feed feature as the main theme. It's a central part of the Brainly main page, presented in Fig. 1. From the user's perspective, it contains a list of the latest questions. Users can filter them by subject, school level and several other parameters.

From the backend perspective, the Feed microservice needs to access a lot of data that lives in several other microservices and combine it. We achieved such integration by using events. Let's see how it looks in practice.

Assume that we always want to have a list of the latest questions in the Feed microservice, sorted by the time they were asked. As shown in Fig. 2, whenever a user asks a question, the client makes an HTTP request to the Questions & Answers microservice, sending the question content. The question is saved into the microservice's local database, and a QuestionCreated event is also published to the Queue Server. This event contains details about the newly created question, like its ID, content and creation date.

Figure 2. Microservices at Brainly

Next, every microservice that is subscribed to the QuestionCreated event will receive it. Microservices can use it to perform some calculations, update their local state, or publish further events. In our example, the Feed microservice can use it to build its own database of the latest questions, sorted by creation date. The Popularity Tracking microservice also consumes it and, as a result, may publish a QuestionBecamePopular event.

When a user wants to fetch the latest questions, he or she makes an HTTP request to the Feed microservice, which then uses data from its local database to respond to that request.

That was a rough description of how the Feed microservice fits into the context of our infrastructure. In the next section, I'll start with the perfect world model of asynchronous communication. In the sections after that, I'll confront this model with real-world problems, so that in the end we arrive at a more robust application.

Perfect world model

Figure 3. Adding a question; perfect world model

Let's start with the perfect world model and see it in action. When a user makes a request to add a question, the request goes to the Question & Answer service. This service publishes a QuestionCreated event, which is forwarded through the Queue Server to the Feed service. The Feed service then inserts a new row into its private database. Now it has another question in its database that can be served to users.

Figure 4. Deleting question; naive model

Immediately after adding the question, the user realizes that there was a mistake in the question's content and decides to delete it. The user therefore performs a delete request to the Question & Answer service. Now a QuestionDeleted event is forwarded, and both the Question & Answer and Feed services delete the row corresponding to the deleted question. As a result, both services are eventually consistent.

Consumer error handling

In a perfect world, the above model would be enough. However, in reality, many things can and will go wrong. To begin with, let's see what happens when the Feed microservice encounters an error while handling the QuestionDeleted event.

Figure 5. Event is lost when consumer crashes

Let's assume that when the Feed service received the QuestionDeleted event, its database was down for maintenance. As a result, the service won't be able to remove the question from its database. If the service crashes, the QuestionDeleted event will be lost forever, and our system will remain inconsistent. Users won't be happy, because they will see a non-existent question on the feed.

Retrying on error

Figure 6. Queue server retries event if ACK doesn’t come on time

A solution to this problem is to add an acknowledgement mechanism (most queue servers provide such a feature). If the service handles an event successfully, it sends back an acknowledgement and the event is removed from the queue server. Otherwise, the event returns to the queue and is sent to the consumer again. To avoid putting a high load on the consumer by constantly resending the same event, exponential backoff can be applied. It means that the message will be delayed by an interval that increases after every retry. E.g. after the first failure the queue server may wait 1 second before retrying, after the second failure 2 seconds, after the third 4 seconds, and so on.
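
As an illustration, here is a minimal Go sketch of such a growing delay; the handleEvent function is a hypothetical stand-in, and in practice the redelivery and the delay are usually handled by the queue server itself rather than by a loop like this:

package main

import (
    "errors"
    "fmt"
    "time"
)

// backoffDelay doubles the base delay after every failed attempt
// (1s, 2s, 4s, ...) and caps it at max.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
    d := base << uint(attempt) // base * 2^attempt
    if d > max {
        return max
    }
    return d
}

// handleEvent is a hypothetical stand-in for real event-processing logic.
func handleEvent(payload []byte) error {
    return errors.New("database is down for maintenance")
}

func main() {
    payload := []byte(`{"name":"QuestionDeleted"}`)
    for attempt := 0; attempt < 5; attempt++ {
        if err := handleEvent(payload); err == nil {
            return // success: the event would be acknowledged here
        }
        delay := backoffDelay(attempt, time.Second, 30*time.Second)
        fmt.Printf("attempt %d failed, retrying in %s\n", attempt+1, delay)
        time.Sleep(delay)
    }
    // after the last attempt the event would be rejected (see the dead letter queue below)
}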

Dead letter queue

What if the cause of such an error is not temporary? E.g. what if there was a bug in the Question & Answer service and it produced a broken QuestionDeleted event? Or the database maintenance takes much longer than expected? The queue server would keep retrying such an event for hours, days or even longer. We shouldn't lose events in these cases either, but we also don't want to waste resources on retrying the same event over and over again.

In such cases, our service should give up and send the event to a separate queue, usually called a dead letter queue. Such a queue should have no subscribers; its only purpose is to keep events that cause trouble for further examination. After examination, such events can be discarded or sent back to the service, e.g. once the database is up again or the service logic has been adjusted to handle broken events. The dead letter queue should be constantly monitored and connected to an alerting system. If any events end up there, the engineering team should be notified and take action.
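
A rough sketch of this give-up logic in Go; the Publisher interface, the queue name and the retry limit are assumptions standing in for whatever client library and configuration are actually used:

package consumer

// Publisher abstracts the queue client; a real implementation is out of scope here.
type Publisher interface {
    Publish(queue string, payload []byte) error
}

const maxRetries = 5

// consume handles a single delivery. When the handler keeps failing,
// the event is parked on a dead letter queue instead of being retried forever.
func consume(p Publisher, payload []byte, deliveryCount int, handle func([]byte) error) error {
    err := handle(payload)
    if err == nil {
        return nil // success: acknowledge the event
    }
    if deliveryCount >= maxRetries {
        // Give up and keep the event for later examination by engineers.
        return p.Publish("feed.dead-letter", payload)
    }
    return err // returning the error leads to a negative ack and another retry
}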

Event delivery semantics

Another complication resulting from event-based integration comes from the fact that exactly once delivery is hard to implement (e.g. RabbitMQ doesn’t provide it at the time of writing). Therefore you have two options to choose from: at most once delivery or at least once delivery.

At most once delivery

With at most once delivery, your event will be delivered once or not at all. You achieve that when your service publishes an event but doesn't care about the result. If the event wasn't published successfully, it is lost. We haven't found any use cases for this, but if you don't mind losing events and don't want to invest time in handling at least once delivery, this model might be a good choice.

At least once delivery

Figure 7. At least once delivery

At least once delivery ensures that an event will be delivered by requiring a confirmation from the queue server that the event was published successfully. If such a confirmation doesn't arrive or contains an error, the event is published again. Unfortunately, just like the event itself, the confirmation can also get lost or arrive late. That will cause the same event to be published two or more times. Therefore, correct handling of duplicated events is essential when using event-based communication with at least once delivery semantics.
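
As a sketch of the publishing side, assuming a hypothetical ConfirmingPublisher interface rather than any concrete queue client:

package publisher

import "time"

// ConfirmingPublisher publishes an event and returns an error if no
// confirmation arrives from the queue server within the timeout.
type ConfirmingPublisher interface {
    PublishWithConfirm(payload []byte, timeout time.Duration) error
}

// publishAtLeastOnce republishes until a confirmation arrives. If a confirmation
// was merely lost, the same event ends up being published more than once,
// which is why consumers have to cope with duplicates.
func publishAtLeastOnce(p ConfirmingPublisher, payload []byte) {
    for {
        if err := p.PublishWithConfirm(payload, 5*time.Second); err == nil {
            return
        }
        time.Sleep(time.Second) // simple fixed delay before trying again
    }
}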

Duplicated events

Idempotent events

The first technique that can be applied to handle duplicated events properly is to make them idempotent. An event is idempotent when, no matter how many duplicates of it arrive at a service, the resulting state of the service is the same as if only one copy had arrived. Let's see an example, looking at a non-idempotent event first:

On Brainly, users can thank other users for giving a good answer. Let's assume that a particular answer has received 15 thanks so far. When another user thanks for that answer, a ThanksForAnswerGiven event is published:

{
  "thankingUserId": 1234,
  "answerId": 444
}

If the Feed microservice wants to keep track of the number of thanks for answers, it subscribes to the ThanksForAnswerGiven event and increments the thanks counter by one whenever it receives this event. However, if the same event arrives twice, the counter will be incremented twice as well. As a result, the Feed microservice will end up with an inconsistent state: 17 thanks instead of 16.

To solve this problem you can change the structure of ThanksForAnswerGiven and make it idempotent.

{
  "thankingUserId": 1234,
  "answerId": 444,
  "totalThanks": 16
}

In this form, the event carries information about what the new state looks like instead of how to change the state. Thanks to that, no matter how many times the event arrives, the resulting state will be the same as if only one copy had arrived.
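
A small Go sketch of the difference between the two handlers, with a plain map standing in for the Feed microservice's local state (all names here are illustrative, not taken from the real service):

package main

// thanksByAnswer simulates the Feed microservice's local state.
var thanksByAnswer = map[int]int{444: 15}

type ThanksForAnswerGiven struct {
    ThankingUserID int
    AnswerID       int
    TotalThanks    int
}

// handleNonIdempotent increments the counter, so a duplicated event
// leaves the state at 17 instead of 16.
func handleNonIdempotent(e ThanksForAnswerGiven) {
    thanksByAnswer[e.AnswerID]++
}

// handleIdempotent overwrites the counter with the value carried by the event,
// so processing the same event twice leaves the state unchanged.
func handleIdempotent(e ThanksForAnswerGiven) {
    thanksByAnswer[e.AnswerID] = e.TotalThanks
}

func main() {
    e := ThanksForAnswerGiven{ThankingUserID: 1234, AnswerID: 444, TotalThanks: 16}
    handleIdempotent(e)
    handleIdempotent(e) // duplicate delivery: the counter still reads 16
}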

Unfortunately, idempotent events make your system vulnerable to out-of-order events. E.g. if an answer is thanked twice, two ThanksForAnswerGiven events will be published: the first with totalThanks equal to 16 and the second with totalThanks equal to 17. However, if these events arrive in the wrong order, the consumer's state will be set to 17 first and to 16 after that. As a result, the system will remain inconsistent. In the Events Ordering section I'll describe how to handle such cases.

Event unique identifier

Idempotent messages are not always feasible. Making the QuestionCreated event idempotent would mean squeezing into the event all questions asked so far, together with the newly asked one.

Because of that, we use another mechanism for handling duplicated events. Whenever an event is created, it is given a unique identifier. As a result, if an event is retransmitted, its duplicate will have the same identifier. This allows a consumer to discard events with identifiers that it has already consumed.

For event identifiers we use Universally Unique Identifiers (UUIDs). They allow generating identifiers without any centralised party assigning them, and at the same time the chance of a collision for UUID v4 is negligible in most cases.

The drawback is that, to distinguish duplicates from genuine events, consumers need to keep a history of the IDs of all events that they have already seen. Theoretically, a consumer should keep the full history of events, but in practice the last couple of hours should be sufficient in most cases.
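
To illustrate the idea, here is a sketch of an in-memory store of recently seen identifiers with a time-to-live; in a real consumer this history would more likely be persisted, for example via the unique database constraint described later in this article:

package dedup

import (
    "sync"
    "time"
)

// SeenEvents remembers event UUIDs for a limited time window so that
// duplicates arriving within that window can be discarded.
type SeenEvents struct {
    mu   sync.Mutex
    seen map[string]time.Time
    ttl  time.Duration
}

func NewSeenEvents(ttl time.Duration) *SeenEvents {
    return &SeenEvents{seen: map[string]time.Time{}, ttl: ttl}
}

// FirstTime reports whether the UUID has not been seen within the TTL window
// and records it. Expired entries are dropped to bound memory usage.
func (s *SeenEvents) FirstTime(uuid string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()

    now := time.Now()
    for id, t := range s.seen {
        if now.Sub(t) > s.ttl {
            delete(s.seen, id)
        }
    }
    if _, ok := s.seen[uuid]; ok {
        return false // duplicate: the consumer should discard this event
    }
    s.seen[uuid] = now
    return true
}

A consumer would call FirstTime with the uuid of each incoming event and skip processing whenever it returns false.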

Events Ordering

The lack of exactly once delivery isn't the only challenge that you must face when implementing event-based communication. Another one is the ordering of events: you can't expect that events will arrive at your service in the same order as they were published.

Figure 8. Events arrive in different order than they were published

Why may this cause problems? Let's get back to our example of a user asking a question and deleting it shortly afterwards. It will result in a QuestionCreated event being published and, right after it, a QuestionDeleted event. However, these events don't have to arrive at the Feed microservice in the same order. As a result, the service may consume the QuestionDeleted event first and then QuestionCreated. After receiving QuestionDeleted, the service will try to delete the question from its database. The question won't exist there yet, so nothing will happen. Next, the QuestionCreated event will arrive and the question will be added to the service's database. As a result, such a question will stay in the database forever.

To allow your consumer microservice to tell which event came first, you'll need to include some additional information in the events that are published. At Brainly we tried two approaches: the first was a global event counter, but in the end a Unix timestamp proved to be sufficient for our needs.

Global event counter

In this solution, we used Redis for maintaining a counter. Before publishing an event, a microservice sends the INCR command to Redis. This command atomically increments the counter and returns its new value. Next, this value is included in the published event under an orderNumber field. Thanks to that, if event B was published after event A, then event B must have a greater orderNumber than event A.

Figure 9. Using Redis for ordering events
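
A rough sketch of this in Go, using the go-redis client (v8-style API); the event structure and the counter key name are simplified assumptions:

package main

import (
    "context"
    "fmt"

    "github.com/go-redis/redis/v8"
)

type QuestionCreated struct {
    QuestionID  int    `json:"questionId"`
    Content     string `json:"content"`
    OrderNumber int64  `json:"orderNumber"`
}

// nextOrderNumber atomically increments a shared counter in Redis (INCR)
// and returns its new value.
func nextOrderNumber(ctx context.Context, rdb *redis.Client) (int64, error) {
    return rdb.Incr(ctx, "events:order-number").Result()
}

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    n, err := nextOrderNumber(ctx, rdb)
    if err != nil {
        panic(err)
    }
    e := QuestionCreated{QuestionID: 1234, Content: "Question content", OrderNumber: n}
    fmt.Printf("publishing event with orderNumber %d\n", e.OrderNumber)
    // the event would be serialised and published to the queue server here
}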

While this can work well for events within a single service, it would be problematic if you wanted to maintain order between events published by different services. In that case, every microservice would have to send INCR commands to the same, single Redis instance. That Redis instance would become a single point of failure and a bottleneck for the whole system.

Unix timestamp

Figure 10. Perfectly synchronised host’s local time

The limitations of the previous approach led us to another solution. We decided to include the current Unix timestamp in every event when it is published. Thanks to that, if two events were published one after another, the second event should have a bigger timestamp than the first one. Fig. 10 shows how it would work in ideal circumstances.

This solution is simple to implement, but it is far from perfect. First, the local time of different hosts may diverge. Fig. 11 shows a situation where the Popularity Tracking microservice's local time is ahead of the Q&A microservice's. As a result, the QuestionPopular event will have a greater timestamp than QuestionCreated even though it happened earlier.

Figure 11. One host’s local time is ahead of another

The second problem is the limited precision of timestamps. E.g. with second precision, two events published during the same second will have identical timestamps. So it's important to analyse the frequency of published events and adjust the timestamp precision accordingly. This can keep the number of timestamp collisions down to a negligible amount.
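
For example, a millisecond-precision timestamp could be attached at publish time; a minimal sketch (time.Time.UnixMilli requires Go 1.17 or newer):

package main

import (
    "fmt"
    "time"
)

type QuestionCreated struct {
    QuestionID int   `json:"questionId"`
    Timestamp  int64 `json:"timestamp"` // Unix time in milliseconds
}

func main() {
    e := QuestionCreated{
        QuestionID: 1234,
        Timestamp:  time.Now().UnixMilli(), // finer precision means fewer collisions
    }
    fmt.Println(e.Timestamp)
}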

There are other, more sophisticated solutions that could be used to determine event order, such as Lamport Timestamps or Vector Clocks. However, the Unix timestamp turned out to be good enough for our current needs. Its simplicity and low implementation cost are worth the price of a small number of timestamp collisions and out-of-order events (resulting from imperfect local time synchronisation).

Event Sourcing

The unique identifier and timestamp included in events provide enough information to properly handle situations when events are duplicated or arrive in the wrong order. In the rest of this section, I'll describe how we applied event sourcing to take advantage of this information.

Let's get back to our Feed example. As we've seen before, it consumes QuestionCreated and QuestionDeleted events. This is the structure of these events, together with the timestamp and UUID:

{
  "name": "QuestionCreated",
  "uuid": "4fe61897-84e7-41c1-88a7-26f5b28e3d6d",
  "timestamp": 1531686216,
  "payload": {
    "questionId": 1234,
    "content": "Question content"
  }
}

{
  "name": "QuestionDeleted",
  "uuid": "461c2188-31a2-445f-97eb-45e59ee5c1cc",
  "timestamp": 1531686365,
  "payload": {
    "questionId": 1234
  }
}

After receiving any of these events, the Feed microservice inserts it into the event store. The event store enforces the constraint that every event must have a unique uuid field. Thanks to that, the Feed microservice will discard any duplicates.

Next, if the microservice needs to perform any logic on a Question entity, it performs the following steps:

  1. Fetch all events related to a single question (e.g. with ID 1234) from the event store
  2. Sort them by timestamp
  3. Apply all events to the Question entity to recreate its state

Here is example Go code that recreates a Question entity from events (assuming that it receives events already sorted by timestamp):

// Package with two example events
package event

import "time"

type Event interface{}

type QuestionCreated struct {
    ID        int
    Content   string
    Timestamp time.Time
}

type QuestionDeleted struct {
    ID        int
    Timestamp time.Time
}

// Package with event-sourced entity
package entity

import (
    "fmt"
    "time"

    "github.com/k3nn7/example/event"
)

type Question struct {
    ID        int
    Content   string
    CreatedAt time.Time
    IsDeleted bool
}

func NewQuestion(events []event.Event) (*Question, error) {
    q := new(Question)
    for _, e := range events {
        err := q.applyEvent(e)
        if err != nil {
            return q, err
        }
    }

    return q, nil
}

func (q *Question) applyEvent(e event.Event) error {
    switch v := e.(type) {
    case event.QuestionCreated:
        q.ID = v.ID
        q.CreatedAt = v.Timestamp
        q.Content = v.Content
    case event.QuestionDeleted:
        q.IsDeleted = true
    default:
        return fmt.Errorf("invalid event %T", e)
    }

    return nil
}
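
The sorting from step 2 happens before NewQuestion is called; one way it could be done is shown below as an extra helper in the same entity package (this helper is an illustration, not part of the original code):

// Sorting helper for the entity package
package entity

import (
    "sort"
    "time"

    "github.com/k3nn7/example/event"
)

// eventTimestamp extracts the timestamp from the known event types.
func eventTimestamp(e event.Event) time.Time {
    switch v := e.(type) {
    case event.QuestionCreated:
        return v.Timestamp
    case event.QuestionDeleted:
        return v.Timestamp
    default:
        return time.Time{}
    }
}

// SortByTimestamp orders events from oldest to newest so they can be
// applied to the entity in the order they originally happened.
func SortByTimestamp(events []event.Event) {
    sort.Slice(events, func(i, j int) bool {
        return eventTimestamp(events[i]).Before(eventTimestamp(events[j]))
    })
}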

Now the microservice can execute logic on the Question entity. If the Question needs to be modified, it should be done by generating events and applying them to it. E.g.:

func (q *Question) Delete() {
    e := event.QuestionDeleted{ID: q.ID, Timestamp: time.Now()}
    q.applyEvent(e)
}

When persisting the Question, all newly applied events should be added to the event store. Later, the Question can be recreated in the same state by reading and applying all the events.

All of the above requirements regarding the event store led us to choose the PostgreSQL database for this purpose. It allowed us to implement the logic for inserting and fetching events in a simple and efficient way.

To store events, we used a single events table with the following schema (a possible table definition is sketched after the list):

  • question_id — ID of question that this event is related to
  • timestamp — to order events
  • uuid — to discard duplicates
  • payload — a payload of an event in JSON format
  • event_name — to know to which structure payload should be decoded
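
The exact column types are not specified here; a possible PostgreSQL definition, kept as a migration constant in Go, might look as follows (the types and the unique constraint on uuid are assumptions consistent with the description above):

package eventstore

// createEventsTable is a possible definition of the events table.
// The UNIQUE constraint on uuid is what allows duplicates to be discarded.
const createEventsTable = `
CREATE TABLE IF NOT EXISTS events (
    question_id BIGINT       NOT NULL,
    timestamp   BIGINT       NOT NULL,
    uuid        UUID         NOT NULL UNIQUE,
    payload     JSONB        NOT NULL,
    event_name  VARCHAR(255) NOT NULL
);
CREATE INDEX IF NOT EXISTS events_question_id_idx ON events (question_id);`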

Such a table structure allowed us to easily get all events for a single question in the right order. It can be done with the following SQL:

SELECT event_name, payload FROM events
WHERE question_id = $1
ORDER BY timestamp

Inserting a new event was even simpler because all required data is available in every event.
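
A sketch of such an insert using database/sql; the ON CONFLICT (uuid) DO NOTHING clause is one way to express the duplicate-discarding behaviour described above, not necessarily the exact query used at Brainly, and a PostgreSQL driver such as lib/pq is assumed to be registered elsewhere:

package eventstore

import "database/sql"

// StoredEvent is the flattened form of an event, ready to be appended.
type StoredEvent struct {
    QuestionID int
    Timestamp  int64
    UUID       string
    Payload    []byte
    EventName  string
}

// AppendEvent inserts an event into the store. Duplicates (same uuid)
// are silently discarded thanks to the unique constraint on that column.
func AppendEvent(db *sql.DB, e StoredEvent) error {
    _, err := db.Exec(
        `INSERT INTO events (question_id, timestamp, uuid, payload, event_name)
         VALUES ($1, $2, $3, $4, $5)
         ON CONFLICT (uuid) DO NOTHING`,
        e.QuestionID, e.Timestamp, e.UUID, e.Payload, e.EventName,
    )
    return err
}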

Command Query Responsibility Segregation (CQRS)

Event-sourced entities allowed us to properly order events and reconstruct the Question entity from them. However, such a form of storing entities (as a stream of events) is cumbersome when you need to query data.

For example, in Brainly's questions feed, a user requests the 10 latest questions every time they enter the main page. If the Feed microservice had access only to the event store, it would have to:

  1. Read all the events from the event store and reconstruct Question entities from them
  2. Sort Question entities in memory by creation date
  3. Return 10 latest Question entities

In our case, where millions of users are making requests for a feed that contains hundreds of millions of questions, something like that would be practically impossible, especially when users expect to receive a response within several milliseconds.

That’s why we decided to take advantage of CQRS. Let’s see how we implemented it in feed microservice.

Assume that a QuestionDeleted event was just published for the question with ID 1234. As described before, every time a new event arrives, it is appended to the event store. After that, all events for this particular question are fetched from the event store (together with the newly appended event). Next, these events are applied to the Question entity to bring it to its most recent state. Now this entity is used to create a new object, ReadQuestion, which has the following structure:

type ReadQuestion struct {
    ID         int
    Content    string
    IsDeleted  bool
    CreatedAt  time.Time
    HasAnswers bool
}

Notice that while some fields of ReadQuestion may be the same as those of Question, they serve different purposes. The ReadQuestion structure is optimised for querying. E.g. the Question aggregate may contain an array of Answer entities, whereas ReadQuestion has only a boolean field HasAnswers, because that provides enough information for further queries.

Figure 12. Persisting ReadQuestion after receiving an event

ReadQuestion is persisted in PostgreSQL as well, but in a separate table, read_questions, which has the following structure:

  • id — id of question
  • content — content of question
  • is_deleted — is question deleted?
  • has_answers — does question contain any answers?
  • created_at — date when the question was asked

Such a form of storing ReadQuestion instances allows us to query them with simple and efficient SQL:

SELECT id, content FROM read_questions 
ORDER BY created_at DESC
LIMIT 10
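
A sketch of how the Feed microservice could run this query with database/sql and map the rows onto ReadQuestion values; the structure is repeated only to keep the example self-contained, and the function name is illustrative:

package feed

import (
    "database/sql"
    "time"
)

type ReadQuestion struct {
    ID         int
    Content    string
    IsDeleted  bool
    CreatedAt  time.Time
    HasAnswers bool
}

// LatestQuestions reads the newest questions straight from the read model,
// without touching the event store at all.
func LatestQuestions(db *sql.DB, limit int) ([]ReadQuestion, error) {
    rows, err := db.Query(
        `SELECT id, content, is_deleted, has_answers, created_at
         FROM read_questions
         ORDER BY created_at DESC
         LIMIT $1`, limit)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var result []ReadQuestion
    for rows.Next() {
        var q ReadQuestion
        if err := rows.Scan(&q.ID, &q.Content, &q.IsDeleted, &q.HasAnswers, &q.CreatedAt); err != nil {
            return nil, err
        }
        result = append(result, q)
    }
    return result, rows.Err()
}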

ReadQuestion versioning & optimistic locking

We have covered a lot of ground so far, but there are still some nasty pitfalls worth mentioning. Our microservices rarely run as a single instance. Instead, they have at least three instances to distribute the load and provide redundancy. Unfortunately, this doesn't come for free.

Once again, let's see what happens when QuestionCreated and QuestionDeleted events are published shortly one after another. Assume that both events are about the question with ID 123. There is a big chance that they will be consumed by different instances of the Feed microservice. Both instances will append the consumed events to the event store and then read all the events for question 123.

Let's assume that there are no events for question 123 in the event store yet. Instance #1 receives the QuestionCreated event and saves it in the event store. Next, it fetches all events for question 123 and receives only QuestionCreated. It recreates the Question entity and creates a ReadQuestion from it. But in the meantime, before instance #1 saves the ReadQuestion into the read_questions table, instance #2 does the following:

It receives the QuestionDeleted event and saves it to the event store. After fetching all events for question 123, it receives both the QuestionCreated and QuestionDeleted events. It uses them to recreate the Question entity and a ReadQuestion.

Figure 13. Race condition when both instances are consuming events and persisting ReadQuestion

Now, what happens if instance #2 saves its ReadQuestion to the database before instance #1 does? We'll end up with an inconsistent system, because the ReadQuestion saved by instance #1 doesn't include the QuestionDeleted event. Such an inconsistency may last for a long time.

We used two mechanisms to solve this problem: versioned entities and optimistic locking. Versioned entities have one additional field, Version. It is a simple integer equal to the number of events that were used to recreate the entity. Thanks to that, the entity with the most up-to-date state will always have the highest version. Here is a simple example of how it can be done in Go:

type Question struct {
    ID        int
    Content   string
    IsDeleted bool
    Version   int
}

func (q *Question) applyEvent(e event.Event) error {
    switch v := e.(type) {
    case event.QuestionCreated:
        q.ID = v.ID
        q.Content = v.Content
    case event.QuestionDeleted:
        q.IsDeleted = true
    default:
        return fmt.Errorf("invalid event %T", e)
    }
    q.Version++
    return nil
}

A Version field was added to the ReadQuestion structure as well, and its value is simply copied from Question.

Next, when a ReadQuestion is persisted, we need to make sure that we're not persisting a ReadQuestion with a version smaller than the one that already exists in the database. It is crucial to check the version and perform the update atomically; otherwise, the race condition that we tried to prevent would still occur. In PostgreSQL we can achieve that with the following SQL statement:

INSERT INTO read_questions (id, content, version)
VALUES (3124, 'Question content', 2)
ON CONFLICT (id) DO UPDATE
SET content = 'Question content', version = 2
WHERE id = 3124 AND version < 2

In the microservice logic, we check afterwards whether any rows were changed. If not, it means that we tried to persist a ReadQuestion with a smaller version than the one already stored. In that case, an OptimisticLockError is returned. This error is handled as described in the Consumer Error Handling section, so the whole process of consuming the event will be retried. That is fine, because the event was already persisted in the event store, so on the retry the append will simply be discarded as a duplicate and the ReadQuestion will be rebuilt from the full, up-to-date event stream.
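
As a sketch, the surrounding Go code could check the number of affected rows and translate zero into an optimistic lock error; ErrOptimisticLock and the function signature are names assumed here for illustration:

package feed

import (
    "database/sql"
    "errors"
)

// ErrOptimisticLock signals that a ReadQuestion with an equal or higher
// version already exists, so this write was skipped.
var ErrOptimisticLock = errors.New("a newer version of ReadQuestion already exists")

// SaveReadQuestion upserts a ReadQuestion unless the stored version
// is already equal to or higher than the one being written.
func SaveReadQuestion(db *sql.DB, id int, content string, version int) error {
    res, err := db.Exec(
        `INSERT INTO read_questions (id, content, version)
         VALUES ($1, $2, $3)
         ON CONFLICT (id) DO UPDATE
         SET content = $2, version = $3
         WHERE read_questions.version < $3`,
        id, content, version)
    if err != nil {
        return err
    }
    affected, err := res.RowsAffected()
    if err != nil {
        return err
    }
    if affected == 0 {
        return ErrOptimisticLock
    }
    return nil
}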

Summary

Microservices and event-based communication provide a lot of benefits when it comes to the scalability of teams and systems. Unfortunately, this doesn't come for free, and the attached costs may be severely underestimated.

In this article, I've only scratched the surface of the challenges that you'll probably encounter when working with a distributed system. But I hope that you'll find some of these solutions useful.

Please leave a comment if you have different experiences that you would like to share, or if this article was helpful for you.

