Serving configuration data at scale with high availability

Pavan Chitumalla and Jiacheng Hong | Pinterest engineers, Infrastructure

We have a lot of important and common data that’s not modified frequently but accessed at a very high rate. One example is our spam domain blacklist. Since we don’t want to show Pinners spammy Pins, our app/API server needs to check a Pin’s domain against this domain blacklist when rendering the Pin. This is just one example, but there are hundreds of thousands of Pin requests every second, which generates enormous demand for access to this list.

Existing Problem

Previously, we stored this kind of list in a Redis sorted set which provided us with easy access to keep the list structure in a time sorted order. We also have a local in-memory and file-based cache that’s kept in sync via polling the Redis host for any updates. Things went well in the beginning, but as the number of servers and size of the list grew, we began to see a network saturation problem. In the five minutes after the list was updated, all the servers tried to download the latest copy of data from a single Redis master causing the network to saturate on Redis master and resulting in a lot of Redis connection errors.

We identified a few potential solutions:

  1. Spread the download of this data over a longer period. But for our use case, we wanted the updates to converge within a few minutes at most.
  2. Shard the data. Unfortunately, since this data is a single list, sharding it would add more complexity.
  3. Replicate this data. Use a single Redis master with multiple Redis slaves to store this data and randomly pick a slave for reads. However, we weren’t confident about Redis replication (we were running v2.6). Moreover, it wouldn’t be cost effective since most of the time (when the data is not updated) these Redis boxes will be idle due to client side caching.

Solution

As each of the above solutions has its own shortcoming, we asked ourselves, how would we design a solution if we were building from ground up?

Formalizing the requirements of the problem:

  • Frequent read access (>100k/sec) and rare updates (several times a day, at most).
  • Quick (within one minute, or several at most) converge the updates across all boxes. Ideally, a push-based model instead of clients polling for updates.

We engineered a solution by combining the solutions to the smaller problems:

  • Cache the data in-memory so that high read access won’t be a problem.
  • Use Apache ZooKeeper as a notifier when updates are made.

This is conceptually similar to the design of ZooKeeper resiliency, but if we stored the entire data in one ZooKeeper node, it would still cause a huge spike in network traffic on the ZooKeeper nodes during an update. Since ZooKeeper is distributed, the load would be spread across multiple ZooKeeper nodes. Yet, we didn’t want to burden ZooKeeper unnecessarily as it’s a critical piece of our infrastructure.

We finally arrived at a solution where we use ZooKeeper as the notifier and S3 for the storage. Since S3 provides very high availability and throughput, it seemed to be a good fit for our use case in absorbing the sudden load spikes. We call this solution managed list aka config v2.

Config v2 at work

Config v2 takes full advantage of the work we have already done, except that the source of truth is in S3. Further, we added logic to avoid concurrent updates and to deal with S3 eventual consistency. We store a version number (that’s actually a timestamp) in ZooKeeper node which also will be a suffix of the S3 path to identify the current data.

If a managed list’s data needs to be modified, a developer has the option to change it via an admin web UI or a console app. The following steps are executed by the Updater app on save:

  • First, grab a Zk lock to prevent concurrent write to the same managed list.
  • Then, compare the old data with the one in S3 and only upload the new data to S3 if it matches — Compare And Swap update. This prevents dirty writes while a previous update is converging.
  • Finally, write the version to Zk node and release the Zk lock.

As soon as the Zk node’s value is updated, ZooKeeper notifies all its watchers. In this case, triggering the Daemon processes on all servers to download the data from S3.

How we grappled with S3’s consistency model

Amazon’s S3 gives great availability and durability guarantees even under heavy load, but it’s eventually consistent. What we needed was “read after write” consistency. Fortunately, it does give “read after create” consistency in some regions*. Instead of updating the same S3 file, we create a new file for every write. And yet, this introduces a new problem of synchronizing the new S3 filename across all the nodes. We solved this problem by using ZooKeeper to keep the filename in sync across all the nodes.

Introducing Decider

When a new feature or service is ready for launch, we gradually ramp up traffic in the new code path and check to make sure everything is good before going all in. This resulted in the need to build a switch that can allow a developer to decide how much traffic should be sent to the new feature. Also, this traffic ramp-up tool (aka “Decider”) should be flexible enough so that developers can add new experiments and change the values of existing experiments without requiring a re-deploy to the entire fleet. In addition, any changes should converge quickly and reliably across the fleet.

Earlier Solution

Every experiment is a ZooKeeper node and has a value [0–100] that can be controlled from the web UI. When the value is changed from the web UI, it’s updated in the corresponding node, and ZooKeeper takes care of updating all the watchers. While this solution worked, it was plagued with the same scaling issues we previously experienced since the entire fleet was directly connecting to ZooKeeper.

Our Decider framework consisted of two components: a web-based admin UI to control the experiments and a library (both in Python and Java) that can be plugged in where branch control is needed.

Current Solution

Once we realized the gains of managed list, we built managed hashmap and migrated values of all Zk nodes containing the experiments. Essentially, the underlying managed hashmap file content is a json dump of the hash table that contains experiment names as the keys and an integer [0–100] as the value.

API

def decide_expermiment(experiment_name):
return random.randrange(0, 100, 1)

How this is used in code:

if decide_experiment("my_rocking_experiment"):
// new code
else
// existing code

Another use case of Decider: dark read and dark write

We use the terminology “dark read” and “dark write” when we duplicate the production read or write request and send it to a new service. We call it dark because the response from the new service doesn’t impact the original code path whether it’s a success or failure. If asynchronous behavior is needed then we wrap the the new code path in gevent.spawn().

Here’s a code snippet for dark read:

try:

if decider.decide_experiment("dark_read_for_new_service"):
new_service.foo()
except Exception as e:
log.info("new_service.foo exception: %s" % e)

*In the rare event that S3 returns “file not found” due to eventually consistency, the daemon is designed to refresh all the content every 30 mins, and those nodes will eventually catch up. So far, we haven’t seen any instances where the nodes got out of sync for more than a few minutes.

If you’re interested in working on engineering challenges like this, join our team!

Pavan Chitumalla and Jiacheng Hong are software engineers on the Infrastructure team.

For Pinterest engineering news and updates, follow our engineering Pinterest, Facebook and Twitter. Interested in joining the team? Check out our Careers site.