Database Migration at Scale

Roee Zilkha
CallApp
Oct 9, 2018

In this post, I am going to present an overview of how we properly upgraded our 5-year-old database (based on Apache Cassandra 0.7–1.2.X) to the latest version of Apache Cassandra (currently 3.11) in the shortest amount of time possible and without any significant effect on production during the process. Moreover, I will provide general guidelines that are relevant not only to Apache Cassandra but to any data migration.

First, when we talk about data migration, whether the goal is to switch to a different database, upgrade the current one to a new version, or simply change our data models, we ought to understand exactly what the scale of our problem is.

There are three general solutions, and the right one depends on the duration of the process and on the level of consistency and availability that you guarantee to your customers/users.

Downtime Migration

If the database contains a small enough data set, then it may be possible to start and finish the procedure within a declared maintenance window. By doing so, you guarantee consistency, since your servers are not changing the data during the migration. However, it is not a valid solution if it violates the company’s SLA (service-level agreement) for the product’s availability. At CallApp, for example, we very rarely declare a downtime window.

Migration first, repair later

What if the data doesn’t mutate much during the time of migration?
In a case like this, you may keep a transaction log of all actions performed on the database during the time of migration, and once done, sync the conflicting data. The steps for this method are:

  1. Deploy and start the server with transaction log turned on, new db reads turned off
  2. Start migration script
  3. Turn off server — downtime
  4. Run repair — flush transaction log into the new db/model.
  5. Turn the server back on with new reads on, transaction log off.
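
To make the transaction-log idea concrete, here is a minimal sketch of what such a log could look like; the TransactionLog class and its fields are illustrative assumptions, not code from the original system:

    // Hypothetical sketch: append every C.U.D. operation to a durable log while
    // the migration script runs, then replay the log against the new database
    // during the repair step.
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class TransactionLog {

        private final BufferedWriter writer;

        public TransactionLog(Path logFile) throws IOException {
            this.writer = Files.newBufferedWriter(logFile, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }

        // Record the operation type, table and key with a timestamp; the payload
        // itself can be re-read from the old database during the repair phase.
        public synchronized void append(String table, String key, String operation) throws IOException {
            writer.write(System.currentTimeMillis() + "\t" + table + "\t" + key + "\t" + operation);
            writer.newLine();
            writer.flush();
        }
    }

During the repair step, each logged key is re-read from the old database and written to the new one, which keeps the replay simple and idempotent.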

Dual Writes Migration

This approach applies when the data mutates frequently and is also too big for a downtime migration. At CallApp, we face both conditions. In such a situation there is a need to be creative and, more than anything, responsible for what you are planning to do, as we could not tolerate any loss of data.

The main difference from the previous method is that here you keep the data consistent while the migration process is still running, by activating two write paths: one for the old version and one for the new version. Once the migration is done, you switch reads over to the new version. This is the method we will elaborate on here.

CallApp & Cassandra

For the last 5 years at CallApp, we have been using Apache Cassandra 0.7–1.2.X to store most of our data. It proved to be the right choice for our use case and goals up until recently; I will not elaborate on why, as it would require an entirely different post. The qualities of Cassandra as a highly scalable, write-intensive database made it the right technology for CallApp in the past, but unfortunately it no longer works for our needs.

A lot has changed since we spun up our first Cassandra cluster. Technologically, we were way behind where we wanted to be, and we knew that the cluster was not capable of serving our future goals in terms of both performance and functional capabilities. We had to switch to another technology or move our entire data to a Cassandra 3.11 cluster. After long discussions and consulting with other professionals, we decided on the latter.

Wait… doesn’t Cassandra have its own migration mechanism?

Well, yes, of course: Cassandra ships with its own tooling, such as sstableloader for streaming sstable files into a cluster and nodetool upgradesstables for rewriting them in the next on-disk format. However, in our case we were planning to move several major versions forward, not just one, so the whole process would have had to follow this order:

Upgrading to 1.2.19 -> Upgrading to 2.0.17 (still supports Astyanax) -> Upgrading to 2.1.15 (migration to the DataStax driver) -> Upgrading to 2.2.7 -> Upgrading to 3.11

You can see how complex and error-prone that would be, and it would also take a lot more time than using an in-house migration system.

Server Modifications

Before going over the main process, which is completely offline, we first need to discuss the changes required on the server’s end. As mentioned in the previous sections, the data must stay consistent: for every C.U.D. (create, update, delete) query we invoke on the old cluster, we must run the same query on the new cluster.

However, it does not end with concurrent writes to two DB clusters. The whole process should be gradual, with a feature switch for each step and sample logs for data-integrity verification. Moreover, we should also ask ourselves how to avoid a significant increase in latency. Let’s start by explaining each step/phase (a minimal routing sketch follows the list):

  1. Writes off/Reads from old — deploy the new Server version, with all the necessary code changes that you have made. Read from the old cluster as you used to and don’t start writing to the new one just yet. Since writing/reading to a new database may require some rewriting of existing logic, I strongly recommend you wait several days before turning on the new writes. This will enable the team to be absolutely sure that there is no regression which may affect existing functionality.
  2. Async Writes On / Reads Old — asynchronously write to the new cluster and keep reading from the old cluster. During this phase, write samples of modified entities’ ids to a log file.
  3. Start migration (we will elaborate on the whole process later on)
  4. Wait for migration to end — while waiting, use the sample logs to verify that the entities saved in the old cluster are equal to their representation in the new one.
  5. Migration ends
  6. Verifying…
  7. Sync Writes New / Async Writes Old / Reads New — read from the new DB; every write that used to be synchronous against the old cluster becomes asynchronous, while the synchronous writes we used to invoke on the old cluster are now invoked synchronously on the new cluster.
  8. Stop old writes and shut down old cluster — final stage. After days of stability and data tests, stop writing to the old cluster and shut it down for good.

  9. Pour some good wine for the whole team.
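
To make these phases concrete, here is a minimal sketch of how a feature switch could gate the read and write paths. The Phase enum, UserRepository interface and repository names are illustrative assumptions, not CallApp's actual code:

    // Illustrative routing layer driven by a per-phase feature switch.
    public class UserRepositoryRouter {

        public enum Phase { OLD_ONLY, DUAL_WRITE_READ_OLD, DUAL_WRITE_READ_NEW, NEW_ONLY }

        private final UserRepository oldRepo;   // backed by the old cluster
        private final UserRepository newRepo;   // backed by the new cluster
        private volatile Phase phase;           // flipped via configuration, no redeploy needed

        public UserRepositoryRouter(UserRepository oldRepo, UserRepository newRepo, Phase phase) {
            this.oldRepo = oldRepo;
            this.newRepo = newRepo;
            this.phase = phase;
        }

        public void setPhase(Phase phase) { this.phase = phase; }

        public User read(String key) {
            return (phase == Phase.DUAL_WRITE_READ_NEW || phase == Phase.NEW_ONLY)
                    ? newRepo.read(key) : oldRepo.read(key);
        }

        public void write(User user) {
            switch (phase) {
                case OLD_ONLY:            oldRepo.write(user); break;
                case DUAL_WRITE_READ_OLD: oldRepo.write(user); newRepo.writeAsync(user); break;
                case DUAL_WRITE_READ_NEW: newRepo.write(user); oldRepo.writeAsync(user); break;
                case NEW_ONLY:            newRepo.write(user); break;
            }
        }
    }

    // Minimal supporting types for the sketch above.
    interface UserRepository {
        User read(String key);
        void write(User user);       // synchronous write
        void writeAsync(User user);  // fire-and-forget write
    }

    class User {
        final String key;
        User(String key) { this.key = key; }
    }

The important property is that every phase change is a configuration flip rather than a code change, which is what makes the gradual rollout (and a possible rollback) cheap.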

Possible Consistency Issue

What happens if we send 2 updates on the same entity asynchronously?
The first may run after the second and override its changes. At CallApp, we resolved this by sending the timestamp of the query’s creation (i.e. UPDATE … USING TIMESTAMP ? … WHERE KEY = ?). Cassandra keeps a timestamp for every column write, and if the query timestamp is smaller than the one saved in the DB, it will not override the existing value; a minimal sketch of such a query follows the list below. In other databases, which do not support such a feature, you may:

  1. Make all new writes synchronous and add more servers to reduce the impact they have on the overall latency.
  2. For a group of queries which depend on one another, create a wrapping thread that runs them in the given order.
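
For the Cassandra path, a minimal sketch of a timestamped asynchronous update with the DataStax Java driver could look as follows; the users table and name column are illustrative:

    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.ResultSetFuture;
    import com.datastax.driver.core.Session;

    public class TimestampedWriter {

        private final Session session;
        private final PreparedStatement update;

        public TimestampedWriter(Session session) {
            this.session = session;
            // USING TIMESTAMP carries the client-side creation time of the query
            // (in microseconds), so a late-arriving older update cannot override
            // a newer value already stored in the cluster.
            this.update = session.prepare(
                    "UPDATE users USING TIMESTAMP ? SET name = ? WHERE key = ?");
        }

        public ResultSetFuture updateNameAsync(String key, String name, long creationMicros) {
            return session.executeAsync(update.bind(creationMicros, name, key));
        }
    }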

Deletes — playing it safe

Remember, your system still relies on the old cluster until you flip the switch once the migration process is over. This means that errors in the new cluster’s delete path will not be as obvious as errors on the old cluster. Furthermore, if you delete a record that you shouldn’t have, you would later have to scan the entire data set on the old cluster, fetch the records that you lost, and save them again.

At CallApp, we decided to perform logical deletes. For every table, we added a Boolean “mark_delete” column, and every row that should be deleted was “marked” as deleted using that column instead of being erased from the table. After the migration was done, we verified against the old cluster that those records were indeed supposed to be deleted, removed them, and turned on physical deletes on the new cluster.
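
A minimal sketch of such a logical delete on the new cluster could look like this; the users table is an illustrative assumption, and mark_delete is the Boolean flag described above:

    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.Session;

    public class SafeDeleter {

        private final Session session;
        private final PreparedStatement markDeleted;

        public SafeDeleter(Session session) {
            this.session = session;
            // Instead of DELETE FROM users WHERE key = ?, flag the row so it can
            // be cross-checked against the old cluster once the migration is over.
            this.markDeleted = session.prepare(
                    "UPDATE users SET mark_delete = true WHERE key = ?");
        }

        public void delete(String key) {
            session.execute(markDeleted.bind(key));
        }
    }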

Migration Process

Requirements:

  1. Duration — the longer the migration takes, the higher the risk of inconsistencies and errors.
  2. Data Consistency — as stated before, the new database has to be consistent with any changes of data during the time of migration. Once we start reading from it, we will receive the same results we would have received if we were reading from the old database.
  3. Production Stability — the system must still function properly while migration is running, we cannot allow it to significantly affect latency and throughput.
  4. Cost Efficient
  5. Fault tolerant — if the migration process shuts down, for whatever reason, will you be able to restart it without going all the way back to the starting point? Imagine going to your boss after weeks of migration to tell him/her that the company has to postpone every project which relied on the new database because you have to start all over again.
  6. Easily Monitored — you would want to check progress repeatedly, so monitoring should be simple and not too time-consuming.
  7. Simple Rollback — going back to the old system should be simple and not time-consuming, taking a step back should not involve significant downtime.

Solution A — migration scripts

One way that comes to mind is to simply create a migration script that opens connections to the old cluster, requests data from it and pushes the records into the new cluster. Unfortunately, the drawbacks outweigh the obvious advantage of a simple implementation. This method opens connections, sends TCP requests and fetches data from the old cluster, thereby increasing traffic congestion, which may result in higher request latency and lower throughput in a production environment.

Surely, you can scale out and add more instances to the old cluster. However, for old systems, adding more nodes is often not simple, and don’t forget the cost. You may argue that developers’ time also costs a fortune. Well, spin up new instances on 7-year-old technology, get your DevOps team involved, wait for repairs and failovers, delay projects that rely on the new database, pay for the extra machines needed, and once all this is done, you will realize that you are paying much more than with what I’m about to recommend.

Solution B — migrate from raw files

In the case of Cassandra, raw files are SStables. We don’t need a connection pool, TCP connections or any overhead of fetching data from an existing server. All we need are snapshots of the data directory from each node and the ability to think outside of the box.

Let’s go back to some of the requirements I mentioned earlier:

  1. Low duration — you are extracting the data directly from files instead of requesting it from an already congested server, thereby reducing the read phase significantly. Our tests revealed over 10X better performance using this method over requesting the data from the cluster.
  2. Production Stability — we are not sending any request to the old cluster; therefore, migration has no effect on live production environment’s throughput.
  3. Cost efficient — EBS storage and several general-purpose machines to run the migration result in minor costs compared to the previous solution (migration machines plus extra storage-optimized machines for the old cluster, which would be needed to avoid significantly higher latency on production due to requests coming from the migration process).

These are significant advantages over the conservative method. Nevertheless, it demands a deep understanding of the technology you are working with. A significant amount of research is required first, starting with handling consistency issues between SStables.

Reading SStable rows directly from the file system

We start by creating an AbstractReader, which will operate as an iterator over the sstable file. Since counters and regular rows should be approached differently, they will each have their own readNext.
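
The original gists are not reproduced here, so the following is a hypothetical skeleton of such a reader; the class and method names are illustrative:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Skeleton of an iterator over rows extracted from a single sstable file.
    // Counter tables and standard tables are parsed differently, so each gets
    // its own readNext() implementation.
    public abstract class AbstractReader<T> implements Iterator<T>, Closeable {

        private T next;
        private boolean finished;

        /** Reads the next row from the underlying sstable, or returns null when exhausted. */
        protected abstract T readNext() throws IOException;

        /** Offset of the last row returned, used for crash recovery (see RecoveryToken below). */
        public abstract RecoveryToken currentToken();

        @Override
        public boolean hasNext() {
            if (next != null) return true;
            if (finished) return false;
            try {
                next = readNext();
            } catch (IOException e) {
                throw new IllegalStateException("Failed reading sstable", e);
            }
            if (next == null) {
                finished = true;
                return false;
            }
            return true;
        }

        @Override
        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            T result = next;
            next = null;
            return result;
        }
    }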

Recovery Token:

As explained in a later section, we keep the state of the current run in order to recover from possible crashes.
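
A hypothetical shape for such a token, matching the offsets that are later persisted in the migration_status table:

    // Enough state to resume a crashed run from roughly where it stopped,
    // instead of starting the whole table over from the beginning.
    public class RecoveryToken {

        private final String tableName;
        private final String node;       // node whose snapshot is being read
        private final int fileIndex;     // index of the sstable file last opened
        private final long lastToken;    // token of the last committed record
        private final String lastKey;    // key of the last committed record

        public RecoveryToken(String tableName, String node, int fileIndex,
                             long lastToken, String lastKey) {
            this.tableName = tableName;
            this.node = node;
            this.fileIndex = fileIndex;
            this.lastToken = lastToken;
            this.lastKey = lastKey;
        }

        public String getTableName() { return tableName; }
        public String getNode() { return node; }
        public int getFileIndex() { return fileIndex; }
        public long getLastToken() { return lastToken; }
        public String getLastKey() { return lastKey; }
    }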

Counter reader Iterator

Counter read response object:


Standard table reader Iterator

Response objects:
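
Since the original response-object gists are not included here, the following is a hypothetical sketch of what they might carry: a counter row holds an accumulated value, while a standard row holds its column values plus the write timestamp used later for conflict resolution:

    import java.util.Map;

    // Hypothetical value object for a counter row read from an sstable.
    class CounterReadResponse {
        final String key;
        final String columnName;
        final long value;               // accumulated counter value in this sstable

        CounterReadResponse(String key, String columnName, long value) {
            this.key = key;
            this.columnName = columnName;
            this.value = value;
        }
    }

    // Hypothetical value object for a standard row read from an sstable.
    class StandardReadResponse {
        final String key;
        final Map<String, Object> columns;   // column name -> value
        final long writeTimestamp;           // used with USING TIMESTAMP on insert

        StandardReadResponse(String key, Map<String, Object> columns, long writeTimestamp) {
            this.key = key;
            this.columns = columns;
            this.writeTimestamp = writeTimestamp;
        }
    }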

Compaction:

If the database is based on immutable files and a commit log, it is recommended to run compaction before taking the snapshots, to reduce the number of files and the amount of obsolete data they contain.

Possible inconsistencies for counters updated during compaction + snapshot creation

What if a user increments a counter after the new (dual) writes are turned on in production but before the snapshot is generated? You would get the following scenario:

  1. insert (c,1): the increment is dual-written, so the new cluster already holds (c,1)
  2. snapshot create, holding (c,1)
  3. the migration engine will add the snapshot value again, c = c + 1 => (c,2), which is wrong

Solution?

Well, the right way to resolve it is to keep those specific counter keys in a log file; once we have all the snapshots generated, stop writing to that log. Once we are done with the migration, we can handle these counters separately, using the more conservative method of fetching the correct values from the old database.
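
One possible way to capture those counters, sketched with an in-memory log (in practice it would be persisted to a file, as described above); all names are illustrative assumptions:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicBoolean;

    // While the dual-write window is open and the snapshots have not yet been
    // generated, record the key of every counter increment. Once all snapshots
    // exist, stop logging; after the migration, re-fetch these keys from the old
    // cluster and write the authoritative values to the new one.
    public class CounterTouchLog {

        private final Set<String> touchedKeys = ConcurrentHashMap.newKeySet();
        private final AtomicBoolean open = new AtomicBoolean(true);

        public void recordIncrement(String table, String key) {
            if (open.get()) {
                touchedKeys.add(table + ":" + key);
            }
        }

        /** Called once all snapshots have been generated. */
        public void closeWindow() {
            open.set(false);
        }

        /** Keys whose counters must be repaired from the old database after migration. */
        public Set<String> keysToRepair() {
            return touchedKeys;
        }
    }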

Dealing with mismatches between sstables (Cassandra)

At CallApp, as in any reliable database setting, we replicate the data across multiple machines. As a result, a record exists in at least “n” files, and since some nodes may not be synced with the latest data, there will be conflicts. Moreover, sstable files on the same node might contain the same keys with different data.

A simple way to solve this is to also extract each cell’s write timestamp and use it in the query (i.e. UPDATE … USING TIMESTAMP ? … WHERE KEY = ?); Cassandra will not override fields that were written with a larger timestamp.

Migrating Counter tables

The previous solution is not valid when it comes to counter tables, as these tables do not support the timestamp feature. Therefore, you will need a “two-phase commit”, using an assisting table that keeps the keys of already-migrated counters and marks them as “written” (a sketch of this follows below).

What happens if the process crashes before the final stage?

Because it is a counter table and increments are not idempotent, you will have to clear those in-flight counters before re-running. This shouldn’t happen often, yet it is still recommended to be ready for such a scenario, even though it adds the overhead of “committing” the ids twice.
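
A minimal sketch of this two-phase commit, assuming an illustrative counter_migration_status assisting table and a user_counters target table (neither name is from the original post):

    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.Session;

    // Record the key in the assisting table before applying the increment, and
    // mark it as written afterwards. Keys left "pending" after a crash identify
    // counters whose new values must be cleared and re-migrated, because counter
    // increments are not idempotent.
    public class CounterMigrator {

        private final Session session;
        private final PreparedStatement markPending;
        private final PreparedStatement increment;
        private final PreparedStatement markWritten;

        public CounterMigrator(Session session) {
            this.session = session;
            this.markPending = session.prepare(
                    "INSERT INTO counter_migration_status (key, state) VALUES (?, 'PENDING')");
            this.increment = session.prepare(
                    "UPDATE user_counters SET hits = hits + ? WHERE key = ?");
            this.markWritten = session.prepare(
                    "UPDATE counter_migration_status SET state = 'WRITTEN' WHERE key = ?");
        }

        public void migrate(String key, long value) {
            session.execute(markPending.bind(key));       // phase 1: declare intent
            session.execute(increment.bind(value, key));  // apply the increment
            session.execute(markWritten.bind(key));       // phase 2: commit
        }
    }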

Inserting data into the new Database

This part is actually pretty straightforward: gather the rows returned from the sstable reader iterators, generate queries, divide them into groups, and execute the groups concurrently to push the data into the database.
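
A minimal sketch of this stage with the DataStax Java driver, reusing the hypothetical StandardReadResponse from earlier and an illustrative users table; the semaphore bounds the number of in-flight asynchronous inserts:

    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.ResultSetFuture;
    import com.datastax.driver.core.Session;
    import java.util.List;
    import java.util.concurrent.Semaphore;

    // Push rows coming out of the sstable readers into the new cluster with
    // bounded concurrency, so the migration cannot overwhelm the new nodes.
    public class ConcurrentInserter {

        private final Session session;
        private final PreparedStatement insert;
        private final Semaphore inFlight;

        public ConcurrentInserter(Session session, int maxInFlight) {
            this.session = session;
            this.insert = session.prepare(
                    "INSERT INTO users (key, name) VALUES (?, ?) USING TIMESTAMP ?");
            this.inFlight = new Semaphore(maxInFlight);
        }

        public void insertAll(List<StandardReadResponse> rows) throws InterruptedException {
            for (StandardReadResponse row : rows) {
                inFlight.acquire();
                ResultSetFuture future = session.executeAsync(
                        insert.bind(row.key, row.columns.get("name"), row.writeTimestamp));
                // Release the permit when the write completes, successfully or not.
                future.addListener(inFlight::release, Runnable::run);
            }
        }
    }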

Monitoring and Recovery

When migrating large-scale data, you have to consider the possibility of various malfunctions along the way, which will eventually cause you to restart the process. However, if you have a recovery flow that can resume not far from where the process stopped, it shouldn’t be a major concern.

Monitoring is part of recovery, because an efficient monitoring system will help you understand whether or not there are critical issues that require recovery. Moreover, it should be simple to evaluate the status of the entire process without digging deep into the logs of every instance; otherwise, you would waste an hour of every day looking at logs.

At CallApp, we use a reporting table in Cassandra, which we update every 5 minutes using a scheduler thread. This way, we do not add overhead to the cluster and still have persistent reporting.

Start by creating a migration_status table and an equivalent data structure containing the following fields:

  1. Table_name
  2. Node — the node that contained the sstables file
  3. Inserts — how many records were pushed from sstable file
  4. Scanned — the number of records scanned from sstable
  5. Last_int_token & last_key — offsets of the last record to be committed
  6. File_index — which file was last opened (also used for offset)

The main process constantly updates a MigrateStatus object with the current status, and the scheduler persists it into the database every 5 minutes, as illustrated below.
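
A minimal sketch of that status object and its 5-minute scheduler, using the fields listed above; the exact schema and class names are assumptions:

    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.Session;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    // The migration loop updates the in-memory status; a scheduler flushes it
    // into the migration_status table every 5 minutes.
    public class MigrateStatusReporter {

        private final Session session;
        private final PreparedStatement upsert;
        private final String tableName;
        private final String node;
        private final AtomicLong inserts = new AtomicLong();
        private final AtomicLong scanned = new AtomicLong();
        private volatile int fileIndex;
        private volatile long lastToken;
        private volatile String lastKey = "";

        public MigrateStatusReporter(Session session, String tableName, String node) {
            this.session = session;
            this.tableName = tableName;
            this.node = node;
            this.upsert = session.prepare(
                    "INSERT INTO migration_status " +
                    "(table_name, node, inserts, scanned, last_int_token, last_key, file_index) " +
                    "VALUES (?, ?, ?, ?, ?, ?, ?)");
        }

        /** Called by the main migration loop after each committed batch. */
        public void recordProgress(long newInserts, long newScanned,
                                   int fileIndex, long lastToken, String lastKey) {
            inserts.addAndGet(newInserts);
            scanned.addAndGet(newScanned);
            this.fileIndex = fileIndex;
            this.lastToken = lastToken;
            this.lastKey = lastKey;
        }

        public void startReporting() {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(() -> session.execute(upsert.bind(
                    tableName, node, inserts.get(), scanned.get(), lastToken, lastKey, fileIndex)),
                    5, 5, TimeUnit.MINUTES);
        }
    }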

With regards to recovery, since we keep an offset for each run, it’s simple to start the migration from where it previously stopped without running the entire flow all over again.

Sampling

It’s imperative to constantly verify that the new database is synced with the old one. It is always preferable to have this verification run automatically, so that it does not interfere with R&D productivity. One way to do so is to keep a sampling class which writes each migrated key to a live input stream according to a specific probability. On the other end, an independent process collects these ids and makes sure the new database matches the data inside the old database.
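
A minimal sketch of such a sampler; the output sink (a Kafka topic, a log shipper, etc.) is abstracted behind a Consumer and is an assumption, not a detail from the original post:

    import java.util.concurrent.ThreadLocalRandom;
    import java.util.function.Consumer;

    // Emits a configurable fraction of migrated keys to a live stream; an
    // independent verifier consumes the keys and compares each row in the new
    // database against the old one.
    public class KeySampler {

        private final double probability;
        private final Consumer<String> sink;   // abstraction over the live input stream

        public KeySampler(double probability, Consumer<String> sink) {
            this.probability = probability;
            this.sink = sink;
        }

        public void maybeSample(String table, String key) {
            if (ThreadLocalRandom.current().nextDouble() < probability) {
                sink.accept(table + ":" + key);
            }
        }
    }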

High-Level Design

We may publish the entire code on GitHub as open source for the community later on. In the meantime, it’s always beneficial to see all the parts put together in a diagram.

We can divide this design into 3 different parts, each of which is a sub-component in its own right:

  1. SStable Read
  2. Sampling
  3. Inserting

Summary

Unfortunately, for business reasons, I cannot reveal how long the whole process took. However, I can say that it took us much less time than it would have if we had taken the conventional route (opening connections to the old database).

Nowadays, we are up to date with the right database technology that will help us push further to achieve new goals and overcome challenges.

When the word migration pops into mind, you always think “small script and I’m done”… It’s never the case at large scale, as nothing can be described as “simple” when it comes to such applications. You need to be sure that the process is correct and fault tolerant, and to trust the team in charge of it.

If you or your company are about to upgrade your database, get ready to sweat! Needless to say, once it is done, it will be a great milestone for you and the company, especially if you are the data guy ;-)

Roee is a Server Specialist at CallApp Software. Over the past 6.5 years as a data guy, he’s worked on several high-profile projects. When he’s not daydreaming about data, he enjoys double shots of espresso and reading about history. One of his passions is saving the world one terabyte at a time.
