How we migrated over half a billion records without downtime

As Twitch has grown, we’ve encountered many different and interesting scaling problems. One of our biggest challenges has been the 500+ million following relationships between users. Up until a few months ago, these relationships were stored very inefficiently and could no longer scale to our needs. Following relationships are immensely important to our broadcasters and viewers, as they are the means in which we notify users when their favorite broadcasters start streaming. This underpins the communal nature of Twitch; thus, while planning improvements we decided that downtime during any migration was unacceptable. To efficiently solve this task, we built a TAO-look-a-like and incurred absolutely zero down time in the migration. Here’s how.

Establishing the Path Forward

The initial schema was not designed with a billion relationships in mind. It lacked correct indexing and was a part of a large, centralized PostgreSQL cluster — which was succumbing under load during peak times. One of the primary use cases for our following system is to notify users when their favorite broadcasters have come online. This means iterating over all of a channel’s followers as quickly as possible. To illustrate the problem, this is a graph of time required to retrieve all followers sequentially, fetching 100 results at a time. The old model is unsustainable, while the new model is a perfect fit.

At peak, the following-related queries represented 40% of our PostgreSQL cluster’s available CPU time. Having pushed that cluster to its limits already, we sought to move to a more optimal data model. Inspired by Facebook’s TAO, we realized we could simply model our follower data as associations in the form of [Entity A, Association Type, Entity B], as such:

With the target data model established, we created a high level migration plan:

  1. Take a database dump of the PostgreSQL table
  2. Simultaneously start recording all new write events (follows and unfollows) on to a time ordered event queue.
  3. Import the database dump into a new database and convert the data into the new schema format.
  4. Replay all queued up writes onto the new data store until it’s caught up
  5. Divert traffic to the new database

We had a separate program consume the write events and write them to the new database using the new schema. Although on paper that makes us able to divert the traffic and switch over, we wanted to be sure that our newly formatted data matched the semantics of our old data. Clients of our API should not be aware that this change was happening and all previously stored data should be retrievable. Since each of these migration steps has a possibility to introduce non-obvious faults, we devised a strategy to ensure consistent behavior between the two data representations.

Ensuring Success

To ensure data consistency, we took inspiration from Github’s Scientist library for refactoring critical paths. We wrote our service to support more than a single data store: a primary one and an arbitrary number of secondaries. We can mirror, in parallel, each incoming request to all the data stores and return the result from the primary one to the client. We can then, in a different thread, compare the result of a secondary data store with the primary to see whether it’s correct. With this information, we can say with confidence when the migration is ready to proceed. We can also measure request times for each secondary data store to ensure response times are within expectations. The idea is illustrated here, with the steps being numbered in order:

Below is a code snippet that illustrates the flow mentioned. The client supplies ‘from’ and ‘to’ IDs, as well as the type of the association. Our service responds with a single result if the record existed, otherwise nothing. The interfaces look like this (note that I’ve excluded Contexts everywhere, trimmed package names and simplified some things for brevity’s sake):

type Reader interface {
GetAssoc(Association) (*AssocResponse, error)
}
type Backender interface {
GetAssoc(Association, *Params) (AssocResponse, error)
}
// Backend provides storage and stats implementations
type Backend struct {
primaryReader Reader
secondaryReaders []Reader
}

This makes it possible to create multiple readers. Every incoming request comes into the backend, which then routes them to the primary and the secondaries.

func (b *Backend) GetAssoc(assoc Association) ([]AssocResponse, error) {
res, err := b.PrimaryReader.GetAssoc(assoc)
if err == nil {
go func() { b.secondaryGet(assoc, res) }()
}
return res, err
}

Secondary reads are handled after the primary has already returned. We spin up a goroutine for each of the registered secondary readers, and time how long each one takes. That allows us to compare the result it got back with the primary result and report stats around to our statsd cluster. Finally, these all execute within a timeout function for safety.

func (b *Backend) secondaryGet(assoc Association, primaryResult AssocReponse) {
for _, r := range b.secondaryReaders {
go func(reader Reader) {
res, err := reader.GetAssoc(assoc)
if err != nil || res != primaryResult {
// Log failed comparison
} else {
// Log successful comparison
}
}(r)
}
}

Logging the successful and failed comparisons in our statsd clusters allow us to measure mismatch rates:

In the case of a mismatch we logged the result alongside the expectation and compared the two. The dips in the graph above represent our progress from steadily investigating inconsistencies and fixing them. A few example issues that we found were:

  • Result sets not being ordered correctly
  • Records being included when they should not have been
  • Time stamps being returned in the wrong timezone

Conclusion

By the end of it, we knew that the results we were serving to our users were the ones they were expecting and felt confident in switching over to the new database cluster. As an extra safeguard for rolling back, we added the original database as a secondary data store when we switched over — meaning that it wouldn’t get out of date, and it’d be easy to switch back if any problems occurred. It’d just be a matter of changing which database we call primary and which we call secondary. We first did this with reads and subsequently with writes by implementing a secondary writer.

Migrating 500 million rows without downtime was a lot of fun, especially given our confidence about the outcome. We knew our new TAO-like schema would be a significant upgrade over the old schema, and since we moved this information into its own separate data store we took about 30–40% of load away from our main database cluster. And no one noticed!

We're hiring!

This was a fun problem for us to work on. We have a ton of other fun problems left. If you are interested in working on systems and scale, please look at the Twitch Engineering site!