Strategies Used at Box to Protect MySQL at Scale

Priyanka Reddy
Published in Box Tech Blog · 15 min read · Oct 7, 2020
Illustrated by Jeremy Nguyen / Art directed by Sarah Kislak

This blog has been adapted from a talk at Percona Live Online 2020: Strategies Used at Box to Protect #MySQL at Scale

The data access infrastructure at Box is responsible for storing and providing access to metadata associated with all content stored in Box and the relationships between them. For example, if Maya stores her file, “Q3Review.ppt” in folder “Presentations”, the data access infrastructure will store records indicating that:

  • there is a user named “Maya”
  • there is a folder named “Presentations” that is owned by “Maya”
  • there is a file named “Q3Review.ppt” whose parent folder is “Presentations”

Although the example above shows a small subset of the metadata we actually store, it should be sufficient to demonstrate why the data access infrastructure is a critical part of rendering almost any page on the Box application. Due to its criticality, the data access infrastructure must achieve the following guarantees: strict data consistency, high throughput, low latency and high availability. Let’s walk through what each of these means at the data access layer.

  • Strict Data Consistency: Given the type of information that Box is providing to our security and compliance-minded customers, returning partial or stale data is not acceptable. Permissions calculations, as an example, can’t be delayed.
  • High Throughput: The data access layer must support whatever scale is being thrown at it by the application that’s being used by our 97K customers. It currently serves millions of requests per second, which translates to tens of millions of database records per second that need to be retrieved or modified. And it’s constantly growing.
  • Low Latency: Most of our requests complete in a few milliseconds. A single customer action requires tens of requests to the data access layer, so any small increase in latency at the data access layer is amplified to the customer.
  • High Availability: An outage at the data access layer is an outage for Box, plain and simple, making availability one of our top priorities.

While it’s relatively straightforward to achieve one or two of these guarantees while sacrificing the others, achieving all of them simultaneously is where all the complexity (and all the fun!) comes from.

Over the past decade, the data access infrastructure at Box has changed and grown tremendously. Starting as most web applications do with a single web server and a single database, the data access infrastructure of today is made up of thousands of machines, holds hundreds of billions of records and serves millions of requests per second. The journey to our current architecture was made up of many steps and I’ll take you through a few of the most interesting ones.

As the Box application started receiving more traffic than a single PHP web server and MySQL database could handle, the data access infrastructure was scaled up by adding more web servers, database replicas and caches, and by horizontally sharding the database. By that point, we had also built up a substantial data access layer in the web servers that held all of the logic for reading from our data access stores.

Adding a Data Access Service

Our next step was to add a data access service. The immediate problem that motivated the addition of a new data access service was database connection exhaustion. During spikes in traffic, the web servers would all attempt to grab connections to the databases at the same time. There was no centralized mechanism to throttle or limit database connections, which led to connection exhaustion.

A distributed data access service was our solution. This service contains all of the logic that previously resided in the data access layer in the web servers. The data access service first and foremost protects our most critical resources. It also provides a uniform, language-independent way to interact with relational data. Having the service made it much simpler to add additional clients written in different languages, which we did several times throughout the years. The service also encourages good client behavior by restricting the set of possible queries through a limited API. Finally, the service simplifies the operational management of our relational data resources, which was key in enabling the team to rapidly build sophisticated strategies when needed.
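
To make the connection-protection idea concrete, here is a minimal Python sketch of capping concurrent database connections inside one service instance. It is an illustration only, not Box's implementation: the `ConnectionLimiter` class, its parameters and the choice to reject a request after a short wait are all assumptions made for the example.

```python
import threading
from contextlib import contextmanager


class ConnectionLimiter:
    """Caps concurrent database connections (hypothetical helper)."""

    def __init__(self, max_connections: int, wait_seconds: float = 0.05):
        # One semaphore slot per connection the database can afford to give us.
        self._slots = threading.BoundedSemaphore(max_connections)
        self._wait_seconds = wait_seconds

    @contextmanager
    def connection(self):
        # Wait briefly for a free slot, then shed the request instead of
        # letting a traffic spike pile more connections onto the database.
        if not self._slots.acquire(timeout=self._wait_seconds):
            raise TimeoutError("database connection limit reached")
        try:
            yield  # a real service would yield a pooled connection here
        finally:
            self._slots.release()
```

A caller would wrap each query in `with limiter.connection(): ...`. Because every database request passes through the service, a limit like this can be enforced in one place rather than in each web server.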

Improving Utilization of Replicas

As the demands on the data access layer started to grow, we saw the load on the primary databases increase at an alarming rate. We knew that a single primary could not handle the load for very long. But, while the primaries were holding on for dear life, the replicas were living a life of leisure, hardly doing anything. Because we used asynchronous replication between our primaries and replicas, we couldn’t rely on reading fresh data from the replicas. Therefore, the only requests we sent to the replicas were those that could accept stale or inconsistent data. Given how important consistent data is to our application, we didn’t have a lot of traffic that fit the bill.

Our goal was to read from replicas while still maintaining strict consistency. The natural first step in any algorithm for doing this would start by checking if the replica is caught up. If it is, then it’s safe to read from the replica. If it’s not, then we need to figure out what to do instead.

One option is to fail the request. This would work most of the time if a replica is rarely lagged, but during periods of sustained lag, this strategy would lead to failing most, if not all, of the replica-bound requests. That wasn’t viable for our traffic.

A second option is to fall back to the primary. This sounds like a great compromise, the best of both worlds. We’d get increased usage from our replicas for the majority of the time, but we wouldn’t have to resort to failing the requests when lag is high. It is, however, a deceptively bad idea. In times of sustained lag, there is a good chance it’s caused by the primary getting inundated with writes and already struggling to keep up with those. In that moment, if it is also flooded with all the requests that were supposed to go to the replica, there’s a good chance that the primary will end up degraded.

Both of the options mentioned above only work well if the replica is already caught up when first checked, which is, of course, not guaranteed. If we can’t rely on that fact, then the only other option is to wait for the replica to catch up and keep retrying until it is. This seemed promising, as we were able to identify quite a bit of traffic for which it was acceptable to occasionally sacrifice a bit of latency for consistent data. The drawback with this strategy is that there is no guarantee that the replica will catch up to the primary in a reasonable amount of time, eventually leading to requests timing out. However, it’s not actually necessary that the replica catch all the way up to the primary, which is the key insight to this next strategy.

The reason we wait for the replica to catch up to the primary is to achieve read-after-write consistency. Read-after-write consistency means that a client performing a write will observe that write on all subsequent reads. The diagram below represents writes as they happen on the primary and when they are replicated on the replica. This replica lags behind the primary by a couple of writes.

Let’s say that a client performs a write at time t0 represented by the blue box. If a client tries to perform a read on the replica at time t1 or t2, the client will not observe their write. At time t3 however, the client will observe their write on the replica, making it the earliest time at which read-after-write consistency is achieved. However, note that the replica is still not caught up to the primary at that point. If we continue to use replication lag as our signal for read-after-write consistency, then we’ll have missed an opportunity to safely read from the replica.

At Box, we came up with an algorithm that takes advantage of the insight that a client only needs to wait until a replica observes their write, not all writes. Read more about it here.

By implementing this strategy, we were able to offload 50% of primary reads onto replicas.
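
The core idea, waiting only until the replica has applied the client's own write, can be sketched in a few lines of Python. This is a simplified illustration under stated assumptions, not the algorithm Box uses: positions are modeled as plain increasing integers, and the `Replica` class and `read_own_write` function are hypothetical names invented for the example.

```python
import time


class Replica:
    """Toy replica that records how far it has applied the primary's log."""

    def __init__(self):
        self.applied_position = 0
        self.rows = {}

    def apply(self, position, key, value):
        # Replication applies writes in order and advances the position.
        self.rows[key] = value
        self.applied_position = position


def read_own_write(replica, key, write_position, timeout=0.5, poll=0.01):
    """Read `key` as soon as the replica has applied the caller's write."""
    deadline = time.monotonic() + timeout
    # Compare against the client's write position, not the primary's
    # current position, so unrelated lag does not block the read.
    while replica.applied_position < write_position:
        if time.monotonic() >= deadline:
            raise TimeoutError("replica has not applied the client's write")
        time.sleep(poll)
    return replica.rows.get(key)
```

If the client's write was at position 3 and the replica has applied position 3, the read proceeds immediately even when the primary is already at position 5.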

Summary: The strategy I just outlined exploits the following two key properties of our traffic.

  • We have many reads that require read-after-write consistency but can tolerate some latency
  • Many of our reads occur minutes or hours after the relevant record was mutated. These reads don’t even have to give up speed for consistency.

Improving Cache Utilization

One unfortunate side effect of moving traffic to replicas was that the number of cached objects shrank, leading to increased cache misses. We use look-aside caches, which are populated upon a cache miss during a read and invalidated during a write. Therefore, whether a request is able to read from a primary or replica directly impacts what value is cached. We maintain the invariant in our caching tier that only fresh values will be cached. Given this invariant, caching what’s read from a primary database is always safe. Caching what’s read from a replica can be safe, but determining that is pretty complicated, so we opted not to do that. This is why when we moved more queries to replicas, we ended up caching less content.

To counteract that decrease, we employed a few strategies. First, we cached values that were read from caught-up replicas. In the strategy mentioned in the previous section, when reading from replicas, we check to see if the replica is caught up to the client’s desired position in order to satisfy the read. While doing that, we also check if the replica is fully caught up to the primary. If that is also true, then we’re able to cache the value that we read. Even though we’ve spent a lot of effort to determine what is done when a replica isn’t caught up, the fact of the matter, at least for us, is that a replica is often caught up. Even though none of our algorithms can rely on a replica being caught up, we can still take advantage of that fact in order to cache what we read.
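
As a rough sketch of that rule, the Python below populates a look-aside cache from a replica read only when the replica is fully caught up to the primary. The `Store` type, the integer positions and the `read_from_replica` function are assumptions for illustration, and the sketch ignores the races between checking positions, reading and invalidating that a production cache has to handle.

```python
from dataclasses import dataclass, field


@dataclass
class Store:
    """Toy database node: a log position plus its rows (hypothetical)."""
    position: int = 0
    rows: dict = field(default_factory=dict)


def read_from_replica(key, cache, replica, primary_position):
    """Look-aside read that caches only values known to be fresh."""
    if key in cache:
        return cache[key]
    value = replica.rows.get(key)
    # Cache only if the replica has applied everything the primary has;
    # otherwise the value may be stale and must not enter the cache.
    if value is not None and replica.position >= primary_position:
        cache[key] = value
    return value
```

A lagging replica can still serve the read, but the result is simply not cached, which preserves the invariant that the cache holds only fresh values.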

This next strategy is my favorite due to its simplicity: we cache the fact that an object doesn’t exist. Usually when you think about cache, it contains a value that was read from a datastore and expected to be asked for multiple times. You don’t usually think of it as also caching the answer to the question, “does this object exist?” but that’s what we did. When a read comes in for an object that doesn’t exist in the database, we place a sentinel value in cache for that key meaning that it doesn’t exist. After employing this strategy, 10% of our cache hits today are to non-existent values. That’s a nontrivial number of queries that we’re able to remove from the database!
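
Here is a minimal sketch of caching non-existence with a sentinel, using plain dictionaries in place of a real cache and database. The names `MISSING`, `lookup` and `write` are hypothetical, and a real cache would store a reserved serialized marker rather than a Python object.

```python
MISSING = object()  # sentinel meaning "known not to exist in the database"


def lookup(key, cache, database):
    """Look-aside read that also remembers that a key does not exist."""
    cached = cache.get(key)
    if cached is MISSING:
        return None  # answered from cache, no database query needed
    if cached is not None:
        return cached
    value = database.get(key)
    # Cache the sentinel on a database miss so repeat lookups stay in cache.
    cache[key] = MISSING if value is None else value
    return value


def write(key, value, cache, database):
    """Write to the database, then invalidate whatever was cached."""
    database[key] = value
    cache.pop(key, None)  # also clears a stale "does not exist" sentinel
```

The invalidation on write matters as much as the sentinel itself: once the object is created, the cached "does not exist" answer must be removed.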

Finally, we make use of leases, a concept originally described by this 2013 Facebook paper. It’s most useful for records that have high read and write activity within a short period of time, which can result in a thundering herd of requests on the database. A lease is a temporary value that is placed in cache that acts like a lock, allowing only the leaseholder to read the record from the database and replace the lease with the record. The remaining requests that do not own the lease must wait until the lease is replaced with the record by the lease holder. We implemented the algorithm described in the paper and found it to be incredibly valuable in protecting our databases from thundering herds.
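
The lease mechanics described above can be sketched as a small in-memory cache. This is a single-process toy with no lease expiry, and the `LeaseCache` class and its method names are assumptions made for the example rather than the paper's or Box's actual interface.

```python
import itertools


class LeaseCache:
    """Toy look-aside cache where one reader at a time may fill a key."""

    def __init__(self):
        self._values = {}
        self._leases = {}
        self._tokens = itertools.count(1)

    def get(self, key):
        """Return ("hit", value), ("lease", token) or ("wait", None)."""
        if key in self._values:
            return "hit", self._values[key]
        if key in self._leases:
            return "wait", None  # someone else is already reading the database
        token = next(self._tokens)
        self._leases[key] = token
        return "lease", token

    def fulfill(self, key, token, value):
        """Replace the lease with the record, if the lease is still current."""
        if self._leases.get(key) != token:
            return False  # lease was invalidated or re-issued in the meantime
        del self._leases[key]
        self._values[key] = value
        return True

    def invalidate(self, key):
        """Called on write: drops both the cached value and any lease."""
        self._values.pop(key, None)
        self._leases.pop(key, None)
```

Only the request that receives `"lease"` queries the database; requests that receive `"wait"` retry later, which is what keeps a burst of readers from all hitting the database at once.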

One side-effect of leases is that they can occasionally result in a bad client experience. Like a regular cache value, a lease is invalidated whenever the associated record has been modified. Upon noticing the invalidation, the requests that were waiting on the lease to be fulfilled try to become the new leaseholder. One of the requests will succeed and the rest will once again wait. The original leaseholder will not be able to set the record it read from the database back into the cache as it does not hold the current lease. Now imagine that this record is being modified frequently enough that every new lease on it is invalidated before the leaseholder can fulfill it. In that case, the only way a request will be able to read the record is by becoming the leaseholder. If there are many readers, a request can wait so long to become a leaseholder that it eventually times out.

We have extended the leasing algorithm to fix this problem by employing some of the same read-after-write consistency tricks that we did with replicas. Read about this algorithm in detail.

Summary: You should look for the following traffic patterns if you want to employ any of the caching strategies above.

  • Many reads that require read-after-write consistency are repeated.
  • Records that don’t exist are repeatedly queried for.
  • Records undergo heavy read and write activity within a short period of time.

Rate Limiting at the Data Access Layer

As the amount of traffic at the data access layer started to grow, we began to see occasional spikes in traffic from some users that resulted in database degradation. The spikes were sporadic, sudden and usually pretty short-lived but left a spike of errors in their wake. At the time, we had methods of throttling heavy users within Box but those methods were at the application layer. Any rate limiting above the data access layer could only guess at what requests might cause harm to the databases. Therefore, it was important that we do rate limiting at the data access layer based on how much of the database resources each user’s requests took up.

Our data access service is distributed and therefore, there is no single actor that has a complete picture of usage. Often, one’s first instinct when trying to throttle in such an environment is to add a new resource that acts as a single point where information can be aggregated and used to make a decision. That doesn’t work too well when the rate limiting is for a low-latency, high-throughput service. The additional round trips to the new resource add significant latency. More importantly, though, the new resource adds a single point of failure. During a spike of requests, which is when we most need the rate limiter, the shared resource is likely to become overloaded and fail.

We decided instead to find an approach that would satisfy the following requirements:

  • Decentralized information propagation: we did not want any shared storage holding all the information and instead preferred that the service hosts hold it.
  • Fault tolerant: we often have hosts entering or leaving the cluster. Our rate limiting framework should be resilient to such changes and recover on its own without manual intervention.
  • Fast convergence: because the information is decentralized and needs to propagate across the cluster, it’s important that information converge quickly following a spike in traffic. Specifically, the information needs to converge fast enough that all hosts can start rate limiting before the spike in traffic can result in degradation.

What we ended up using was largely based on this highly cited paper that describes how to compute sums across a distributed cluster. Each node in the cluster starts with its own local value and the goal is to reach a state such that every node has the sum of all nodes’ values. The paper proposes an algorithm called the Push-Sum Protocol to do that. The protocol is made up of many diffusion rounds.

In each round:

  • Each node chooses a random node in the cluster and sends its local information.
  • Each receiving node uses a formula described in the paper to combine the new info with its local info in order to arrive at a new local usage.
  • Each node then chooses a random node to which it sends its new local information.

After a certain number of rounds that’s based on the size of the cluster, the paper shows that there is a high probability that all nodes will have the same and correct sum.
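
To show how the rounds converge, here is a small single-process simulation of Push-Sum in Python. The post does not spell out the combining formula, so this uses the commonly described rule: each node keeps a (sum, weight) pair, sends half of it to itself and half to a random peer, and estimates the total as sum divided by weight. The function name `push_sum`, the synchronous rounds and the `seed` parameter are assumptions for the example and say nothing about how Box implements it.

```python
import random


def push_sum(values, rounds, seed=0):
    """Simulate Push-Sum; returns each node's estimate of sum(values)."""
    rng = random.Random(seed)
    n = len(values)
    sums = [float(v) for v in values]
    # Exactly one unit of weight in the whole cluster makes sum / weight
    # converge to the total (weight 1 on every node would give the average).
    weights = [1.0] + [0.0] * (n - 1)
    for _ in range(rounds):
        next_sums = [0.0] * n
        next_weights = [0.0] * n
        for i in range(n):
            target = rng.randrange(n)  # random peer, possibly itself
            half_sum, half_weight = sums[i] / 2, weights[i] / 2
            # Keep one half locally and push the other half to the peer.
            for destination in (i, target):
                next_sums[destination] += half_sum
                next_weights[destination] += half_weight
        sums, weights = next_sums, next_weights
    # A node that has not yet received any weight has no estimate.
    return [s / w if w > 0 else None for s, w in zip(sums, weights)]
```

Because every round only moves halves of pairs around, the cluster-wide totals of sum and weight never change, which is why each node's ratio settles on the true total without any central aggregator.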

The Push-Sum Protocol as defined in the paper only works for a single moment in time. Traffic, on the other hand, is continuous and usage needs to be calculated on an ongoing basis. To accommodate our requirements, we designed a streaming variation of the protocol. In our variation, time is broken up into intervals. The nodes are accumulating information about multiple intervals at the same time. Push-Sum is used to aggregate usage within a time interval. Once a time interval has been gossiped about for a target number of rounds, we conclude that consensus has been reached and that information can be used to make rate-limiting decisions. With this protocol, we are able to reach consensus within 4 seconds with 40 diffusion rounds.

Summary: The property that makes this particular rate limiting algorithm work for us is that we receive occasional bursts of traffic originating from the same entity (i.e., a user or enterprise).

Quality of Service for Writes

Replicas, cache and even rate limiting for the most part were all designed to scale reads. It makes sense that those were the priority for a significant part of our history since the vast majority of our database traffic is reads. But, it finally came time to focus on writes.

Over the past year, we started receiving more and more asynchronous writes. Some of these were a result of a customer action, but many were kicked off internally (e.g. data migrations). These asynchronous writes were larger in volume and resulted in significant replication lag, putting our high availability at risk and rendering our replica read strategies less effective. There was no mechanism by which to dynamically put backpressure on the asynchronous writes to slow them down, so we decided to build that.

We started by asking our clients to tag their writes with the quality of service that they required. High QoS for those in the customer path and low QoS for the rest. High QoS requests would always be allowed to execute while low QoS requests would only execute if none of the replicas were significantly lagged. This framework not only prevented lower priority requests from causing any degradation, it also provided a feedback loop to the client who was sending the requests, allowing them to dynamically regulate their rate rather than having to guess at a safe request rate. The other big win was getting better utilization of the primary database by shifting more requests to off-peak times, allowing us to flatten the peaks of database traffic.

After rolling it out, we realized one important problem: we had bucketed all asynchronous traffic together but actually there were different levels of priority even amongst the asynchronous traffic. There was:

  1. traffic that was triggered due to a customer action that could be delayed and retried, but it was important that it eventually complete
  2. traffic that was kicked off internally which was of lower priority and could be abandoned if necessary

We decided to add a level in-between: medium QoS for customer-triggered asynchronous traffic. This ensured that low QoS traffic couldn’t execute while medium QoS was being denied. It also allowed us to completely drop low QoS traffic in times of severe degradation. We’ve moved 20% of our writes to low or medium QoS.
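
The three-level admission rule can be sketched as follows. The post does not say how "significantly lagged" is measured, so the per-level lag ceilings below are made-up numbers, and `QoS`, `MAX_LAG_SECONDS` and `admit_write` are hypothetical names used only for illustration.

```python
from enum import IntEnum


class QoS(IntEnum):
    LOW = 0     # internally triggered work that may be dropped
    MEDIUM = 1  # customer-triggered asynchronous work that must finish
    HIGH = 2    # writes in the customer path


# Hypothetical lag ceilings in seconds. LOW must not exceed MEDIUM so that
# low QoS writes are never admitted while medium QoS writes are denied.
MAX_LAG_SECONDS = {QoS.LOW: 5.0, QoS.MEDIUM: 30.0}


def admit_write(qos, replica_lags_seconds):
    """Decide whether a write may execute given current replica lag."""
    if qos is QoS.HIGH:
        return True  # customer-path writes always execute
    worst_lag = max(replica_lags_seconds, default=0.0)
    return worst_lag <= MAX_LAG_SECONDS[qos]
```

A denied client can treat the rejection as backpressure: medium QoS work retries later, while low QoS work may simply be abandoned.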

Summary: The traffic property we took advantage of here is that many of our writes could be delayed or dropped in times of degradation.

There you go: an abridged story of how Box’s data access infrastructure changed and grew over the last decade. We, as a data access infra team, know much more about the breakdown of our traffic compared to when we first started on this journey. If there’s one lesson that I’ve learned thinking through all the strategies that we use, it’s that it’s really important to understand the properties that your traffic possesses. In fact, it’s key to building the optimal data tier for your company.


I hope that this post has inspired you to take a deeper look at your own traffic and that it’s given you some ideas for how to optimize your data tier.

If you’re interested in learning more or working with us, check out our open roles.
