Bucketisation: Using cassandra for time series data scans.
Coming from RDBMS world it takes some getting used to before you can efficiently start modelling entities in Cassandra. The rule of thumb that helped me out is let your queries define the entities you need.
I recently carried out a migration of persistence layer for my application from Oracle to Cassandra. While migrating some of the primary entities was fairly straight forward, it was solving uses cases like supporting range scans that presented us with its own set of unique challenges.
Note: Range Scans/Queries are basically record lookups based on timestamp ranges, with or without any filters on top of it.
Let’s understand few things about the application first before jumping into actual problem, the application I am referring here handles all communications for all the transaction customers perform on our sites. To put things in perspective on an average day we send tens of millions of communications to customers, spanning across various channels like E-Mails, SMS and App notifications.
We wanted to ensure that we support range queries on our entities upon moving to Cassandra, for the following two reasons:
- Our team was already using similar reads in few dashboards while we were on Oracle, so had to ensure backward compatibility.
For e.g. queries like how many communication were send out for a particular event on date ‘X’ between times ‘Y’ and ‘Z’.
- Also, it helps you to stay future proof to any potential migration we may have to carry out in future to any next generation persistence solution.
For the scope of this write up lets assume we have a column family defined like:
Now this helps you query details of message based on message id since the above entity is partitioned upon message id.
But problem arises when you want to lookup messages landed within a specific time span. And for supporting that we will introduce a column family like below:
Now this allows us to execute queries like below to get data across a time range:
While this may hold for systems intended to serve consistent and predictable traffic, but for systems which are prone to unexpected surges driven by customer behaviour, a model like this can be an absolute nightmare.
Since the above model is partitioned on created_date column, any unexpected surge in traffic on a given date would mean all of your writes getting directed to same partition and in all likelihood creating a hotspot on your Cassandra cluster. This would essentially mean three things in terms of cluster performance:
- You are potentially risking certain nodes in your cluster.
- You may never be able to successfully read this data, since the looks ups are at risk of timing out.
- You are breaching one of the golden rule which helps to keep your cluster in a healthy state, i.e. don’t let your partition size grow too big. (It’s recommended to keep partition size around 100 MB, but there is no exact number.)
So what did we do? we tried introducing a new factor called window of the day, which is nothing but a time slot (uniform) which divides a day into equal slots and spreads out traffic across multiple partitions, some thing like below:
Now in the event of a traffic surge on a given day we would still be able to hit multiple partitions while writing data and prevent any potential hotspots.
Queries in this case to read entire data for a day would look like (assuming we have decided to have 6 hour slots)
While this approach does solve the problem, but it solves it partially. The model is still prone to same problem we described with the first model if the traffic surge is restricted to a particular slot.
You may decide to reduce your slot size to less number of hours or even minutes for that matter, but fact of the matter is your model is still susceptible.
In a hopeless attempt of trying to fit this approach to our production use case we assessed that it would need a slot size of around 5 mins to ensure that even during our max peak (current) we are able to prevent any hotspots and are able to direct traffic to next partition as soon as possible while keeping current partition size with acceptable limits. This essentially would mean we would have to look up across 12 * 24 = 288 partitions to fetch data for just one day.
So what do we do?
We decided to take a step back and reassess our plan, instead of brute forcing our way to tick all data Cassandra modelling guidelines. We concluded that we need a reliable way to send controlled writes to each partition and as soon as we reach limit (max storage we want to maintain per partition) our app should be able to redirect writes to a new partition.
We came up with a bucketing strategy where each app instance (VMs/computes) maintains its own in-memory counter for the records it persisted and has a dedicated bucket_id assigned to all those writes. The app generates a new random unique id each time it runs into flowing conditions:
- Date has changed.
- A new entity has been introduced.
- The bucket has reached its max count. ( Meaning enough records has been persisted in this particular partition and the writes needs to be moved to a new one.)
This also required us to maintain a dictionary to maintain a mapping of all bucket_ids generated by all the app instances to dates and entities they are created for. We can have this persisted in the same Cassandra keyspace.
In this case rows in our entity (message_log_dated) to support our range scans will look like this
While the dictionary to maintain mapping between date and entity to their respective buckets for that range will look like.
Now when your bucket overflows within a particular day, you may see an entry like below (notice the 7th row created for the same VM within the same date.):
Now let’s look at how we can use this model to perform a range scan on a given entity, with some actual test data.
Here’s a quick query confirming that each bucket adheres to the max bucket count assigned, kept at 50000 for my test.
Here’s a quick illustration of writes and reads for this flow:
Now lets evaluate this approach,
- Filtering should be straight forward, you can have more columns replicated in dated logs so that you read message ids for just the config you want to.
- Inserts are easy and fast.
- Reads are fast.
- Most importantly, its solves the cluster balancing problem we are trying to address here.
- It helps if the load balancer on top of your apps is an unbiased one, but even if it sways in event of a bug/config change, this approach will still hold solid.
- Sorting (if needed) would need to be carried out on the app layer.
- Pagination would need to be carried out on the app layer.
Range scan on Cassandra aren’t trivial to solve but with the right data model in place you can definitely work it out. It’s imperative your writes are distributed and uniform to keep your cluster up and healthy.
While you can argue using the proposed solution above makes you consume more storage space, but that’s totally ok in Cassandra world. You can replicate data (and not be in most normalised form) if it helps you optimise your queries.
Using this ‘Bucketisation’ based approach you can support range scans while being shielded from unexpected traffic surges and without creating any risk to your cluster’s health.
Last but not the least, a big shout out to my mentor for guiding me right from the inception of this approach till it was functioning in production.