Scaling Event-Sourcing at Jet

At Jet, we’ve been using event-sourcing since the very beginning and learned some lessons along the way. There are several dimensions along which we had to scale our event-sourcing platform. The one which most teams using event-sourcing have to overcome early on is scaling reads — as streams increase in size it becomes prohibitive to read the entire stream to perform an operation. Another dimension of scaling is redundancy — in order to function continuously, the platform needs to tolerate not only failures of individual machines in a data center, but failures of an entire data center. The projection system needs to be scaled to support a growing number of consumers with varying workloads. Meanwhile, as the number of moving parts increases, it becomes essential to verify safety and liveness guarantees advertised by the system. Of course, in time, the highly modular architecture afforded by event-sourcing starts to weigh on our ability to obtain an accurate picture of system state. To that end, we need a tracing platform which, in addition to request-reply, must support tracing of asynchronous interactions. All things considered, the challenges of operating an event-sourcing platform are noteworthy, but its sound foundational principles continue to pay dividends as we evolve.

Event Sourcing

There are a few definitions of event sourcing floating around. Martin Fowler’s is perhaps the most cited one: capture all changes to application state as a sequence of events. We can make this precise with a few definitions, modeling a service as a state machine together with the operations of its event store:

  • S₀ — a starting state.
  • Input — a set of inputs.
  • Output — a set of outputs.
  • τ : State × Input → State × Output — a transition function.
  • Δ : State × Event → State — defines how an event changes state.
  • ε : State × Input → Event — takes inputs to outputs at a state.
  • get : StreamId × SN → Events — returns events in a stream.
  • add : StreamId × SN × Events → Ack — adds new events to a stream.

Figure 1: Event stream index.

  • log : LSN → Events — returns all events in a partition.

Figure 2: Event log.
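
To make these definitions concrete, here is a minimal sketch, in Python, of how they compose into a command handler: the stream is read with get, Δ is folded over the events to reconstitute state, ε decides which new events the input produces, and add appends them at the expected sequence number. The in-memory store and the handle function are illustrative stand-ins, not the platform’s actual API.

```python
# A minimal, in-memory sketch of the definitions above; the store and handler
# names are illustrative, not the actual platform API.
from typing import Callable, Dict, List

State = dict   # stand-in for a domain-specific state type
Event = dict   # stand-in for a domain-specific event type
Input = dict   # stand-in for a domain-specific input (command) type


class EventStore:
    """In-memory stand-in for get : StreamId × SN → Events and
    add : StreamId × SN × Events → Ack."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[Event]] = {}

    def get(self, stream_id: str, sn: int) -> List[Event]:
        # Return the events of a stream starting at sequence number `sn`.
        return self._streams.get(stream_id, [])[sn:]

    def add(self, stream_id: str, expected_sn: int, events: List[Event]) -> None:
        # Append events, failing if the stream has moved past `expected_sn`
        # (optimistic concurrency control on the sequence number).
        stream = self._streams.setdefault(stream_id, [])
        if len(stream) != expected_sn:
            raise RuntimeError("concurrent write detected")
        stream.extend(events)


def handle(store: EventStore,
           stream_id: str,
           initial: State,
           delta: Callable[[State, Event], State],          # Δ : State × Event → State
           epsilon: Callable[[State, Input], List[Event]],  # ε : State × Input → Events
           command: Input) -> List[Event]:
    """Read the stream, fold Δ to reconstitute state, decide new events with ε,
    and append them at the expected sequence number."""
    events = store.get(stream_id, 0)
    state = initial
    for event in events:
        state = delta(state, event)
    new_events = epsilon(state, command)
    store.add(stream_id, len(events), new_events)
    return new_events
```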

Scaling Reads

Recall that the get function, defined above, returns the events in a stream starting at a specified sequence number. Throughout the lifetime of a system, streams can get arbitrarily large, eventually making reads of the entire stream prohibitive during an operation. A common way to scale reads with event-sourcing is to use a technique called a rolling snapshot. A snapshot captures the state of a stream at a particular point in time and is constructed using the Δ function defined above. Then, only events occurring after that point need to be read in order to reconstitute the last known state.
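
As a sketch of the read path with rolling snapshots (the snapshot store and the snapshot interval below are assumptions for illustration): the last snapshot supplies a state and a sequence number, only newer events are read and folded with Δ, and the snapshot is rolled forward once enough new events have accumulated.

```python
# A sketch of reads with a rolling snapshot; the snapshot store and the
# snapshot interval are illustrative assumptions.
from typing import Callable, Dict, List, Optional, Tuple

State = dict
Event = dict


class SnapshotStore:
    """Keeps the latest (state, sequence number) pair per stream."""

    def __init__(self) -> None:
        self._snapshots: Dict[str, Tuple[State, int]] = {}

    def load(self, stream_id: str) -> Optional[Tuple[State, int]]:
        return self._snapshots.get(stream_id)

    def save(self, stream_id: str, state: State, sn: int) -> None:
        self._snapshots[stream_id] = (state, sn)


def reconstitute(store,                      # anything exposing get(stream_id, sn)
                 snapshots: SnapshotStore,
                 stream_id: str,
                 initial: State,
                 delta: Callable[[State, Event], State],
                 snapshot_every: int = 1000) -> Tuple[State, int]:
    """Rebuild state by folding Δ over only the events after the last snapshot."""
    snapshot = snapshots.load(stream_id)
    state, sn = snapshot if snapshot is not None else (initial, 0)
    events = store.get(stream_id, sn)        # read from the snapshot's SN onward
    for event in events:
        state = delta(state, event)
    sn += len(events)
    if len(events) >= snapshot_every:        # roll the snapshot forward periodically
        snapshots.save(stream_id, state, sn)
    return state, sn
```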

Scaling Projections

In the context of event-sourcing, a projection refers to a running fold of events. In essence, a projection embodies a state-machine whose inputs are events from the log. Its output events may form another stream, or the projection may be used solely for computing a state. A projection may rely on state, or it may be stateless. One type of projection is a filter — it forms a stream of events matching a predicate. Another is a transformer — it either enriches events, or translates them into another form. Since projections are state-machines, they can also perform aggregations and joins.

π : State × Event → State × Output
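
As an illustration, the runner below folds π over a log and collects whatever outputs it emits; the example filter and aggregation, along with the "OrderPlaced" event type, are hypothetical, not projections from our platform.

```python
# A sketch of running a projection π : State × Event → State × Output over a log.
# The example projections and the "OrderPlaced" event type are hypothetical.
from typing import Callable, Iterable, List, Optional, Tuple

State = dict
Event = dict
Output = dict

Projection = Callable[[State, Event], Tuple[State, Optional[Output]]]


def run_projection(log: Iterable[Event], pi: Projection, initial: State) -> List[Output]:
    """Fold the projection over the log, collecting any outputs it emits."""
    state: State = initial
    outputs: List[Output] = []
    for event in log:
        state, output = pi(state, event)
        if output is not None:
            outputs.append(output)
    return outputs


def order_filter(state: State, event: Event) -> Tuple[State, Optional[Event]]:
    """A stateless filter: pass through only events matching a predicate."""
    return state, (event if event.get("type") == "OrderPlaced" else None)


def count_by_type(state: State, event: Event) -> Tuple[State, Optional[Output]]:
    """A stateful aggregation: count events per type, emitting nothing downstream."""
    counts = dict(state)
    counts[event.get("type")] = counts.get(event.get("type"), 0) + 1
    return counts, None
```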

Geo-Replication

Another dimension of scaling is redundancy — continuous operation becomes increasingly critical as the business grows. Redundancy of storage systems is typically achieved using state-machine replication wherein data is replicated across a cluster, tolerating failures of some number of machines. It is quite common for database products to support clustering within a data center. It is much less common for database products to support clustering both within a datacenter and between data centers. This isn’t simply a matter of growing a cluster to include nodes across different data centers — the latency differences between a LAN and a WAN must be taken into account and reflected in the replication protocol.
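
To illustrate the asynchronous flavor of replication across regions, the sketch below tails a primary region’s log by log sequence number and ships batches to a secondary region, recording a checkpoint only after the remote write. The interfaces are assumptions for illustration; a real protocol must also contend with WAN latency, batching, retries, and failover.

```python
# An illustrative sketch of asynchronous cross-region replication by log shipping;
# the primary/secondary/checkpoint interfaces are assumptions, not the platform API.
import time
from typing import List

Event = dict


def replicate(primary, secondary, checkpoint, poll_interval: float = 1.0) -> None:
    """Continuously ship events from the primary region's log to a secondary region."""
    lsn = checkpoint.load()                      # resume from the last replicated LSN
    while True:
        events: List[Event] = primary.log(lsn)   # read everything after the checkpoint
        if events:
            secondary.append(events)             # apply in order on the remote side
            lsn += len(events)
            checkpoint.save(lsn)                 # record progress only after the remote write
        else:
            time.sleep(poll_interval)            # back off when caught up
```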

Figure 3: Jet Event-Sourcing Platform

Consistency Verification

In an asynchronous system, independent parts operate independently, which also means they fail independently. The regional replication system and the projection system are both asynchronous, and we needed a way to monitor their consistency with respect to the upstream event store. We did this by building an out-of-band verification system which compares a log to a downstream system. One configuration of the system compares EventStore logs, and another compares an EventStore log to a Kafka topic. The system checks, as sketched below, that:

  • Events are transmitted in the correct order.
  • Events arrive within an expected latency.
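
A minimal sketch of such a check, assuming each side can be read as an ordered sequence of (event id, timestamp) pairs and that a latency bound is supplied; both assumptions are for illustration only.

```python
# A minimal sketch of comparing an upstream log to a downstream copy, checking
# order and replication latency. The (event_id, timestamp) record shape and the
# latency bound are assumptions for illustration.
from typing import Iterable, List, Tuple


def verify(upstream: Iterable[Tuple[str, float]],
           downstream: Iterable[Tuple[str, float]],
           max_latency_seconds: float = 5.0) -> List[str]:
    """Return a list of human-readable violations; an empty list means consistent."""
    violations: List[str] = []
    down = list(downstream)
    for i, (event_id, produced_at) in enumerate(upstream):
        if i >= len(down):
            violations.append(f"{event_id}: not yet replicated")
            continue
        copied_id, copied_at = down[i]
        if copied_id != event_id:                              # order check
            violations.append(f"position {i}: expected {event_id}, found {copied_id}")
        elif copied_at - produced_at > max_latency_seconds:    # latency check
            violations.append(f"{event_id}: replicated after {copied_at - produced_at:.1f}s")
    return violations
```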

Distributed Tracing

Understanding and debugging systems involving multiple nodes is difficult. Understanding and debugging systems involving multiple nodes and asynchronous interactions is even more difficult. As a result, distributed tracing in an event-sourced system is particularly important, and unlike many existing tracing platforms, it must support tracing of asynchronous interactions. The verification system described above provides a degree of confidence in the system. However, it doesn’t provide the level of granularity and scope sufficient for all scenarios. For example, we may wish to inspect the handling of a particular external request across various systems. The verification system can tell us that all events are suitably replicated, but it doesn’t record information about particular traces. A trace is a collection of events associated with a key, and the events denote domain-specific system state changes. The trace key is a unique id, typically generated at a system boundary, and is propagated across communication mediums in accordance with the tracing protocol. The tracing system collects and indexes tracing events.
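
The sketch below illustrates the idea: a trace key is minted at the boundary, carried in message metadata across asynchronous hops, and each hop emits a trace event to a collector. The header name, message shape, and collector are assumptions for illustration, not the tracing protocol itself.

```python
# A sketch of propagating a trace key through asynchronous messages and emitting
# trace events. The header name and event shape are assumptions for illustration.
import time
import uuid
from dataclasses import dataclass, field
from typing import Dict, List

TRACE_HEADER = "trace-id"   # hypothetical header used to carry the trace key


@dataclass
class TraceEvent:
    trace_id: str
    service: str
    annotation: str            # a domain-specific state change, e.g. "order event handled"
    timestamp: float = field(default_factory=time.time)


collector: List[TraceEvent] = []   # stand-in for the system that collects and indexes traces


def start_trace(message: Dict) -> Dict:
    """At a system boundary, mint a trace key and attach it to the outgoing message."""
    headers = dict(message.get("headers", {}))
    headers.setdefault(TRACE_HEADER, str(uuid.uuid4()))
    return {**message, "headers": headers}


def handle(service: str, message: Dict, annotation: str) -> Dict:
    """Record a trace event and propagate the same key on any downstream message."""
    trace_id = message["headers"][TRACE_HEADER]
    collector.append(TraceEvent(trace_id, service, annotation))
    return {"headers": {TRACE_HEADER: trace_id}, "payload": {}}
```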

Ongoing Work

  • Cool Storage — while we’ve scaled reads as described above, the issue of ever-growing streams remains. A cool storage mechanism archives older events into cheaper storage mediums.
  • Projections using Azure Functions — the projection system can reference Azure Functions to support execution of arbitrary logic. While care must be taken to ensure that the resulting system is well-behaved, this allows us to expand the scope to declarative definitions of microservices.
  • Event-Sourcing Engine — while we’ve gotten quite far with EventStore, we’ve set out to build a replacement event-sourcing data store to continue to meet our scaling demands. With this data store, we’re looking to have built-in support for geo-replication.
  • Causally-consistent Geo-replication — as noted above, geo-replication is asynchronous and therefore susceptible to data loss. For some operations, we would like to synchronously replicate events before acknowledgement. This would provide causal consistency with respect to individual streams across regions.

Conclusion

Event-sourcing is founded on sound principles, and while there certainly are challenges to building such a platform — as evidenced herein — the benefits outweigh the risks. A notable benefit for the systems engineer is the stability of the architecture — it is possible to scale individual components without changing the core. Teams can build their systems autonomously, but also integrate seamlessly when required. From a theoretical standpoint, the log at the heart of event-sourcing allows disparate components to reach consensus in a non-blocking manner. Moving forward, we will continue to enhance the event-sourcing platform to meet the demands of a world-class shopping experience at Jet.

Acknowledgements

The event-sourcing platform was made possible by the efforts of many individuals across several teams at Jet.

We’re Hiring

We’re hiring — if you’d like to get involved in some of these efforts, reach out to Jet Technology Careers or to me directly.
