Event Sourcing Benefits Without the Eventual Consistency
the allure of event sourcing
In the world of systems architecture, we want to be able to build systems which can scale. In order to future-proof a system and to ensure that we can add new functionality, new services and new features in a straightforward fashion, we may often turn to event sourcing. Event sourcing is great because events which would mutate the system in some way are preserved in a serial event log and new services coming online can consume and utilize this data in ways that you perhaps did not imagine in the beginning.
This is pretty compelling, but as many folks know, event sourcing isn’t a silver bullet. One of the biggest architectural issues to reckon with is the eventual consistency introduced by the event stream. The stream itself is typically not a queryable data store, and the stream consumers responsible for materializing the stream’s data in a meaningful way typically process it with some nondeterministic latency.
This latency could be due to transient network errors, hardware outages, slow consumers, or the like. There are many disaster scenarios which can take place, and these sorts of scenarios will influence the time to consistency for the system. The happy path is: at some point in the future, after an event has been written to the event stream, a consumer will pick up the event and materialize it, say, in your MongoDB database, which a microservice may use for showing data in your UI. The key phrase being “at some point in the future”.
Consider the following scenario. A data mutation event comes into the system, passes some preliminary validation, and is then written to the stream. Now, before your stream consumer is able to process the event, your k8s (Kubernetes) cluster suffers a critical set of node failures, and your consumer pods are now all offline. The k8s scheduler will attempt to reschedule those pods on a healthy node, but what if there is no space on the remaining healthy nodes? This is an outage scenario. There are ways to avoid this of course, but let’s say you did not avoid it this time.
In such a scenario, the user who submitted the data mutation may see a timeout, and may try to submit the mutation again. This is a problem, as now we can no longer trust the veracity of the data on the stream, and we will need to do validation as part of the consuming / materialization phase. What about the new services which come online in the future? How will they know about these validation issues and that they need to discard the duplicate mutation?
We could simply write a follow-up message to a new stream which represents events which have been successfully materialized, but this defeats some of the purpose of event sourcing to begin with, as we will now have to account for errors which may come up after the data was materialized but before the follow-up event was written. Do we retry the materialization process again and then write a new follow-up event? What if the error actually took place after materialization and after the follow-up event was written but before the message was ack’ed? Now we would be facing a duplicate message in the follow-up stream. Tough.
Here, I would like to propose an alternative. What if we have a database like MongoDB or RethinkDB which supports the concept of change streams? We could do all of our validation on incoming mutations/requests immediately on the database which is the source of truth for the respective domain, write the data to the database, and then have a change stream system set up which will populate an event sourcing stream thereafter based on the changes to the database.
This has a nice appeal to it, as we get to remove the difficulty of eventual consistency with respect to the event stream and its consumers, and we would be able to populate an event stream with data that is authoritative and trusted because the events have already taken place, been validated, and are material. This is certainly not the canonical way of thinking about event sourcing, but with this pattern the database which supports change streams could be thought of as the initial node in your data stream graph.
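To make the idea of the database as the initial node a bit more concrete, here is a minimal sketch of how a single change event could be mapped to an event for the downstream stream. The field names read from the change event (`_id`, `operationType`, `ns`, `documentKey`, `fullDocument`) are the ones MongoDB documents for change events; the output envelope and the function name `to_stream_event` are hypothetical, and the sketch assumes the resume token in `_id` is a document with a `_data` field.

```python
from typing import Any, Dict


def to_stream_event(change: Dict[str, Any]) -> Dict[str, Any]:
    """Map a MongoDB change event to a hypothetical event-stream envelope."""
    ns = change["ns"]
    return {
        # The change event's _id is its resume token; we reuse it as a unique event ID.
        # Assumption: the token is a document carrying a `_data` field.
        "event_id": change["_id"]["_data"],
        # e.g. "orders.insert"; the naming scheme is an illustration only.
        "type": f'{ns["coll"]}.{change["operationType"]}',
        "key": change["documentKey"]["_id"],
        # fullDocument is present on inserts; it may be absent on other operations.
        "data": change.get("fullDocument"),
    }
```

Because the event is derived from a write that has already been validated and committed, downstream consumers can treat it as a statement of fact rather than a request which still needs to be checked.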
Any problems here? Well, let’s take a look. If your change stream consumer is running as part of the domain’s microservice instances, then you need to coordinate which instance should be responsible for consuming. If you have all n replicas of your microservice consuming the change stream, then you will get n duplicates of the event under normal circumstances. If you go with this approach, then you will need to use a distributed locking mechanism. Something like etcd’s leases may work well. You can build a pretty strong abstraction on top of leasing out locks to determine which instance should do the consuming. The problem? Distributed locks themselves. Anyone with experience using distributed locking across services is probably well acquainted with the error conditions which may come up. Take the following example.
The lock holder may miss a series of heartbeats due to a bug in its algorithm or networking issues, and then the etcd lease expires. The other instances will be watching the etcd key, will notice the released lock and will attempt to acquire the lock. One will succeed and start from the last change stream checkpoint. A critical problem may arise if the original lock holder does not stop consuming the change stream in time. This may lead to duplicates in the event stream which is supposed to be authoritative. Things can get worse from there.
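The failure above can be illustrated with a toy, deterministic simulation. This is not etcd code; the function name and the tick-based model are assumptions made purely for illustration. Change i arrives at tick i, the original holder A keeps publishing until it notices it has lost the lease, and the new holder B publishes from the moment the lease expires.

```python
from typing import List, Tuple


def relay_with_stale_holder(
    changes: List[str], lease_expires_at: int, holder_notices_at: int
) -> List[Tuple[str, str]]:
    """Return (publisher, change) pairs written to the event stream."""
    published: List[Tuple[str, str]] = []
    for tick, change in enumerate(changes):
        # A is unaware that its lease expired until `holder_notices_at`.
        if tick < holder_notices_at:
            published.append(("A", change))
        # B acquired the lock as soon as the lease expired.
        if tick >= lease_expires_at:
            published.append(("B", change))
    return published


stream = relay_with_stale_holder(["c0", "c1", "c2", "c3"], lease_expires_at=2, holder_notices_at=3)
# "c2" falls in the window where both A and B believe they hold the lock.
duplicated = sorted({c for _, c in stream if [x for _, x in stream].count(c) > 1})
```

Any gap between the lease expiring and the old holder noticing is a window in which both instances publish, which is exactly the duplicate problem described above.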
So, how can we deal with this? An approach may be to create a standalone consumer which just handles the change stream logic. We could enforce that only one replica of the consumer is ever active. We can use the k8s deployment strategy Recreate to bring the one instance down before bringing a new copy online. It is a background consumer, so this should be acceptable. There are still a few issues to consider with this example.
- How much should we trust the orchestration platform to ensure that only one replica ever exists? If there are errors here and multiple nodes come online, we could get duplicates.
- How can we scale a pattern like this? A few approaches could be taken, but they are application level approaches. Horizontal scaling is not an option because then we are back at the distributed locking issue which defeats the purpose.
Let’s break these issues down.
trust the orchestrator?
As far as the orchestrator is concerned (let’s use k8s, i.e. Kubernetes, in this context), there is actually a fairly decent amount of trust which can be given on this front. First off, when using a k8s deployment, you can state explicitly that only 1 replica is to be started. The next thing to take care of is to ensure that the deployment strategy is set to Recreate instead of the default RollingUpdate. This will ensure that all of the deployment’s existing replicas are completely destroyed and removed before any new replicas are started.
Ok, this is good. Anything else we need to be concerned about in our state of professional paranoia? Yes. Developer mistakes. To be sure, this can be a bit of a rabbit hole. Any developer, at any point in time, has the potential of introducing some number of bugs which could cause our desired constraints to be violated. Test code could be faulty, which causes a business logic bug to go unnoticed. These sorts of things happen. How do we strike a reasonable balance? You will come across many different opinions on this front, but at the end of the day, some amount of trust is needed here.
Perhaps a simple CI/CD test can be used to ensure that our deployment file always has the number of desired replicas set to 1, and that the update strategy is set to Recreate. Something like this, coupled with a comment or two in your deployment file and some reasonable peer review, should do the trick.
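Such a check could be as small as the following sketch. It assumes the deployment manifest has already been parsed into a dictionary (for example with a YAML parser); the function name `check_single_replica` is hypothetical. The keys it inspects, `spec.replicas` and `spec.strategy.type`, are the standard Kubernetes Deployment fields.

```python
from typing import Any, Dict, List


def check_single_replica(manifest: Dict[str, Any]) -> List[str]:
    """Return a list of violations; an empty list means the manifest passes."""
    spec = manifest.get("spec", {})
    problems: List[str] = []
    # Kubernetes defaults replicas to 1 when omitted, but requiring it
    # explicitly keeps the intent visible during peer review.
    if spec.get("replicas") != 1:
        problems.append("spec.replicas must be explicitly set to 1")
    # The default strategy is RollingUpdate, so Recreate must be stated.
    if spec.get("strategy", {}).get("type") != "Recreate":
        problems.append("spec.strategy.type must be Recreate")
    return problems
```

A CI job could call this on the parsed manifest and fail the build whenever the returned list is non-empty.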
Other measures could be taken: etcd-based distributed locks to ensure only one active replica, synchronization documents in a database, etc. There are plenty of options. The trade-off? More code, more potential issues. Perhaps trust coupled with some tests and peer review is the best balance. YMMV. However, it may be simpler than this. Change stream events have unique IDs, which we can leverage quite nicely. Read on.
We are now left with one major concern. How do we scale this pattern? Horizontal scaling is no longer an option, so what can we do to maximize our throughput? First of all, in the context of read-process-write workflows, it is illogical to have duplicate consumers, so there is no reason to get bent out of shape over the constraint of not being able to horizontally scale this pattern. So let’s not index into that too deeply. Let’s focus on the pattern of scaling the process which reads a change stream / changeset from a database and uses that to generate events to be written to the persistent stream.
First, here are the semantics we adhere to when using MongoDB change streams for this pattern.
- When starting from the beginning, you open a change stream, and for each event observed (regardless of whether it was processed or skipped), you persist the resume token in some location where it can be accessed later.
- Any time the service goes down — due to updates, hardware failures, or the like — the service will open the change stream using the last persisted resume token.
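The two rules above can be sketched as a small relay loop. To keep the example runnable without a database, the change stream, the publisher and the checkpoint store are passed in as plain callables and objects; `relay`, `MemoryCheckpoint` and `wanted` are hypothetical names. With PyMongo, `open_stream` could plausibly be `lambda token: collection.watch(resume_after=token)`, and the checkpoint would live somewhere durable rather than in memory.

```python
from typing import Any, Callable, Dict, Iterable, Optional

Change = Dict[str, Any]


class MemoryCheckpoint:
    """Stand-in for a durable checkpoint store (e.g. a small collection)."""

    def __init__(self) -> None:
        self.token: Optional[Any] = None

    def load(self) -> Optional[Any]:
        return self.token

    def save(self, token: Any) -> None:
        self.token = token


def relay(
    open_stream: Callable[[Optional[Any]], Iterable[Change]],
    publish: Callable[[Change], None],
    checkpoint: MemoryCheckpoint,
    wanted: Callable[[Change], bool] = lambda change: True,
) -> None:
    # Resume from the last persisted token; None means start from the beginning.
    for change in open_stream(checkpoint.load()):
        if wanted(change):
            publish(change)
        # Persist the token for every observed event, processed or skipped.
        # A change event's _id is its resume token.
        checkpoint.save(change["_id"])
```

Note the ordering: the event is published first and the token is saved second, so a crash between the two steps leads to a replayed event on restart rather than a lost one.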
In order to scale this pattern, you can create multiple change streams based on each collection in MongoDB which you care about, or you could dice it up even further by using the aggregation system to only see the events/updates which the specific consumer is concerned about, and other consumers would watch for other changes on the same collection. This is very application specific, and each team will have to take into consideration what will work best for their use case.
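As a rough illustration of dicing up a collection's changes, the sketch below builds an aggregation pipeline which only lets through changes for documents with a given status. The `status` field and the function name are assumptions for the example; `$match`, `operationType` and `fullDocument` are standard change stream pieces.

```python
from typing import Any, Dict, List


def changes_for_status(status: str) -> List[Dict[str, Any]]:
    """Build a change stream pipeline for documents in one (hypothetical) status."""
    return [
        {
            "$match": {
                "operationType": {"$in": ["insert", "update", "replace"]},
                # For updates, fullDocument is only populated when the stream is
                # opened with the full-document lookup option.
                "fullDocument.status": status,
            }
        }
    ]
```

With PyMongo, one consumer might open its stream with `collection.watch(changes_for_status("paid"), full_document="updateLookup")`, while another consumer watches the same collection with a different filter and its own resume token.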
An astute reader will see that there is still an error condition here. After successfully writing the change stream event to the persistent stream, writing the checkpoint resume token may fail. This may lead to a duplicate event depending on the nature of the failure. What do we do about this?
Fortunately, change stream events all have a unique ID. This ID can be used to guard against the rarer condition where a duplicate may come through due to the aforementioned failure scenario. Downstream consumers of the persistent stream could use the unique ID of the events for deduplication. The duplicates will be identical, to be sure, as they will have come from the MongoDB change stream. So as long as the processing is idempotent, we could ignore this, but tracking the unique IDs is simple enough. You could put a TTL index on the collection being used to track seen events, which would ensure that there won’t be any long-term data overhead as part of the consumer pattern. Lots of options here.
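Here is a minimal in-memory sketch of that deduplication idea. The class `SeenEvents` is hypothetical and only mimics what a "seen events" collection with a TTL index would do; in MongoDB, the expiry would instead be handled by an index created with the `expireAfterSeconds` option on a timestamp field.

```python
import time
from typing import Callable, Dict


class SeenEvents:
    """In-memory stand-in for a 'seen events' collection with a TTL index."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen: Dict[str, float] = {}

    def first_time(self, event_id: str) -> bool:
        """Return True if the event ID has not been seen within the TTL window."""
        now = self._clock()
        # Drop expired entries, mimicking what a TTL index does in the background.
        self._seen = {k: t for k, t in self._seen.items() if now - t < self._ttl}
        if event_id in self._seen:
            return False
        self._seen[event_id] = now
        return True
```

A downstream consumer would call `first_time(event_id)` before processing and skip the event when it returns False. The TTL only needs to be longer than the window in which a duplicate can plausibly arrive.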
At the end of the day, dealing with an unending variety of failure scenarios can feel defeating. However, error handling is our responsibility as builders, and it would seem that the balance exists somewhere in between too much abstraction and too little. Too much, and we will have introduced more error conditions. Too little, and errors will go unchecked leading to an unstable system. As always, it is a matter of trade-offs.
With the examples we’ve explored here, we have seen that there is a fairly robust pattern which we can use to acquire the beneficial features of event sourcing, while mitigating the potential disasters of eventual consistency normally inherent to event sourcing, all without relying on distributed locking mechanisms. The trade-offs made give us many benefits, with a very minimal overhead.
Over the years, I have used many different patterns. Some I’ve inherited, some I’ve created. I am always interested to hear feedback on these sorts of things. Please share your experiences. Let me know what you think over here on Reddit. Also, please stay calm. These sorts of issues can be complex. No one needs to get angry. We can accomplish much more with respectful communication.
Live long and prosper. 🖖