Schema migrations with Cassandra
We decided to serve emails on top of Cassandra in an effort to be highly available.
But, as you might guess, getting things right from the first time is something hard to do. We ended up having to rethink our data-model schema. And as we already had data in production, this also means migrating these data in order to benefit from the new data model.
This post will cover:
— Why a new data-model is needed?
— What is the structure of this new data-model?
— How to move to this new data-model?
— What are the results of this process, and difficulties encountered?
We so far support a few hundred users on top of the James server current internal deployment. However, a quick look at our metrics showed some IMAP operations were lasting up to several minutes and were generating large spikes on IO operations and on load.
These IMAP requests were so resource-consuming they even delayed all the other requests, seriously downgrading overall Cassandra performance and ultimately causing time outs on unrelated queries! A nightmare scenario!
Well, what happened?
A correlation between our Kibana analyzed log file and metrics allowed us to identify the guilty IMAP request:
UID FETCH 1:* UID
This request actually returns all the UIDs contained in the selected mailbox. It is used for client resynchronisation and allows to identify deleted emails while being offline.
Here is how our write path was organized:
We separated mutable mailbox context (mailbox and flags) from an immutable message. The mailbox context is denormalized and referenced using a MessageId and a MailboxId + UID. But what you can notice is that we did not separate our message metadata from the big biiiig blob!
However we support FetchType concept, that allows tuning requests sent to Cassandra, and in the case of the above IMAP request we ended up requesting this to Cassandra:
SELECT metadatas FROM message WHERE …
So far so good, right? However, performance was still very poor:
— Blobs presence in the row makes it impossible to enable Cassandra’s Row cache.
— And the metadata we care about are separated by Gigabytes of big blobs we don’t care about, killing file system caching strategies…
— Retrieving these blobs was too long.
— These blobs could exceed the commitlog size (~64 MB) and thus be rejected.
An email exchange with Duy Hai Doan, Cassandra technical advocate at Datastax, pointed out limitation of the storage system:
— Enabling compression, which is the default behaviour, leads to Cassandra performing reads by chunk of 64K. With metedata only, this read size is limited by the actual size of the row. With blobs, you end up also reading a fraction of the blob.
— If your blob column fits in the middle of the row, Cassandra needs it fully to uncompress the metadata located after it.
Imagine that you have a SELECT age,firstname,lastname. In your user table you have also the column “blob”. Since all normal columns are sorted on disk on their label order (e.g. age->blob->firstname->lastname-> ….) to read age,firstname,lastname, Cassandra will have to fetch the blob column too and it is wasterful.
One every UID FETCH UID request Cassandra was reading the entire mailbox content blobs on the disk, generated GigaBytes of reads!
We needed to do something!
So, a better design is needed:
- We need to separate the blobs from metadata to make reading metadata efficient.
- We need to make blob storage efficient.
The resulting design is quite simple:
What you can see is that blobs are stored out of messages’ metadata. We added one more level of indirection so that the chunking choices are not correlated to messages (ie: if I need smaller/bigger chunks, I do not need to modify messages, it is a blob storage property).
Note that we store two blobs for a message. One for its headers. One for its body. When you send a message to several recipients, the headers might get customize, but the body will be kept unchanged. We can easily implement message body deduplication by using as an ID the hash of a blob. Compute is cheap, right?
So, we have one old schema populated with data and the new one are empty. How do we pass smoothly from one to an other?
Well, let’s first sum up what we want:
- We want the migration to be run “online” without downtime.
- We want to avoid race conditions. Our data should always stay consistent.
- We want the migration to be idempotent
- We want the migration process to be generic
- And finally, we want to know which version of the schema we are currently using.
The migration model is simple:
- We create the tables for the new data-model
- We populate them with data
- When safe, we can remove the old tables related to the new data-model and the related James code. A tag in the Code Version System allows finding where the code handling a specific migration is located.
We will start with the basic: “What is a schema version?”
A schema version indicates you which schema your James server is relying on. Abstract right?
The schema version number tracks if a migration is required. For exemple, when the schema maximum shema version is two, and our schema version is one, we know that we might still have data in the deprecated Message table. Hence we need to move these messages in the MessageV2 table. Once done we can safely bump the current shema version to 2. In version 3, the Message table can then be safely removed.
It also ensures James is not started if James is too recent to handle the version stored in Cassandra (ie: following tables have been dropped).
So, we now know how to detect that a migration is needed, and store information about the data-model.
Now we need to move information from the old table to the new.
On the fly migration can be performed. As such:
— Read from new table
— If found return the value
— If not found, read from the old table…
— … and copy it to the new table…
— … then you can safely remove from the old table
Also because you only write to the new table, the old table becomes read + delete only.
Important note: we are migrating immutable data, that thus will not be updated on the old table during the migration process.
We implemented the copy/remove logic in a background thread and protected it with a fixed length queue. However, even if easy to implement, On the fly migration do not allow us as such to:
— Ensure every entry will be migrated. To be moved an entry needs to be read. Who reads 2-year-old emails?
— Because we are on a per-entry logic, we need more information to identify the switch of version: ie when the old table is empty.
Hence we decided to introduce a REST triggered migration task to do the exact same process. The key idea is to directly read entries from the old table. If they are there, we can migrate them, then remove them. Reporting was made through log and state of the migration (migrated message count, errors, etc…) could be followed from Kibana.
Running this migration took a bit of time. We had to move 2.7 million emails. We experienced several issues:
— 0.3% of reading timeouts combined with “no connection available”, which means we were encountering read issues (the reason that triggered the need for migration). Thus the migration process was not reliable and could be interrupted by error. The solution is to automatically restart it.
— Tombstones associated with deletions made new initial old-table reads very slow. It becomes critical when you need to perform the tombstone heavy read every time the main read time outs. We also had to increase timeouts values for specific requests up to several minutes due to this problem, impacting a bit server performances.
You might want to switch gc_grace_seconds of the outdated table to 0:
The rationals behind this choice:
— Tombstone will be directly evicted
— Zombie entries may resurect but our migration process is indempotent so we don’t really care… We can just migrate them again…
Due to these issues, a major compaction that takes several hours to run was required every ~ 400.000 message migrated, every 2 or 3 hours.
— To benefit from message body deduplication, we need identical body to be stored on the same SStable, thus you benefit from deduplication only after a major compaction.
— We encoutered also some issues reading old entries… Some obscure Cid validation logic that was a bit too strict…
Running the migration process took us 3 working days. When running it migrated messages at 200k messages per hour. The effetive migration running time was approximatively 13 hours. The size of compressed data to be migrated was 140 GigaByte, 700 GigaByte if not compressed. (Compression is a default Cassandra feature as SStable are immutable).
Moreover, we encountered the following benefits:
— The UID FETCH UID command presented above now execute in ~500 ms instead of ~3 minutes which represent an x120 improvement
— The latency to retrieve a 300 KiloByte blob has dropped from ~120 ms to ~3 ms.
— The storage thanks to body deduplication has shrunk to ~79 GigaBytes which represents a -44% storage space reduction.
— The message metadata read time is now only ~0.06 ms.
— We can turn on row cache on messages metadata and blob metadata!
We experience a way faster IMAP James server, even with limited hardware. These results are encouraging and will lighten the burden of Cassandra related code performance to the team. Which means that we are soon going to have feature related James articles!
I let the word of the end to this dear Chuck!
Useful nodetool reminder, heavily used during the migration process: