Scaling Event-Sourcing at Jet

At Jet, we’ve been using event-sourcing since the very beginning and learned some lessons along the way. There are several dimensions along which we had to scale our event-sourcing platform. The one which most teams using event-sourcing have to overcome early on is scaling reads — as streams increase in size it becomes prohibitive to read the entire stream to perform an operation. Another dimension of scaling is redundancy — in order to function continuously, the platform needs to tolerate not only failures of individual machines in a data center, but failures of an entire data center. The projection system needs to be scaled to support a growing number of consumers with varying workloads. Meanwhile, as the number of moving parts increases, it becomes essential to verify safety and liveness guarantees advertised by the system. Of course in time, the benefits of a highly modular architecture afforded by event-sourcing start to weigh on our ability to obtain accurate pictures of system states. To that end, we need a tracing platform which, in addition to request-reply, must support tracing of asynchronous interactions. All things considered, the challenges of operating an event-sourcing platform are noteworthy, but its sound foundational principles continue to pay dividends as we evolve.

Event Sourcing

There are a few definitions of event sourcing floating around. Martin Fowler’s is perhaps the most cited one and it states that:

Event sourcing is a paradigm where changes to application state are recorded as a series of events.

To make this more concrete, it is helpful to model applications using IO automata. An IO automaton is defined as a set of states, a special starting state, a set of input events, a set of output events and a transition function taking pairs of state and input event to pairs of state and output event:

  • State, a set of states.
  • S∅, a starting state.
  • Input, a set of inputs.
  • Output, a set of outputs.
  • τ : State × Input → State × Output, a transition function.

The hosting service manages state as well as interaction with input and output mediums. During an operation, the service receives an input event from the input medium, retrieves the state corresponding to the input, executes the transition function, persists the resulting state and sends the output event to the output medium. A service typically manages multiple state machines concurrently, one for each entity (aggregate) in the system.

For example, consider a shopping cart system. State corresponds to the states of individual shopping carts, each consisting of a list of items, prices and promo code information. Input corresponds to requests to perform actions on the cart, such as adding items or checking out. Output corresponds to changes in the cart, such as items being added or removed. Finally, the transition function τ encodes the logic for handling requests on a given cart.
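The cart automaton can be sketched in Python as follows. This is a minimal illustration rather than Jet's implementation: the class names, the `transition` function and the `Rejected` output for invalid requests are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Cart:
    """State: the contents of one shopping cart (hypothetical shape)."""
    items: Tuple[str, ...] = ()
    checked_out: bool = False


@dataclass(frozen=True)
class AddItem:       # input
    sku: str


@dataclass(frozen=True)
class Checkout:      # input
    pass


@dataclass(frozen=True)
class ItemAdded:     # output
    sku: str


@dataclass(frozen=True)
class CheckedOut:    # output
    pass


@dataclass(frozen=True)
class Rejected:      # output (assumption: invalid requests are reported as an output)
    reason: str


EMPTY_CART = Cart()  # the starting state


def transition(state, request):
    """tau : State x Input -> State x Output, for a single cart."""
    if state.checked_out:
        return state, Rejected("cart is already checked out")
    if isinstance(request, AddItem):
        return Cart(state.items + (request.sku,)), ItemAdded(request.sku)
    if isinstance(request, Checkout):
        if not state.items:
            return state, Rejected("cart is empty")
        return Cart(state.items, checked_out=True), CheckedOut()
    raise TypeError(f"unknown input: {request!r}")
```

Calling `transition(EMPTY_CART, AddItem("sku-1"))` returns the new cart together with an `ItemAdded` output. The hosting service would persist the former and publish the latter.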

Event-sourcing makes the observation that rather than persisting state, we can persist the output events. An instance of state can be reconstituted by running a fold over past outputs using a delta function:

Δ : State × Event → State, defines how an event changes state.

We assign sequence numbers to output events and define the sequence number of an instance of state as the sequence number of the last event used to derive it. The transition function defined above can be factored into a delta function and an execute function:

ε : State × Input → Event, takes inputs to outputs at a state.
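As a rough sketch of this factoring, the Python below defines a delta function, an execute function, the transition function composed from the two, and a fold that reconstitutes state from past events. The tuple-based representation of state, inputs and events, and the `RequestRejected` event, are assumptions chosen to keep the example short.

```python
from functools import reduce

# State is a tuple of SKUs; inputs and events are (kind, sku) tuples.
# These shapes are illustrative assumptions.


def delta(state, event):
    """Delta : State x Event -> State."""
    kind, sku = event
    if kind == "ItemAdded":
        return state + (sku,)
    if kind == "ItemRemoved":
        items = list(state)
        items.remove(sku)  # removes the first occurrence
        return tuple(items)
    return state  # events that do not affect state


def execute(state, request):
    """epsilon : State x Input -> Event."""
    kind, sku = request
    if kind == "AddItem":
        return ("ItemAdded", sku)
    if kind == "RemoveItem" and sku in state:
        return ("ItemRemoved", sku)
    return ("RequestRejected", sku)


def transition(state, request):
    """tau, recovered by composing execute and delta."""
    event = execute(state, request)
    return delta(state, event), event


def reconstitute(events, initial=()):
    """Rebuild state by folding delta over past events."""
    return reduce(delta, events, initial)
```

Only the events need to be persisted: `reconstitute` recovers the state on demand, and `transition` never has to be stored or replayed.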

In order to run an event-sourced service, we need a storage mechanism that can store event streams for each entity in the system. These capabilities can be summarized as follows:

get : StreamId × SN → Events, returns events in a stream.
add : StreamId × SN × Events → Ack, adds new events to a stream.

The get operation returns the events in a stream starting at the specified sequence number SN. The add operation appends a set of events to a stream at a specified sequence number. If that sequence number does not match the current position of the stream, an error is returned. This is optimistic concurrency control: a writer that acted on stale state is rejected rather than allowed to overwrite a concurrent change.
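A toy in-memory store illustrates the two operations and the optimistic concurrency check. This is a sketch only: the class and exception names are hypothetical and are not EventStore's API, and it assumes that sequence numbers start at 0 and that `add` expects the sequence number at which the first new event will be written.

```python
class WrongExpectedSequenceNumber(Exception):
    """Raised when a write is based on a stale view of the stream."""


class InMemoryEventStore:
    def __init__(self):
        self._streams = {}  # stream id -> list of events

    def get(self, stream_id, sn=0):
        """Return the events of a stream starting at sequence number sn."""
        return list(self._streams.get(stream_id, [])[sn:])

    def add(self, stream_id, sn, events):
        """Append events at sequence number sn, or fail if the stream has moved."""
        stream = self._streams.get(stream_id, [])
        if sn != len(stream):
            raise WrongExpectedSequenceNumber(
                f"{stream_id}: expected {sn}, stream is at {len(stream)}"
            )
        self._streams[stream_id] = stream + list(events)
        # Acknowledge with the next sequence number of the stream.
        return len(self._streams[stream_id])
```

A writer that loses the race catches the error, re-reads the stream with `get`, recomputes its events against the newer state and retries.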

Figure 1: Event stream index.

In addition, the event store should also provide access to the log of all events in a collection:

log : LSN → Events, returns all events in a partition.

LSN herein refers to a logical point in the log of all events in a collection, and it may be a sequence number, or a more complex structure such as a vector if the log is partitioned. The log enables service orchestration — downstream services can perform operations in response to events in an upstream system, or the upstream system itself can be replicated — state-machine replication. Moreover, the log allows for communication to be consistent with respect to the state — events are used to reconstitute state and notify downstream systems of changes in the upstream system. Without a log, care must be taken to prevent missed communications, or communications with respect to uncommitted states.

Figure 2: Event log.

The collection of streams depicted in Figure 1 is an index of the log depicted in Figure 2.
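The relationship between the log and the stream index can be sketched as follows. The sketch assumes an unpartitioned log represented as a Python list of `(stream_id, event)` pairs, with the list position standing in for the LSN. The function names are hypothetical.

```python
from collections import defaultdict


def build_stream_index(log):
    """Map each stream id to the log positions (LSNs) of its events."""
    index = defaultdict(list)
    for lsn, (stream_id, _event) in enumerate(log):
        index[stream_id].append(lsn)
    return dict(index)


def read_log(log, lsn=0):
    """log : LSN -> Events, all events from a logical point onwards."""
    return log[lsn:]


def read_stream(log, index, stream_id, sn=0):
    """get : StreamId x SN -> Events, served from the index over the log."""
    return [log[lsn][1] for lsn in index.get(stream_id, [])[sn:]]
```

Because the index is derived entirely from the log, it can be rebuilt at any time, and a downstream consumer of `read_log` observes exactly the events from which stream state is reconstituted.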

One data store that has these capabilities is EventStore and it is used for many systems at Jet. With this definition of event-sourcing in mind, we can characterize the different ways that we’ve scaled our event-sourcing platform.

Scaling Reads

Recall that the get function, defined above, returns the events in a stream starting at a specified sequence number. Throughout the lifetime of a system, streams can get arbitrarily large, eventually making reads of the entire stream prohibitive during an operation. A common way to scale reads with event-sourcing is using a technique called a rolling snapshot. A snapshot captures the state of a stream at a particular point in time and is constructed using the delta function defined above. Then, only events occurring after this point in time need to be read in order to reconstitute the last known state.

Snapshotting is not to be confused with distributed snapshots produced by a snapshot algorithm, which is a different, albeit somewhat related, notion. A snapshot algorithm approximates a global state of a distributed system, and is most often used for asserting stable properties, such as termination or deadlock.

Snapshots can be managed in a few different ways. A service can persist a snapshot when performing operations. Alternatively, snapshots can be generated by downstream services consuming the log. Snapshots can be generated for every event, or based on an interval. Snapshots can be stored in another stream, or in an entirely different data store. (Snapshots can also be stored alongside events in a stream. However, this requires an ability to read streams backwards, couples the snapshotting interval to the read access pattern, and interleaves state, which is a particular interpretation of events, with the events themselves.) Before an operation is performed, a snapshot can be read, followed by a read of any remaining events to reconstitute the latest state. Alternatively, the operation can be performed speculatively with respect to the retrieved state, relying on the optimistic concurrency control of the event store to ensure consistency of the underlying stream. By regarding state snapshots as an optimization mechanism rather than a core storage pattern, we had flexibility in terms of how reads could be scaled, all while retaining the entire history of events.
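The read path with a rolling snapshot can be sketched as below. The sketch makes several assumptions: snapshots live in a plain dictionary keyed by stream id, a snapshot is a pair of the next sequence number to read and the state at that point, and `get` and `delta` are supplied by the caller with the signatures defined earlier. The function names are hypothetical.

```python
from functools import reduce


def load_state(stream_id, snapshots, get, delta, initial):
    """Read the latest snapshot, then fold only the events recorded after it."""
    sn, state = snapshots.get(stream_id, (0, initial))
    events = get(stream_id, sn)
    return sn + len(events), reduce(delta, events, state)


def maybe_snapshot(stream_id, snapshots, sn, state, interval):
    """Persist a new snapshot once `interval` events have accumulated."""
    last_sn, _ = snapshots.get(stream_id, (0, None))
    if sn - last_sn >= interval:
        snapshots[stream_id] = (sn, state)
```

If the snapshot store is lost or a snapshot is stale, `load_state` still returns the correct state, only with a longer read. This is what makes the snapshot an optimization rather than a source of truth.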

Scaling Projections

In the context of event-sourcing, a projection refers to a running fold of events. In essence, a projection embodies a state-machine whose inputs are events from the log. Its output events may form another stream, or the projection may be used solely for computing a state. A projection may rely on state, or it may be stateless. One type of projection is a filter — it forms a stream of events matching a predicate. Another is a transformer — it either enriches events, or translates them into another form. Since projections are state-machines, they can also perform aggregations and joins.

π : State × Event → State × Output
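A small sketch of π as a running fold is shown below, with a stateless filter and a stateful aggregation as two instances. It assumes that Output is a list of zero or more events, so that a filter can emit nothing for a non-matching event. The function names and the dictionary shape of events are illustrative.

```python
def run_projection(pi, initial, events):
    """Fold pi over events, collecting the outputs it emits along the way."""
    state, outputs = initial, []
    for event in events:
        state, emitted = pi(state, event)
        outputs.extend(emitted)
    return state, outputs


def make_filter(predicate):
    """A stateless projection: pass through events matching a predicate."""
    def pi(state, event):
        return state, ([event] if predicate(event) else [])
    return pi


def count_by_type(state, event):
    """A stateful projection: aggregate event counts, emitting no outputs."""
    counts = dict(state)
    counts[event["type"]] = counts.get(event["type"], 0) + 1
    return counts, []
```

In a running system the fold never terminates: the projection keeps its state and its position in the log, and resumes from that position after a restart.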

The EventStore projection system is quite handy and has several built-in projections, such as a stream prefix filter, an event-type filter and a projection running custom JavaScript. An issue with EventStore projections is that they haven’t worked well on a cluster. As such, the first step to scaling the projection system was running projections on a replica EventStore instance, downstream from the cluster. This instance could run as a single node and its sole purpose would be to generate and distribute projections. An asynchronous replication service would consume the log to populate the projection node.

Another issue using EventStore for projections is that its log isn’t partitioned, and as such, the single reader thread becomes a bottleneck. To scale the projection system, we introduced Kafka as the distribution medium. A service executes projection state machines, and emits outputs to Kafka topics. This service can run filtering projections as defined above, but it can also run more complex transformations. For example, a projection can be defined to translate between internal and external contracts of a system. Stream snapshots can also be computed using a projection.

Kafka serves well as a distribution medium, however we don’t rely on it as a source of truth. The projected topics have a retention policy, and architecturally, the projection system is designed to tolerate failures in the Kafka cluster, either by reading the upstream event store, awaiting rehydration of the cluster or failing over to another region as described next.

Geo-Replication

Another dimension of scaling is redundancy — continuous operation becomes increasingly critical as the business grows. Redundancy of storage systems is typically achieved using state-machine replication wherein data is replicated across a cluster, tolerating failures of some number of machines. It is quite common for database products to support clustering within a data center. It is much less common for database products to support clustering both within a datacenter and between data centers. This isn’t simply a matter of growing a cluster to include nodes across different data centers — the latency differences between a LAN and a WAN must be taken into account and reflected in the replication protocol.

EventStore runs as a cluster using a synchronous replication protocol which ensures consistency, or more precisely linearizability, among a quorum of nodes. A clustered mode of operation is essential in a cloud environment where individual VMs are routinely restarted for maintenance. EventStore however does not support cross-datacenter clustering, which we’ve had to implement as a bolt-on component. This was possible because EventStore exposes the underlying log.

The bolt-on design augmented a single-region system with asynchronous cross-region replication. Since cross-region replication is asynchronous, there is a possibility of data loss, which was taken as acceptable during regional failures. However, the regional failover and fail-back processes still need to take the system through consistent states.

Consider a system with two regions: a primary and a secondary. Each region contains a cluster, and there is an asynchronous replication channel from the primary to the secondary. The primary region accepts all writes. The secondary region can’t accept writes, since this would result in conflicts, but its log can be consumed by downstream systems, including a projection system. During a failure in the primary region, the secondary region can be turned into a primary, re-routing all writes to it. At this point, the system can continue to operate, though possibly in a compromised state. For example, a failure of the secondary region cannot be tolerated. Moreover, some downstream systems may only operate in the primary region and must therefore await its recovery.

In order to fail-back and recover the primary region the asynchronous replication channel must be reversed and directed into a suitable replica. The logs between the primary and secondary regions may have diverged and conflicts would result if replication is reversed into the original primary. A suitable replica can be obtained by restoring a backup of the secondary region in the primary region and then reversing the asynchronous replication channel.

A more graceful way to achieve the fail-back is to extend the chain to replicate from the secondary region back to the primary region, but into a 3rd replica cluster. This makes it possible to fail-back to the 3rd replica in the primary region — turning the tail into a head. Meanwhile, a new tail can be bootstrapped resulting in a continuous rotation of the chain. This design provides a tradeoff between the costs of operating a 3rd cluster and recovery time.

In essence, this design is akin to chain replication. In chain replication, nodes are organized into linearly ordered chains, wherein a head node accepts writes, which are propagated across the chain, the last node of which is the tail node. Reads can be served by any node in the chain, depending on recency and availability needs. Reads in our case are reads of the log performed by the projection system.
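The chain paradigm can be illustrated with a toy model in which each node holds a log and asynchronously pulls the suffix it is missing from its predecessor. This is a deliberately simplified sketch: the `Node` and `Chain` classes are hypothetical, replication is modelled as an explicit step, and failover and fail-back are not modelled at all.

```python
class Node:
    def __init__(self, name):
        self.name = name
        self.log = []


class Chain:
    def __init__(self, nodes):
        self.nodes = list(nodes)

    @property
    def head(self):
        return self.nodes[0]

    @property
    def tail(self):
        return self.nodes[-1]

    def write(self, event):
        """Only the head accepts writes."""
        self.head.log.append(event)

    def replicate_step(self):
        """Each node pulls its missing suffix from its predecessor.

        Iterating from the tail towards the head moves an event at most
        one hop per step, mimicking asynchronous propagation.
        """
        for i in range(len(self.nodes) - 1, 0, -1):
            upstream, downstream = self.nodes[i - 1], self.nodes[i]
            downstream.log.extend(upstream.log[len(downstream.log):])

    def read(self, node_index, lsn=0):
        """Reads may go to any node; nodes further down may lag behind."""
        return self.nodes[node_index].log[lsn:]
```

A reader that needs the most recent events reads from the head, while a reader that tolerates lag, such as a projection, can read from a node further down the chain.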

The following diagram depicts the architecture of the Jet event-sourcing platform:

Figure 3: Jet Event-Sourcing Platform

The diagram illustrates the chain paradigm described earlier. The head of the chain is in the primary region and it accepts all writes and reads used for writes. The secondary region hosts the middle of the chain, and an F# service asynchronously replicates events from the head. A third replica is again hosted in the primary region. The projection system is situated downstream from a replica in each region, and because it is asynchronous by nature, it doesn’t need to consume the log of the head node. In the diagram, the projection system emits outputs to Kafka, however it can just as well emit outputs to another system. Moreover, we can rely on Kafka’s streaming component to form downstream systems.

A nice property of this architecture is that it allows downstream systems to inherit geo-replication capabilities from the event store. For example, Kafka is not a geo-replicated system, and while tools exist to make it so, it is much easier to reason about the system if the source-of-truth itself is geo-replicated. Moreover, rather than geo-replicating each downstream component using its own replication mechanism, all components can piggy back on a single platform. In addition to Kafka, we’ve used this approach to add geo-replication to several other systems, including SQL databases, ElasticSearch clusters, Redis caches, etc.

Consistency Verification

In an asynchronous system, independent parts operate independently, which also means they fail independently. The regional replication system and the projection system are both asynchronous systems, and we needed a way to monitor their consistency with respect to the upstream event store. We did this by building an out-of-band verification system, which would compare a log to a downstream system. One configuration of the system compares EventStore logs, and another compares an EventStore log to a Kafka topic. The system checks to make sure that:

  • All applicable events are transmitted.
  • Events are transmitted in the correct order.
  • Events are transmitted within an expected latency.

This verification system helped us find bugs in our bolt-on projection and replication systems, in EventStore as well as our Kafka client Kafunk. Moreover, it provides continuous monitoring of safety and liveness properties.
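The three checks can be sketched as a comparison between an upstream log and what a downstream system received. This is an illustrative sketch and not the verification system itself. It assumes that events carry unique ids, that upstream entries are `(event_id, written_at)` pairs, that downstream entries are `(event_id, received_at)` pairs, and that timestamps are comparable numbers.

```python
def verify(upstream, downstream, max_latency):
    """Report missing, out-of-order and late events in the downstream system."""
    received = {
        event_id: (position, received_at)
        for position, (event_id, received_at) in enumerate(downstream)
    }
    missing, out_of_order, late = [], [], []
    highest_position = -1
    for event_id, written_at in upstream:
        if event_id not in received:
            missing.append(event_id)  # not transmitted (or still in flight)
            continue
        position, received_at = received[event_id]
        if position < highest_position:
            out_of_order.append(event_id)  # arrived before an earlier event
        highest_position = max(highest_position, position)
        if received_at - written_at > max_latency:
            late.append(event_id)
    return {"missing": missing, "out_of_order": out_of_order, "late": late}
```

A continuously running checker would apply this over a sliding window of the log and only alert on a missing event once its expected latency has elapsed.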

Distributed Tracing

Understanding and debugging systems involving multiple nodes is difficult. Understanding and debugging systems involving multiple nodes and asynchronous interactions is even more difficult. As a result, distributed tracing in an event-sourced system is particularly important, and unlike many existing tracing platforms, it must support tracing of asynchronous interactions. The verification system described above provides a degree of confidence in the system. However, it doesn’t provide the level of granularity and scope sufficient for all scenarios. For example, we may wish to inspect the handling of a particular external request across various systems. The verification system can tell us that all events are suitably replicated, but it doesn’t record information about particular traces. A trace is a collection of events associated with a key, and the events denote domain-specific system state changes. The trace key is a unique id, typically generated at a system boundary, and propagates across communication mediums in accordance with the tracing protocol. The tracing system collects and indexes tracing events.
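The sketch below shows trace-key propagation and collection. The message envelope, the helper names and the `TraceCollector` class are assumptions made for illustration, and the actual tracing protocol is not described here.

```python
import uuid
from collections import defaultdict


def new_trace_key():
    """Generate a unique trace key, e.g. at a system boundary."""
    return uuid.uuid4().hex


def with_trace(payload, trace_key):
    """Wrap a payload in an envelope carrying the trace key."""
    return {"trace_key": trace_key, "payload": payload}


def caused_by(cause, payload):
    """A message produced while handling `cause` inherits its trace key."""
    return with_trace(payload, cause["trace_key"])


class TraceCollector:
    """Collects tracing events and indexes them by trace key."""

    def __init__(self):
        self._by_key = defaultdict(list)

    def record(self, message, system, what, at):
        self._by_key[message["trace_key"]].append(
            {"system": system, "what": what, "at": at}
        )

    def trace(self, trace_key):
        """Return the tracing events for a key, ordered by timestamp."""
        return sorted(self._by_key.get(trace_key, []), key=lambda e: e["at"])
```

Because the key travels with the message rather than with a call stack, the same mechanism covers both request-reply and asynchronous hops, such as an event that is projected to a topic long after the originating request has returned.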

Ongoing Work

  • Cool Storage — while we’ve scaled reads as described above, the issue of ever-growing streams remains. A cool storage mechanism archives older events into cheaper storage mediums.
  • Projections using Azure Functions — the projection system can reference Azure Functions to support execution of arbitrary logic. While care must be taken to ensure that the resulting system is well-behaved, we can expand the scope to allow declarative definitions of microservices.
  • Event-Sourcing Engine — while we’ve gotten quite far with EventStore, we’ve set out to build a replacement event-sourcing data store to continue to meet our scaling demands. With this data store, we’re looking to have built-in support for geo-replication.
  • Causally-consistent Geo-replication — as noted above, geo-replication is asynchronous and therefore susceptible to data loss. For some operations, we would like to synchronously replicate events before acknowledgement. This would provide causal consistency with respect to individual streams across regions.

Conclusion

Event-sourcing is founded on sound principles, and while there certainly are challenges to building such a platform (as evidenced herein), the benefits outweigh the risks. A notable benefit for the systems engineer is the stability of the architecture: it is possible to scale individual components without changing the core. Teams can build their systems autonomously, but also integrate seamlessly when required. From a theoretical standpoint, the log at the heart of event-sourcing allows disparate components to reach consensus in a non-blocking manner. Moving forward, we will continue to enhance the event-sourcing platform to keep meeting the demands of a world-class shopping experience at Jet.

Acknowledgements

The event-sourcing platform was made possible by efforts of many individuals across several teams at Jet.

Contributors: Cole Dutcher, Andrew Duch, Erich Ess, Lev Gorodinski, Scott Havens, Mike Hanrahan, Gina Maini, Brian Mitchell, John Turek, Ido Samuelson.

We’re Hiring

We’re hiring — if you’d like to get involved in some of these efforts, reach out to Jet Technology Careers or to me directly.

References