Rebuilding the Segment Leaderboards Infrastructure — Part 2: First Principles of a New System
Over the past year, the Strava platform team has been working to rebuild the segment leaderboards system. This is the second in a series of four blog posts detailing that process. It examines the first principles of the new leaderboards system, identifying the properties, themes, and design goals that shaped it. For some added context, please read part one, which details the history of the leaderboards system at Strava. For the next article in the “Rebuilding the Segment Leaderboards Infrastructure” series, check out Part 3: Design of the New System.
The Switch to Stream Processing
In part one of this series, we identified two problems with the existing leaderboards system:
- Redis was a problematic choice for storage.
- Synchronizing leaderboard writes in an RPC system is hard.
Let’s set aside discussing Redis and storage technologies for a moment, and turn our attention to the issue of synchronizing writes. As a refresher, when a Strava user completes an activity (bike ride, swim, run, etc.), they upload their activity data file to our system for processing. For activities that have GPS data associated with them, we analyze that GPS data and detect when the activity traversed a segment. For each matched segment, we determine the time it took to traverse that segment and record a segment “effort” for it.
Ordering is Critically Important
While new effort creations make up the majority of change operations to our leaderboards system, they are not the whole story. We also allow users to remove their efforts from leaderboards, for example when an athlete makes an activity private, removing it from their public feed.
The order in which effort additions and removals are processed by the leaderboards system is very important. Imagine we have a sequence of [add, remove, add] for an effort. Perhaps the user uploaded an activity, made it private, then changed their mind and made it public again. The correct outcome from applying those updates, in order, is that the effort should exist on the leaderboard. However, if we process the messages out of order, say [add, add, remove], the effort is removed from leaderboards. This is obviously incorrect.
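The effect of ordering can be seen in a minimal sketch. The `apply` function and the set-based leaderboard below are hypothetical simplifications, not the real system, but they show how the same three mutations produce different final states depending on processing order:

```python
# Hypothetical sketch: applying the same three mutations in two different
# orders yields different leaderboard states.
def apply(leaderboard, mutation, effort_id):
    """Apply a single 'add' or 'remove' mutation to a set-based leaderboard."""
    if mutation == "add":
        leaderboard.add(effort_id)
    elif mutation == "remove":
        leaderboard.discard(effort_id)

correct = set()
for op in ["add", "remove", "add"]:   # the order the user actually performed
    apply(correct, op, "effort-1")

reordered = set()
for op in ["add", "add", "remove"]:   # same operations, wrong order
    apply(reordered, op, "effort-1")

print(correct)    # {'effort-1'}: the effort correctly remains on the leaderboard
print(reordered)  # set(): the effort has been incorrectly removed
```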
This may seem like a simple problem to solve, but ordering with most RPC systems can be less strict than needed. With the old Scala-powered RPC leaderboards system, ordering was very loosely defined, best described as “the order in which requests were processed.”
This loose ordering existed for a few reasons:
- Requests from our Ruby on Rails app may be sent out of order from when they actually occurred. RPC calls are triggered as part of Rails after_create hooks, so, going back to the [add, remove, add] example above, perhaps the background worker processing the remove encountered some latency, which allowed the second add request to complete first.
- Even if ordering of RPC calls sent by Rails is correct, there is no guarantee that the requests are actually processed by the server in the order they are received. The Scala-powered application server handling the remove could encounter a GC pause, or lock contention, and allow the second add to process first, with the remove processing second.
- If an individual RPC request fails, we cannot safely retry it. For example, if the remove in the [add, remove, add] sequence fails, blindly retrying it will result in it being incorrectly applied last.
It was in light of these concerns that we decided to switch the leaderboards system from an RPC-based system to a stream processing-based one. As efforts were added and removed, the system would simply publish an “effort mutation” message describing the operation to a stream. Downstream consumers would then consume that stream, in order, applying the updates to leaderboards.
Astute readers may notice this is very similar to a database transaction log. It is; however, due to some implementation details, we had to add some extra processing to produce a correctly ordered stream of add and remove messages. We will discuss this more in the third post of the series.
Moving down the pipeline, the consumer of effort mutations has the responsibility of turning each mutated effort into a potential write to a leaderboard. Its logic is simply:
- query each leaderboard the effort could appear on, for the athlete’s best effort
- if the new effort is faster than the existing effort, replace it. Otherwise, do nothing.
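The two steps above can be sketched as a small read-compare-write function. This is a hypothetical illustration (the `store` dictionary, key names, and `process_effort` function are stand-ins, not the real API):

```python
# Hypothetical sketch of the consumer's best-effort replacement logic.
# `store` stands in for whatever leaderboard storage backs the system.
def process_effort(store, leaderboard_id, athlete_id, new_elapsed_seconds):
    # Step 1: read the athlete's current best effort on this leaderboard.
    current_best = store.get(leaderboard_id, {}).get(athlete_id)

    # Step 2: write only if this is the athlete's first effort, or a faster one.
    if current_best is None or new_elapsed_seconds < current_best:
        store.setdefault(leaderboard_id, {})[athlete_id] = new_elapsed_seconds
        return True   # leaderboard updated
    return False      # existing effort was faster; do nothing

store = {}
process_effort(store, "segment-123:overall", "athlete-9", 305)
process_effort(store, "segment-123:overall", "athlete-9", 290)  # faster: replaces
process_effort(store, "segment-123:overall", "athlete-9", 400)  # slower: ignored
print(store["segment-123:overall"]["athlete-9"])  # 290
```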
There is one glaring problem, however: the race condition between steps #1 and #2. After we’ve read the best effort for a particular athlete on a particular leaderboard, the application compares the read effort to the incoming effort mutation and makes a business logic decision based on that information. If, between the read and the write, a third party mutates the data we just read (perhaps another worker updated the leaderboard with an even faster effort for the athlete), our read is now dirty, and we cannot safely make our business logic decision or issue a write.
We need some way to synchronize writes, ensuring no other writers conflict with our read from step #1 before we issue our write in step #2. One trivial way to do that is to run only one worker, which processes the entire stream sequentially, one effort at a time. In that model we can guarantee that no other write will execute between steps 1 and 2, because there are no other writers. Unfortunately, while this solution is relatively straightforward, it’s not scalable. Even at the moderate rate of 100 efforts/second, with 10 ms of processing time per effort, a single worker would not be able to keep up with the rate of new efforts being added. Thus, we must add more processing power by doing work in parallel.
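The back-of-the-envelope arithmetic makes the single-worker ceiling concrete. At 10 ms per effort, a sequential worker tops out at exactly 100 efforts/second, so the stated rate already saturates it with zero headroom for growth or latency spikes:

```python
# Capacity check for a single sequential worker.
processing_ms_per_effort = 10
incoming_efforts_per_s = 100

# A worker that spends 10 ms per effort can process at most 1000 / 10 = 100
# efforts each second.
max_efforts_per_s = 1000 // processing_ms_per_effort
print(max_efforts_per_s)                             # 100
print(incoming_efforts_per_s >= max_efforts_per_s)   # True: fully saturated
```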
Unfortunately, once work is occurring in parallel, we’re back to our problem of having to synchronize writes. The old Scala-powered system synchronized its writes using locks around every write request. But as part one of this series pointed out, locks add write latency, computational overhead, and are hard to get right. For these reasons, we wanted to shy away from using locks and seek a simpler alternative.
Additionally, with work happening in parallel, we’ve also lost the absolute ordering of effort mutations. If the stream contains [add, remove, add], each message could be processed in parallel, with the end leaderboard state being dependent on which order the messages were processed in.
Thankfully, all is not lost. The crucial piece of insight in this situation is that it’s not necessarily important to order and synchronize all effort mutations. It’s actually only important to synchronize mutations within a particular segment/athlete combination. You can imagine that on any particular leaderboard, any particular athlete only gets one slot — their best time. The system only needs to ensure all mutations for that slot — for that particular segment/athlete combination — are processed sequentially, in order to guarantee consistency. Writes to other slots on that leaderboard can happen in parallel, as they have independent consistency from the slot being written.
Thus, if we can partition the stream of all effort mutations by the segment/athlete combination, and ensure the total order of effort mutations within that partition, we can maintain consistency by processing a partition with only one worker. We also gain back parallelism, as each partition can be processed in parallel by the worker consuming that partition. All without locks or other synchronization mechanisms!
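A minimal sketch of that partitioning idea follows. The partition count, key format, and hash choice here are all illustrative assumptions; the essential property is only that a stable hash of the segment/athlete key always routes mutations for one leaderboard slot to the same partition:

```python
import hashlib

# Hypothetical sketch: route each effort mutation to a partition keyed by the
# segment/athlete combination, so all mutations for one leaderboard "slot"
# land on the same partition and are processed sequentially by one worker.
NUM_PARTITIONS = 16  # illustrative; real partition counts depend on scale

def partition_for(segment_id, athlete_id):
    key = f"{segment_id}:{athlete_id}".encode()
    # A stable hash keeps every mutation for this slot on the same partition.
    return int(hashlib.sha1(key).hexdigest(), 16) % NUM_PARTITIONS

# All mutations for the same segment/athlete combination map to one partition,
# preserving their relative order...
p1 = partition_for("segment-123", "athlete-9")
p2 = partition_for("segment-123", "athlete-9")
print(p1 == p2)  # True

# ...while other segment/athlete slots may land on other partitions, where
# independent workers can process them in parallel.
```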
One wrinkle with this approach is independent workers writing to the same leaderboard in parallel. With each update, the leaderboard will change the ranking (ordering) of some portion of the efforts on that leaderboard. For example, if I am ranked #5 on the leaderboard, and I improve on my time to be ranked #3, the previous #3 is now #4 and the previous #4 is now #5. To handle these ranking updates, the leaderboards store will likely need to maintain this ordering, or we’ll need to be prepared to fetch all efforts from the leaderboard and order them in memory, something we explicitly wanted to avoid.
Most distributed systems guarantee some level of message delivery: at-most-once, at-least-once, and exactly-once. Generally speaking, exactly-once is very hard to achieve (some say impossible). So really you have a choice between at-most-once (does not guarantee delivery) and at-least-once (does guarantee delivery, with possible duplicates). Since most of the time we want to guarantee delivery, our system has to operate in a world of at-least-once delivery of messages.
Within the at-least-once world, any consumer of messages must be prepared to receive a message more than once. Consequently, all processing logic must be idempotent. Applying this back to our leaderboards system, it is therefore desirable that for any given effort mutation, we can guarantee that processing the message more than once is safe. The old Scala-powered system had this property — e.g., adding the same effort more than once would not result in a corrupted leaderboard, such as a single effort showing up twice on one leaderboard. With a new stream processing system, idempotency will continue to be very important.
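A sketch of what idempotent processing means in practice: because each athlete occupies exactly one keyed slot per leaderboard, redelivering the same add message is harmless. The `add_effort` function and data shapes below are hypothetical, not the real system:

```python
# Hypothetical sketch of an idempotent "add effort" operation: keying the
# leaderboard slot by athlete means replaying the same message cannot make
# an effort appear twice.
def add_effort(leaderboard, athlete_id, effort_id, elapsed_seconds):
    current = leaderboard.get(athlete_id)
    # A redelivered duplicate is no faster than the stored best, so it is
    # skipped; at-least-once redelivery cannot corrupt the leaderboard.
    if current is None or elapsed_seconds < current["elapsed"]:
        leaderboard[athlete_id] = {"effort": effort_id, "elapsed": elapsed_seconds}

board = {}
add_effort(board, "athlete-9", "effort-1", 305)
add_effort(board, "athlete-9", "effort-1", 305)  # duplicate delivery: no effect
print(len(board))  # 1: the effort appears exactly once
```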
With our properties of stream processing in mind, it’s a good time to think about the data storage system we’re going to use to store efforts. As part one of this series pointed out, it’s infeasible to try to keep all leaderboard data in memory, so we must store it on disk. We would like to replicate our data for redundancy, and for increased read and write performance. We’d like to ensure high availability in the event of hardware or other kinds of failure.
There are a multitude of databases available on the market which can fulfill these needs. We will get into which one we chose and why in part three of the series.
Structure of data stored
If you remember back to part one, for each segment we maintained two sorted data structures (sorted sets) in Redis for the overall and female leaderboards, with a Redis hash holding the efforts for all other leaderboards for that segment. That hash was keyed by athlete, and values were a custom Ruby binary data format.
The consequence of this structure is that common leaderboard requests (paginated sequences of results, or an athlete’s rank on a particular leaderboard) are very fast for the overall and female leaderboards, but very slow for all others. For all other leaderboards, the entire hash has to be fetched and parsed in memory by the application server.
As leaderboards get larger and Strava gets more popular, having to fetch all possible efforts for all athletes, simply for common leaderboard requests, is infeasible. With the new data system, we want a storage technology that allows us to store each leaderboard independently, and in the same data structure — ideally one approaching the query performance of a Redis sorted set.
Achieving this will likely require denormalizing each leaderboard, which avoids ever needing to scan all efforts to build a leaderboard in memory, but does have the downside of increased disk usage through data duplication. Disk is cheap, however, and with the homogeneous structure we’re likely to see predictable query performance as leaderboards grow in size. Independent storage of leaderboards also allows us to remove usage of the custom Ruby binary format, and replace it with a simpler, more portable serialization of the effort structure.
The final property we wanted from our system was fast writes. At our highest peak, on weekend afternoons, we’d be pushing ~2,500 updates to leaderboards per second. Any data store which is going to hold all denormalized leaderboards will need to be able to handle that volume, with the ability to add more capacity as Strava grows.
At such a high volume of writes, we’d almost certainly be looking at some kind of data store which is a distributed system, operating under an eventual consistency model. This is a change from the old Redis-backed approach, which maintained strong consistency after every write. Such strong consistency is a desirable property for sure, but it’s incredibly challenging to achieve under such a high write load, especially in a distributed system. Relaxing the consistency requirement, even slightly, generally allows write performance to improve, at the cost of potentially more expensive reads (since we may need to resolve conflicts).
In planning for the new leaderboards system, we identified three important properties of an effort processing system:
- Ordering is crucial. As users create, update, and delete efforts, the order those effort mutations are processed in is critical to accurately maintaining segment leaderboards. Any new system we create must have strong ordering guarantees.
- By partitioning effort mutation updates, we can more easily synchronize leaderboards writes. The easiest way to avoid race conditions when applying updates is to serialize the updates. However, this is hard to do for all updates in a large data set occurring in parallel without some centralized coordinator. If we divide our dataset into separate and independent partitions, we are able to serialize updates for each partition, and as a result process all partitions in parallel.
- The system must strive for idempotency. It’s a certainty that we will process the same message multiple times, and so we will want our system to produce consistent data in that case.
Beyond the effort processing system, we also identified some characteristics of the data storage layer which will be important:
- The data store is likely to be distributed and eventually consistent. There is simply too much data and too high of an availability requirement for us to use any other style of database technology.
- We want a system which can store all leaderboards in a homogeneous way, which is performant for common leaderboard queries. This will yield overall faster and more consistent query performance, less tied to the size of our data.
- The system must be able to do fast writes. Segment leaderboards are a write-heavy system, so we must be able to accept a high volume of writes, acknowledging this comes at the expense of slower reads which may have to resolve inconsistencies in the data.
Armed with these properties, our next blog post in the series will document our actual solution to this problem. We’ll talk about our technology choices for stream processing and data storage, what tradeoffs we made, and how we implemented a segment leaderboards system to meet these criteria.