ScyllaDB Migration: The not-so-drop-in replacement of Cassandra
Challenges with our existing Cassandra setup
Our journey starts back in June 2023, when we began planning the migration of our existing single EC2 node, humongous Cassandra (4.x) machine (m6a.24xlarge) with 96 vCPUs and 384GB of memory. It had become a single point of failure, and we could not tolerate any downtime or latency spikes from a single instance causing availability issues and clients complaining that our realtime page was not opening. See, Cassandra backed our most critical page, the Realtime dashboard, where a transporter can see the locations of his fleet along with other meaningful, business-critical data points. This Cassandra table is continuously updated every 10s from our Spark pipelines to reflect the most accurate info about a vehicle. So, this page should never go down or even slow down! As simple as that.
We also ingest raw data points from IoT devices into the Cassandra raw_device_data tables, which we then use to plot a timeline on maps in our UI. The size of this data had been increasing exponentially month over month. To give an approximate idea, we ingest a little more than 50GB of raw data, or more than 500 million data points, per day. Given the scale at which we were growing, as well as the limitation of GP3 EBS volumes supporting only up to 16TB, we used to take a maintenance downtime window and manually delete the oldest compacted SSTable, which was around 10–12 months old. Of course, this was a hack that could not continue for long, since the data size kept increasing.
Migration planning
For the migration, of course, we had two choices: either move to a 3-node Cassandra cluster or move to ScyllaDB. When we looked at ScyllaDB, the major advantage we found was that it is rewritten in C++, compared to Cassandra running on the JVM. Another thing we noticed was that multiple major firms, like Discord and Hotstar, had migrated to Scylla and seen a significant improvement in their P99 latencies, especially for reads. The numbers looked appealing and we decided to give ScyllaDB a try.
The POC
Initially we thought of running ScyllaDB with EBS-backed volumes, since taking EBS snapshots is very easy and AWS gives an automated way of doing that. We launched a small 3-node ScyllaDB cluster to test the write throughput of a small test account. Dual writes for the test account were enabled from the Spark pipeline. When we marginally increased the write throughput, writes started failing with errors saying: "node was overloaded. Too many in flight hints."
This basically meant that the disk was not able to cope with the higher number of write IOPS. After going through multiple community posts on the ScyllaDB Slack channels, we understood that ScyllaDB is tuned for NVMe-backed nodes, and that using EBS storage would not give us the full advantage of this database. Also, given our need for high IOPS, more than 16k, GP3 volumes would not suffice. Hence, we launched an i-series based cluster (i3en), which worked smoothly and validated our POC.
Scylla Consumer
Now we wanted to test this new database under a load similar to the production load currently running on Cassandra. We launched a 3-node i3en.6xlarge cluster, which should be able to handle the load currently served from a single m6a.24xlarge Cassandra instance. The first step was enabling all writes of our IoT data, processed and raw, to ScyllaDB: basically, dual writes from the existing Spark and Spring Boot applications.
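To illustrate, here is a minimal sketch of what such a dual write can look like at the application level, assuming a thin wrapper around two driver sessions; the class name, session wiring and logging below are illustrative, not our actual implementation:

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Statement;
import lombok.extern.slf4j.Slf4j;

// Illustrative wrapper: every write goes to Cassandra synchronously and is
// mirrored to ScyllaDB asynchronously so the primary path is not slowed down.
@Slf4j
public class DualWriteExecutor {

    private final CqlSession cassandraSession; // existing production cluster
    private final CqlSession scyllaSession;    // new ScyllaDB POC cluster

    public DualWriteExecutor(CqlSession cassandraSession, CqlSession scyllaSession) {
        this.cassandraSession = cassandraSession;
        this.scyllaSession = scyllaSession;
    }

    public void write(Statement<?> statement) {
        // Primary write: Cassandra remains the source of truth during the POC.
        cassandraSession.execute(statement);

        // Shadow write: fire-and-forget to ScyllaDB; failures are only logged
        // so that a POC issue never impacts the production write path.
        scyllaSession.executeAsync(statement).whenComplete((resultSet, error) -> {
            if (error != null) {
                log.error("Scylla dual write failed", error);
            }
        });
    }
}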
Now, the writes part was sorted. But the real test would come when we fired an equal number of read queries on the new cluster as well.
Hence, we decided to push the select queries to a new Kafka topic, scylla-consumer, whose consumer application would consume these events and execute them on the ScyllaDB cluster, with a sufficient number of partitions and consumers for parallelization.
We wrote multiple application-level interceptors using AspectJ to intercept all sorts of read queries and push them to the scylla-consumer Kafka topic.
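A simplified sketch of one such interceptor, assuming Spring AOP with AspectJ annotations and Spring Kafka's KafkaTemplate; the pointcut expression, package names and event payload below are hypothetical:

import java.util.Arrays;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Intercepts read calls on the Cassandra repositories and mirrors them as events
// on the scylla-consumer topic, where a separate consumer replays them on ScyllaDB.
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class ScyllaReadMirrorAspect {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Hypothetical pointcut: any finder method of classes under the repository package.
    @Around("execution(* com.example.repository..*.find*(..))")
    public Object mirrorRead(ProceedingJoinPoint joinPoint) throws Throwable {
        try {
            // Publish the method signature and arguments; the consumer maps this
            // back to an equivalent query on the ScyllaDB cluster.
            String event = joinPoint.getSignature().toShortString()
                    + "|" + Arrays.toString(joinPoint.getArgs());
            kafkaTemplate.send("scylla-consumer", event);
        } catch (Exception e) {
            // Mirroring must never break the real read path.
            log.error("Failed to push read event to scylla-consumer", e);
        }
        // Always proceed with the original Cassandra read.
        return joinPoint.proceed();
    }
}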
Therefore, all our production reads and writes started happening in parallel on the new ScyllaDB cluster, giving us a perfect POC setup in a production-like environment.
POC Done
The new cluster was performing well, giving P99 write latencies under 5ms and P99 read latencies under 100ms. The cluster was serving on average 14–20k req/s, with writes alone peaking around 17k req/s. CPU utilization was also normal, not breaching 60% at any point of time. We also stress tested the cluster by generating a lot of raw data events, creating a burst of write queries that went up to 50–60k req/s. CPU utilization did go above 80%, but the P99 latencies still stayed within acceptable ranges (writes under 10ms and reads under 200ms). The new setup was now validated. Bingo!!
Application level changes
Although ScyllaDB claims to be a drop-in replacement for Cassandra, the statement is not entirely true. It imposes a lot of query restrictions in terms of aggregations, IN queries and even multi-column queries, and frankly has a few basic bugs here and there compared to the more mature Cassandra. Here is a list of a few issues we had to face:
- Scylla sum function fails when used with case-sensitive column names
- Combination of COUNT with GROUP BY is different from Cassandra in case of no matches
- CQL Multi column restrictions are allowed only on a clustering key prefix — STILL OPEN
- Allow selecting map values and set elements, like in Cassandra 4.0 — OPEN
- Scylla manager fails to backup custom UDA
- Unable to decommission a node
- Scylla manager backups intermittently fails
The result was that we had to make a lot of changes in our Cassandra Spring repositories. Although the activity itself was lengthy and time consuming, it gave us a window to relook at all those unoptimised queries we used to run on Cassandra, think again about our partition keys and clustering orders and, eventually, redesign most of our older tables into better, more efficient versions of their counterparts. We mainly ensured that partition sizes would not grow very large, and refactored our materialized views to avoid unnecessary IN queries, thereby improving performance.
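As one illustration of the kind of rewrite this involved, an IN query over multiple partition keys can be replaced with one asynchronous single-partition query per key, with the results merged in the application; the table, column and class names below are hypothetical, not our actual schema:

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;

public class VehicleStatusReader {

    private final CqlSession session;
    private final PreparedStatement selectByVehicle;

    public VehicleStatusReader(CqlSession session) {
        this.session = session;
        // Instead of: SELECT * FROM vehicle_status WHERE vehicle_id IN (?, ?, ...)
        // we prepare a single-partition query and fan it out per key.
        this.selectByVehicle = session.prepare(
                "SELECT * FROM vehicle_status WHERE vehicle_id = ?");
    }

    public List<Row> read(List<Integer> vehicleIds) {
        // Fire one async query per partition key so they run in parallel.
        List<CompletionStage<AsyncResultSet>> futures = new ArrayList<>();
        for (Integer vehicleId : vehicleIds) {
            futures.add(session.executeAsync(selectByVehicle.bind(vehicleId)));
        }
        // Merge results in memory (only the first page of each result is read here, for brevity).
        List<Row> rows = new ArrayList<>();
        for (CompletionStage<AsyncResultSet> future : futures) {
            future.toCompletableFuture().join().currentPage().forEach(rows::add);
        }
        return rows;
    }
}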
Migration
The migration of old data was planned as follows: any non-raw data table was migrated using Spark jobs. For raw data, since it is a huge amount of data, we planned to migrate some of it and, for the rest, decided to keep a partition date in the application itself, so that older data is read from Cassandra and newer data from ScyllaDB. For this approach, we implemented a DateBasedQuery strategy like the one below:
@Slf4j
@Getter
@Builder
public class DateBasedQuery<T> {

    private DateFilter dateRange;
    // Accessors encapsulate the actual repository calls for each data store.
    private Function<DateFilter, List<T>> cassandraDataAccessor;
    private Function<DateFilter, List<T>> scyllaDataAccessor;
    private Integer accountId;
    private List<Integer> accountIds;
    private Class<?> entityType;

    public List<T> results() {
        ScyllaProperties scyllaProperties = SpringContext.getBean(ScyllaProperties.class);
        List<T> resultList = getScyllaAndCassandraData(scyllaProperties);
        return resultList.stream().filter(Objects::nonNull).collect(Collectors.toList());
    }

    private List<T> getScyllaAndCassandraData(ScyllaProperties scyllaProperties) {
        List<T> resultList = new ArrayList<>();
        // The partition date marks the cut-over: data before it lives in Cassandra,
        // data from it onwards lives in ScyllaDB.
        LocalDateTime partitionDate = LocalDateTime.now();
        if (Objects.nonNull(scyllaProperties)) {
            partitionDate = scyllaProperties.getFromDate();
        }
        if (ScyllaUtil.isScyllaEnabled(getAccountIds(), this.entityType)) {
            // getDateRange (not shown) clips the requested range to the given bounds
            // and returns empty when there is no overlap.
            Optional<DateFilter> cassandraDateRange = getDateRange(dateRange, DateUtils.FOUNDATION_DATE, partitionDate.minusSeconds(1));
            Optional<DateFilter> scyllaDataRange = getDateRange(dateRange, partitionDate, LocalDateTime.now());
            scyllaDataRange.ifPresent(dateFilter -> resultList.addAll(scyllaDataAccessor.apply(dateFilter)));
            cassandraDateRange.ifPresent(dateFilter -> resultList.addAll(cassandraDataAccessor.apply(dateFilter)));
        } else {
            // ScyllaDB not yet enabled for this account/entity: read everything from Cassandra.
            resultList.addAll(cassandraDataAccessor.apply(dateRange));
        }
        return resultList;
    }
}
Here, as you can see, getScyllaAndCassandraData splits the query based on the partition date, combines the results from both stores and returns the final response.
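A typical call site then looks roughly like the sketch below; the repository and entity names are illustrative:

// Illustrative repository method combining both data stores for raw device data.
// RawDeviceData and the two repositories are assumed names; each finder is
// expected to return List<RawDeviceData> for the given account and date filter.
public List<RawDeviceData> getRawDeviceData(Integer accountId, DateFilter dateRange) {
    return DateBasedQuery.<RawDeviceData>builder()
            .dateRange(dateRange)
            .accountId(accountId)
            .entityType(RawDeviceData.class)
            // Older data, before the partition date, is still read from Cassandra.
            .cassandraDataAccessor(filter -> cassandraRawDeviceDataRepository.findByAccountIdAndDateRange(accountId, filter))
            // Newer data, from the partition date onwards, is read from ScyllaDB.
            .scyllaDataAccessor(filter -> scyllaRawDeviceDataRepository.findByAccountIdAndDateRange(accountId, filter))
            .build()
            .results();
}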
Secondly, if you remember, we used to find and delete old SSTables from the Cassandra DB, which was not a good way to purge older data. To fix this, we planned to keep month-wise tables for raw_device_data so that older data can be easily purged. The challenge, however, was reading from multiple tables in the case of multi-month range queries. For this, we implemented a generic read-query model and executor:
@Data
@Slf4j
@Builder
public class GenericReadQuery<T> {

    public static final String ALL = "*";

    private Class<T> resultClass;
    private DateFilter dateFilter;
    private List<Select> selectList;
    private List<String> groupByList;
    private List<Where<?>> whereList;
    private String fromTable;
    private Integer limit;
    private Long accountId;
    private List<Order<T, ? extends Comparable<?>>> orders;

    // Customized Lombok builder with convenience factories for aggregate selects.
    public static class GenericReadQueryBuilder<T> {

        private List<Where<?>> whereList = new ArrayList<>();
        private List<Select> selectList;
        private List<String> groupByList;
        private List<Order<T, ? extends Comparable<?>>> orders;

        public static Select[] sum(String... fields) {
            return Arrays.stream(fields)
                    .map(field -> new Select(field, Select.AggregationPolicy.SUM, ""))
                    .toArray(Select[]::new);
        }

        public static Select[] avg(String... fields) {
            return Arrays.stream(fields)
                    .map(field -> new Select(field, Select.AggregationPolicy.AVG, ""))
                    .toArray(Select[]::new);
        }
    }
}
@Slf4j
public abstract class ReadQueryExecutor {

    public <T extends Serializable> Optional<T> getOne(GenericReadQuery<T> query) {
        List<T> result = getAll(query);
        if (CollectionUtils.isEmpty(result)) {
            return Optional.empty();
        }
        return Optional.ofNullable(result.get(0));
    }

    public <T extends Serializable> List<T> getAll(GenericReadQuery<T> query) {
        try {
            // Ensure the query stays within the configured cartesian product limit.
            validateWithinCartesianLimit(query);
            List<T> result = getResults(query);
            // Perform aggregation on the results.
            result = getAggregatedResult(query, result);
            // Perform sort on the results, according to the provided order.
            sort(result, query.getOrders());
            // Impose limit on the results, according to the provided limit in the query.
            return limited(result, query);
        } catch (Exception e) {
            log.error("Query Executor", e);
        }
        return new ArrayList<>();
    }
}
@Component
public class ScyllaRawDeviceDataReadQueryGenerationStrategy extends ScyllaReadQueryGenerationStrategy {

    @Override
    protected List<String> getDateFilters(DateFilter dateFilter) {
        LocalDateTime from = dateFilter.getFromDate();
        LocalDateTime to = dateFilter.getToDate();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        String filter = "";
        if (Objects.nonNull(from)) {
            filter = String.format(" \"createDate\" >= '%s' ", from.format(formatter));
        }
        if (Objects.nonNull(to)) {
            String toFilter = String.format(" \"createDate\" <= '%s' ", to.format(formatter));
            // Only prepend AND when a lower bound is also present.
            filter = filter.isEmpty() ? toFilter : filter + " AND " + toFilter;
        }
        return Collections.singletonList(filter);
    }

    @Override
    protected List<String> getTableNames(GenericReadQuery<?> query) {
        return getTablesWithMonthWiseSplit(query);
    }

    @Override
    public List<String> getQueries(GenericReadQuery<?> query) {
        List<String> queries = super.getQueries(query);
        // Reverse so that the most recent month's table is queried first.
        Collections.reverse(queries);
        return queries;
    }

    protected List<String> getTablesWithMonthWiseSplit(GenericReadQuery<?> query) {
        DateFilter dateFilter = getRequiredDateRange(query).orElseThrow(() -> new InvalidParameterException("Date range not found."));
        Pair<Integer, Integer> startingMonthAndYear = getMonthAndYear(dateFilter.getFromDate());
        Pair<Integer, Integer> endMonthAndYear = getMonthAndYear(dateFilter.getToDate());
        List<String> tableNames = new ArrayList<>();
        // Walk month by month from the start of the range to the end, collecting one table per month.
        for (Pair<Integer, Integer> current = startingMonthAndYear; lessThanOrEqual(current, endMonthAndYear); current = incrementMonth(current)) {
            tableNames.add("traffickeyspace." + query.getFromTable() + "_" + current.getLeft() + "_" + current.getRight());
        }
        return tableNames;
    }
}
If you notice, the getTableNames method returns the month-wise list of table names against which the query has to be executed.
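The month-iteration helpers referenced above (getMonthAndYear, lessThanOrEqual, incrementMonth) are not part of the snippet; conceptually they are simple, as in the sketch below, which assumes Apache Commons Lang's Pair with the month on the left and the year on the right:

// Sketch of the helpers used by getTablesWithMonthWiseSplit, assuming
// Pair.getLeft() holds the month and Pair.getRight() holds the year
// (e.g. raw_device_data_12_2023 for December 2023).
protected Pair<Integer, Integer> getMonthAndYear(LocalDateTime date) {
    return Pair.of(date.getMonthValue(), date.getYear());
}

protected boolean lessThanOrEqual(Pair<Integer, Integer> current, Pair<Integer, Integer> end) {
    // Compare by year first, then by month within the same year.
    return current.getRight() < end.getRight()
            || (current.getRight().equals(end.getRight()) && current.getLeft() <= end.getLeft());
}

protected Pair<Integer, Integer> incrementMonth(Pair<Integer, Integer> current) {
    // Roll over from December to January of the next year.
    return current.getLeft() == 12
            ? Pair.of(1, current.getRight() + 1)
            : Pair.of(current.getLeft() + 1, current.getRight());
}

With this scheme, a query spanning, say, 25 Nov 2023 to 10 Jan 2024 would resolve to traffickeyspace.raw_device_data_11_2023, raw_device_data_12_2023 and raw_device_data_1_2024, whose per-table results are then combined by the read-query executor.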
Conclusion
Finally, with a great team effort by both developers and DevOps, we went live in Dec 2023, serving live traffic from the new ScyllaDB cluster: a cluster that is now horizontally scalable, highly available and has predictable P99 latencies. We also ensured that our RTO and RPO are kept at 1 hr by taking an hourly incremental backup of critical tables to S3 using Scylla Manager. For DR purposes, we also take a daily backup of all critical tables to a separate AWS region.
Below is evidence of the drastic improvement in P99 reads post migration: