A Bumpy Journey To (Re)Write A Bulk Upload API For Cassandra — Part 1: The Database

A deep dive into how Cassandra handles data writes and consistencies

Stephen Tse
19 min read · May 17, 2020

Series Topics

  • Part 1: Prologue & Apache Cassandra (Azure Cosmos DB API)
  • Part 2: Dynamic Rate Limiting / Congestion Control
  • Part 3: Batch Processing

At work, I’ve been tasked with redoing an old major feature for quite some time now. Its main purpose is to serve millions of records reliably and quickly from our Cassandra database for lots of concurrent queries, but the previous design and implementation were buggy and also fell short of that goal, eventually leading to a complete overhaul.

A few weeks before production launch, and just when I thought nothing would ever go wrong again (shouldn’t have jinxed it!), a surprise bug caught me completely off guard: I couldn’t use the old admin API to upload a list with even just a few thousand records to test my core code on the cloud! Our database always gave me an OverloadedException after just a short moment. Fancy performance reports DO NOT MATTER so long as business cannot use the feature! My colleagues hopped in to help, but they couldn’t figure out much at the time.

In a desperate effort to salvage my hard work (you know, I still jump a little when the lead dev “hello” me on Slack, possibly PTSD 😅), I underwent “a week of hell” to investigate the root cause myself and come up with a solution. Naturally, as a qualified c̶o̶m̶m̶u̶n̶i̶s̶t̶ software engineer, I must share this experience so that no one else, with the help of search engines, will ever need to endure this pain again. So here I am, resuming blogging 5 years later. Wish me luck! (As I already attempted to reboot writing before, and have several other drafts stalled for so long I’ve mostly forgotten what to write anymore, hopefully procrastination won’t get the best of me this time!)

This series could be a waste of your time…

1) If you’re just looking for a one-time solution to bulk upload a few big files, chances are you might not need to read this, as there’re several command-line tools in the wild already at your disposal:

  • The official Cassandra documentation recommends a solution that requires you to first transform your data set into SSTables, the file format Cassandra uses to store data on disk, with the org.apache.cassandra.io.sstable.CQLSSTableWriter Java API, then use the bundled sstableloader or nodetool import command-line tool to load them into the database like restoring a backup (see the sketch after this list).
  • The CQL Shell, or cqlsh, provides a COPY FROM command that can be used to copy data from a CSV file to your table.
  • If you’re looking for a more powerful solution (e.g. uploading JSONs or, conversely, unloading lots of data from the DB), DataStax, the long-time “evangelist-in-chief” of Cassandra, made their once-paid Bulk Loader (dsbulk) freely available for personal and commercial use. It also claims to be 2 ~ 3 times faster than cqlsh COPY thanks to multi-threaded operation.
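
Since I mentioned the CQLSSTableWriter route above, here’s a minimal sketch of what that flow looks like; the shop.orders schema, the column values, and the output directory are all hypothetical, so treat this as a starting point rather than a drop-in tool:

```java
import org.apache.cassandra.io.sstable.CQLSSTableWriter;

public class SSTableExport {
    public static void main(String[] args) throws Exception {
        // Hypothetical schema; swap in your own keyspace, table, and columns.
        String schema = "CREATE TABLE shop.orders ("
                      + " order_id text, line_no int, sku text,"
                      + " PRIMARY KEY ((order_id), line_no))";
        String insert = "INSERT INTO shop.orders (order_id, line_no, sku) VALUES (?, ?, ?)";

        try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
                .inDirectory("/tmp/sstables/shop/orders")   // must already exist
                .forTable(schema)
                .using(insert)
                .build()) {
            // A real job would stream rows out of the parsed CSV here.
            writer.addRow("order-1", 1, "sku-42");
            writer.addRow("order-1", 2, "sku-43");
        }
        // Afterwards: sstableloader -d <contact points> /tmp/sstables/shop/orders
    }
}
```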

2) The Sam’s Club Tech division has been on a mission to migrate our resources from Walmart’s internal OneOps platform to Microsoft Azure. Consequently, we’re no longer using native Cassandra, but rather the Cassandra API of Azure Cosmos DB. There aren’t many major differences though: as a Cosmos dev team member informed us, their API is an (incomplete) re-implementation of Cassandra on top of the Cosmos infrastructure rather than an emulation, and as you’ll soon see, the way Cosmos partitions data internally is quite similar to Cassandra’s. For these reasons, I will write here as though I were working on a native Cassandra cluster, although the discussion will also be Azure-themed. Feel free to skip any sections that don’t apply to you, e.g. the dynamic rate limiting in Part 2 if your database vendor doesn’t charge and limit you by throughput (or if you can afford to relax that limit by a fair amount occasionally).

3) Code samples discussed here will be Java-based. Love it or hate it, Java is still one of the most powerful and versatile garbage-collected languages out there with a huge ecosystem, and with (long overdue) exciting new changes like Project Loom on the horizon, it’s probably still not easy for innovators like Golang or Kotlin to overtake the elephant (who would have thought the elephant can copy, eventually?). And since Cassandra is written in Java, knowing Java allows you to utilize its bundled APIs when necessary, such as the CQLSSTableWriter I mentioned previously. In any case, I feel that backend developers in other languages may benefit from learning about some of the enterprise knowledge the Java community has accumulated, even if they’re planning to implement their own solutions. I’ll briefly discuss one of them in Part 2 (if you know the JSR–352 Batch Application Specification already you can skip that section too).

4) Some of the block quotes and web links in this article are not closely related to the main topic (such as this one 😉), but simply extensions I felt compelled to include in my blog (I have a habit of “associative memoing”). D̶o̶n̶’̶t̶ ̶s̶a̶y̶ ̶i̶t̶’̶s̶ ̶b̶a̶d̶ ̶w̶r̶i̶t̶i̶n̶g̶ ̶t̶h̶i̶s̶ ̶i̶s̶ ̶m̶y̶ ̶b̶l̶o̶g̶ ̶I̶ ̶c̶a̶n̶ ̶d̶o̶ ̶w̶h̶a̶t̶e̶v̶e̶r̶ ̶t̶h̶e̶ ̶h̶e̶c̶k̶ ̶I̶ ̶w̶a̶n̶t̶ Feel free to skip any of them~

A simple intro to Cassandra architecture

Despite its very SQL-like query language (CQL), Cassandra, like its NoSQL siblings, is almost nothing like the good old SQL databases developers have grown familiar with for decades. Failing to understand this soon enough will cost one dearly in both engineering effort and operating costs. (An anecdote: when the “Cloud Native” trend started to gain momentum in the office, there used to be an idea floating about claiming teams should just ditch their MySQL databases and move all tables, regardless of use cases, to Azure Cosmos DB with Cassandra API, because “NoSQLs are simply superior to SQLs”. Had that been carried out, it would probably have been one of the messiest and most expensive refactoring decisions ever made; fortunately, no one ever did it.)

Monolithic SQL databases normally provide the safer and more intuitive abstractions everyone loves, such as transactions that hide away the complicated parts with locks and latches and “just work” by providing strong ACID guarantees. They become harder and harder to scale under heavy load with vertical scaling alone, though. When developers finally decide to distribute their data across servers, they face the classic dilemma described by the CAP theorem: when retrieving distributed data over a (most commonly) unreliable network, if there’s a network partition (nodes losing sync with each other due to network failures / delays / etc.), one must weigh and choose between availability (you always get some version of the data out of some node without the system throwing errors) and consistency (the system throws an error if it can’t guarantee the data you read is the most recent version).

Worse still, the path of the distributed data store comes with far less hand-holding and far fewer safety belts: in the NoSQL world, we really have to understand the underlying system to know exactly what trade-offs we want to make, or have already made (probably unknowingly). This is a bit like coding in non-garbage-collected languages such as C/C++ in pursuit of raw performance: no one without proper knowledge of computer systems can ever be a decent C/C++ developer. (Don’t believe me? Just listen to Bryant and O’Hallaron!)

For these reasons, before we can discuss the problem properly, we’ll need to establish some basic understanding about the Cassandra architecture and how it handles data writes. Fortunately it won’t take long~

[Figure 1.1] Just in case that link was closed for public access, and someone needed something to search for “Bryant and O’Hallaron”…

How Cassandra partitions data

Cassandra is a truly decentralized distributed system that has no leader and thus no single point of failure (and it works better this way unless you accidentally introduce one, which we’ll discuss in the next chapter).

A typical Cassandra cluster consists of a ring of servers called nodes. Each node is assigned a range of hash keys called tokens. The overall possible token range is -2⁶³ ∼ 2⁶³-1, i.e. the range of a signed 64-bit integer, when using the default Murmur3Partitioner as the partition key hash function (formally named Partitioner; I simplified token ranges’ representation in Figure 1.2 and beyond). When someone creates a new table with a primary key (a.k.a. row key) definition such as PRIMARY KEY ((pk1), ck1, ck2), the partition key column(s) of any future item, pk1 (the surrounding parentheses can be skipped if the partition key is not compound, although I’d recommend keeping them anyway for clarity), will be hashed by the partitioner to a specific token. This places all items with the same partition key in the same partition, which is exclusively owned by that key, on the node that owns the corresponding token range. After that, optional clustering columns (ck1 and ck2 here) order data within the same partition in storage, in ascending order by default. Finally, a primary key / row key uniquely identifies a row.
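
To make this a bit more concrete, here’s a hedged sketch using the DataStax Java driver 4.x against a hypothetical demo.user_events table; the token() function in the SELECT simply exposes the hash the partitioner computed for that partition key:

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;

public class TokenPeek {
    public static void main(String[] args) {
        // Hypothetical keyspace/table; contact point defaults to localhost here.
        try (CqlSession session = CqlSession.builder().withKeyspace("demo").build()) {
            session.execute("CREATE TABLE IF NOT EXISTS user_events ("
                + " pk1 text, ck1 timestamp, ck2 uuid, payload text,"
                + " PRIMARY KEY ((pk1), ck1, ck2))");

            // token() returns the Murmur3 hash the partitioner assigned to this key
            Row row = session.execute(
                "SELECT token(pk1) FROM user_events WHERE pk1 = 'user-42' LIMIT 1").one();
            if (row != null) {
                System.out.println("user-42 hashes to token " + row.getLong(0));
            }
        }
    }
}
```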

[Figure 1.2] Who wants a Lucidchart / Visio? Stylus is the best drawing device in the world! (Did someone say something similar before?)

Tables vs. Column Families

“A collection of rows with columns” used to be called a Column Family in the now-deprecated Thrift APIs; Cassandra has adopted SQL-like terminology such as Table in CQL since version 1.2. A most welcome change indeed!

null means “no such column for this row”

Contrary to SQL databases, where every row has the same set of columns (and null means “the current row doesn’t have a value for a specific column”), Cassandra, as a typical wide column store, allows you to have varying numbers of columns for each row. This is possible because Cassandra internally stores, for each row, only the columns that actually hold values. While this adds a lot of flexibility to table schemas (and potentially saves a ton of space for unstructured data / sparse matrices), it also implies some limitations. Cassandra disallows null values in primary keys. Moreover, if you try to filter normal columns with ALLOW FILTERING or secondary indexes (neither is recommended in Cassandra, by the way; we’ll touch on data modeling and materialized views in the next chapter) through queries such as SELECT * FROM table WHERE col != 'value', the result will exclude rows whose col “equals” null, because those rows don’t contain column col at all.

As SSTables are immutable and writes are in fact append-only, setting a column to null means you mark it for soft deletion with a tombstone; eventually it will be physically removed during compaction.
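
In driver terms, the practical takeaway is to leave a column unbound rather than binding null, so you don’t litter your partitions with tombstones by accident. A small sketch, assuming the DataStax Java driver 4.x and a hypothetical demo.profiles table:

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

public class NullVsUnset {
    public static void main(String[] args) {
        // Hypothetical table: demo.profiles (pk text PRIMARY KEY, nickname text)
        try (CqlSession session = CqlSession.builder().withKeyspace("demo").build()) {
            PreparedStatement ps = session.prepare(
                "INSERT INTO profiles (pk, nickname) VALUES (:pk, :nickname)");

            // Leaving :nickname unbound sends it as "unset": no column, no tombstone.
            BoundStatement noTombstone = ps.bind().setString("pk", "user-1");
            session.execute(noTombstone);

            // Explicitly binding null deletes the column by writing a tombstone.
            BoundStatement withTombstone = ps.bind()
                .setString("pk", "user-2")
                .setToNull("nickname");
            session.execute(withTombstone);
        }
    }
}
```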

The Problem of Hot Partitions and How Composite Partition Keys Come To The Rescue

We know by now that all items in a partition stay on the same node. This can lead to an issue called hot partition, which refers to partitions that are either accessed too frequently or too large in size, thus putting unbalanced stress on the node they’re assigned to. It’s typically caused by improper partition key design that fails to distribute either load or data evenly across nodes, such as thousands of items sharing the same partition key, or just one item containing far too many columns (i.e. a “wide row” that has gone too wild). These issues can be mitigated by using composite partition keys, wherein a partition key contains multiple columns. For instance, if a table has only one partition key column pk1 and there are a million items sharing the same pk1 value, adding an integer with a fixed range [0, 666) as a second partition key column pk2 (I call it the “evil shard key” ;p) will spread those items much more evenly across the node ring, a technique known as sharding. You’ll also get pagination without the driver’s help as a bonus, so S̶a̶u̶l̶ ̶G̶o̶o̶d̶m̶a̶n̶ s’all good, man!
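
Here’s a minimal sketch of that “evil shard key” idea, again assuming the DataStax Java driver 4.x and a hypothetical demo.events table (in practice you’d usually derive the shard deterministically from another attribute rather than at random, so reads don’t have to blindly fan out over every shard):

```java
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

public class ShardedInsert {
    private static final int SHARD_COUNT = 666; // the "evil shard key" range [0, 666)

    public static void main(String[] args) {
        // Hypothetical table:
        //   CREATE TABLE demo.events (pk1 text, pk2 int, ts timestamp, payload text,
        //                             PRIMARY KEY ((pk1, pk2), ts));
        try (CqlSession session = CqlSession.builder().withKeyspace("demo").build()) {
            PreparedStatement ps = session.prepare(
                "INSERT INTO events (pk1, pk2, ts, payload) VALUES (?, ?, ?, ?)");

            String hotKey = "black-friday"; // a key that would otherwise overload one node
            int shard = ThreadLocalRandom.current().nextInt(SHARD_COUNT);
            // (pk1, pk2) together form the composite partition key, so writes for the
            // same logical key get spread over up to SHARD_COUNT partitions.
            session.execute(ps.bind(hotKey, shard, Instant.now(), "some payload"));
        }
    }
}
```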

As you can see, Azure Cosmos partitions data quite similarly to Cassandra (at least for the sake of our discussions in this article). Logical Partitions and Physical Partitions in Cosmos roughly correspond to Partitions and Nodes in Cassandra. All items in the same logical partition also go to the same physical partition, meaning Cosmos is subject to the same hot partition issue we discussed earlier. If you use the native Cosmos SQL API, you’ll need to construct a synthetic partition key, which is basically the combined, one-column version of Cassandra’s composite partition key (note that you can also do this in Cassandra if you choose not to use composite partition keys to distribute data).

🚨 Azure Cosmos Cassandra API is NOT a real Cassandra!

Even though the Cassandra API works similarly to Cassandra, one must not assume that they are able to just migrate their entire Cassandra workflow to Azure Cosmos, as their re-implementation is incomplete. We’ll talk more about some of its limitations in the next chapter, but if you really have to stop reading now and you’re planning to use Azure services exclusively, be sure to bookmark Cosmos’ Cassandra Support Page and look out for any missing Cassandra features that you actually require.

How Cassandra manages consistency

It is dangerous to keep just one copy of your data on the single node that owns the token, and it also hurts availability if that node goes down even temporarily. Cassandra provides data replication to help mitigate the risk. The Replication Factor (RF) defines the number of nodes, including the token-owning node, that receive copies of the same data. If RF > 1, the rest of the nodes are decided by the chosen replication strategy (note that one should always choose NetworkTopologyStrategy in production to make sure replicas are allocated to different racks in a data center to maximize availability).

As per the previous discussion of the CAP theorem, getting data from replicas has a big caveat: we must choose between availability and consistency when data are out of sync among nodes. Cassandra allows you to make these choices at a fine-grained level by providing consistency levels, which define the number of replica nodes that must respond to a read / write request before the database returns a result. You can either loosen consistency in favor of better performance and fewer database errors by picking a weak consistency level (this is called eventual consistency), or get strong consistency in most cases by picking LOCAL_QUORUM or higher for both reads and writes.
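
For instance, with the DataStax Java driver 4.x you can pick a consistency level per statement (or globally via the basic.request.consistency setting); the demo.profiles table below is, once again, made up:

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;

public class QuorumRead {
    public static void main(String[] args) {
        try (CqlSession session = CqlSession.builder().withKeyspace("demo").build()) {
            // Ask for LOCAL_QUORUM on this read only; other requests keep the default.
            SimpleStatement read = SimpleStatement
                .newInstance("SELECT * FROM profiles WHERE pk = ?", "user-1")
                .setConsistencyLevel(DefaultConsistencyLevel.LOCAL_QUORUM);
            session.execute(read)
                   .forEach(row -> System.out.println(row.getFormattedContents()));
        }
    }
}
```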

How do QUORUM reads & writes provide strong consistency?

Choosing QUORUM as the consistency level means you’d like Cassandra to wait till a majority (more than half) of the replica nodes have responded before returning results. There’re 3 flavors of QUORUM in Cassandra, sorted by strictness in descending order:

  • EACH_QUORUM: A majority of replica nodes in each data center must respond.
  • QUORUM: A majority of replica nodes across all data centers must respond.
  • LOCAL_QUORUM: A majority of replica nodes in the same data center that received the client request must respond.

It’s a convenient rule of thumb to set both read and write consistency levels to at least LOCAL_QUORUM to strike the best balance between availability and strong consistency. This is inspired by the influential Amazon Dynamo white paper, which adopted a quorum-based approach where strong consistency can be achieved only if the following formula is satisfied:

r + w > n

  • r: Total number of nodes that must be queried and respond for each read
  • w: Total number of nodes that must send confirmation before a write is successful
  • n: Total number of replica nodes

[Figure 1.3] It’s probably easier to understand quorum-based voting if you picture read and write confirmations as a Venn diagram
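
To put concrete numbers on the formula, here’s a toy snippet (plain arithmetic, not driver code) for RF = 3 with QUORUM on both reads and writes:

```java
public class QuorumMath {
    public static void main(String[] args) {
        int n = 3;            // replica nodes (RF)
        int w = n / 2 + 1;    // QUORUM write -> 2 acknowledgements
        int r = n / 2 + 1;    // QUORUM read  -> 2 responses
        // 2 + 2 = 4 > 3, so every read quorum overlaps the latest write quorum
        // in at least one node, which is what makes the read strongly consistent.
        System.out.println("r + w > n ? " + (r + w > n));
    }
}
```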

Note that the “strong consistency” described here ≠ “strict consistency”. There’re rare cases where the database still returns stale values, such as multiple concurrent writes to the same primary key (Cassandra resolves write conflicts by comparing timestamps and letting the last write win, but since it still reports success for all concurrent writes while all but one are dropped silently on a node, this can break consistency for clients whose writes were discarded). Moreover, data corruption can happen in the presence of partial write failures (e.g. a write that failed at the second replica with RF = 3 and read & write CL = QUORUM) because there’s no rollback mechanism across nodes, and it can’t be reliably resolved until one of these three things happens (repairing dirty replicas is a complex topic we won’t dwell on here; please refer to the DataStax documentation for further details):

  1. The client resends the request at QUORUM write CL and gets it executed successfully.
  2. The client uses the ALL read CL and lets read repair fix all outdated replicas before returning.
  3. The DBA initiates an anti-entropy repair manually.

Nonetheless, QUORUM consistency is good enough in many cases.

How Cassandra handles data writes

As Cassandra is decentralized, when a client first sends a write request to a cluster, it can reach any node in the ring. That node will serve as the coordinator node, use the partitioner to calculate a token for the request, and route the request simultaneously to all the relevant replica nodes based on the pre-configured replication factor and replication strategy. The coordinator node will then wait till at least the specified number of replica nodes, as defined by the chosen consistency level, have reported write success before reporting back to the client.

In practice, clients are advised to configure their driver to use a token-aware load balancing policy, which tells the driver to pick, whenever possible, a coordinator node that actually owns the token for their writes, saving one network hop.
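
For illustration, here’s roughly what that looks like with the 3.x DataStax Java driver (in driver 4.x the default load balancing policy is already token-aware, so you mostly just set the local data center); the contact point, data center name, and demo keyspace are placeholders:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;

public class TokenAwareClient {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder()
            .addContactPoint("cassandra.example.com")
            .withLoadBalancingPolicy(new TokenAwarePolicy(
                DCAwareRoundRobinPolicy.builder().withLocalDc("dc1").build()))
            .build();
        Session session = cluster.connect("demo");
        try {
            // With prepared statements the driver knows the partition key, computes its
            // token, and routes the request straight to one of the owning replicas.
            session.execute(session
                .prepare("INSERT INTO profiles (pk, nickname) VALUES (?, ?)")
                .bind("user-1", "stephen"));
        } finally {
            session.close();
            cluster.close();
        }
    }
}
```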

[Figure 1.4] A simple write example with a token-aware load balancing policy and a replication factor of 3 at QUORUM consistency level

Think b̶l̶o̶c̶k̶c̶h̶a̶i̶n̶ “decentralized”: batches are (mostly) bad news for performance

Phew! Now that we have a basic understanding of how Cassandra handles simple data writes internally, we’re ready to move back to the main topic. After some digging into the code that did the actual CSV bulk upload, I slowly realized I was dealing with a legacy issue in the internal Cassandra ORM library shared across the department, one that originated long before I joined the team.¹ When our admin API receives a CSV file from a client, it parses the file and feeds all its content to an API of the ORM library. From there, data are split into many INSERT statements and bundled into one big UNLOGGED batch before being sent to the database. The entire operation is synchronous, i.e. the ORM API will wait (blocking the thread that invoked it) until the database calls back regarding the execution of the batch statement. So what’s not right about all this?

Batches in Cassandra are probably a bit confusing to new Cassandra developers, as they sound like something designed for bulk operations. (Just bundle all statements to save the many round-trips that would occur if they were sent separately, yeah?) Be warned! This is in fact quite far from the truth! (Let’s hope the Apache Cassandra team improves their CQL documentation to address this misconception in the future.)

As we discussed, it’s not really in NoSQLs’ nature to provide ACID-compliant transactions like monolithic SQLs do. Nonetheless, Cassandra tries to alleviate some of the pain by providing lightweight transactions (LWT) and batches. Batches are primarily meant for performing atomic operations, i.e. either all modification statements (INSERT, UPDATE, or DELETE) in a batch are executed successfully, or nothing happens at all. This is rather useful in the Cassandra world, as data modeling is query-driven, and application developers are f̶o̶r̶c̶e̶d̶ required to denormalize data relations and often have to keep in sync several query tables that are actually based on the same dataset. By the way, to save the hassle of maintaining many replica tables in code, for each SELECT query type your application logic requires, it’s often recommended to create a materialized view on top of the main table instead; updates made to materialized views are applied using batches internally.
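
As a quick illustration, here’s a hedged sketch of a LOGGED batch keeping two hypothetical query tables (users_by_id and users_by_email) in sync with the DataStax Java driver 4.x (keeping in mind, per the warning right below, that this won’t fly on Cosmos):

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

public class SyncQueryTables {
    public static void main(String[] args) {
        try (CqlSession session = CqlSession.builder().withKeyspace("demo").build()) {
            PreparedStatement byId = session.prepare(
                "INSERT INTO users_by_id (id, email, name) VALUES (?, ?, ?)");
            PreparedStatement byEmail = session.prepare(
                "INSERT INTO users_by_email (email, id, name) VALUES (?, ?, ?)");

            // LOGGED batch: either both denormalized tables end up with the row,
            // or (eventually) neither does.
            BatchStatement batch = BatchStatement.builder(DefaultBatchType.LOGGED)
                .addStatement(byId.bind("u-1", "stephen@example.com", "Stephen"))
                .addStatement(byEmail.bind("stephen@example.com", "u-1", "Stephen"))
                .build();
            session.execute(batch);
        }
    }
}
```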

🚨 Azure Cosmos Cassandra API is NOT a real Cassandra!

Why the Azure Cosmos team made such a decision is beyond me, but at the time of writing, their Cassandra API supports neither materialized views nor LOGGED batches. In other words, there’s no reliable way to keep query tables in sync on Cosmos. Another reminder that their offering is crippled and should be evaluated with caution, as it doesn’t support some of the most common use cases in native Cassandra.

Behind the scenes, an UNLOGGED batch is processed quite similarly to the simple data write we talked about in the last chapter, except that the coordinator node now has to dispatch all the bundled modification statements to their corresponding replica nodes, making it the single point of failure for the entire operation. This is of course not safe, since atomicity (the “A” of ACID) disallows an update that occurs only partially, and as we discussed in the previous chapter, Cassandra doesn’t support rollbacks across nodes in the presence of failures. That’s where LOGGED batches come in: once the batch reaches the coordinator successfully, it’s guaranteed that all of its statements will eventually succeed, because copies of the batch, or batchlogs, are first saved to 2 other nodes as a safety precaution: if the coordinator ever goes down, the backup node(s) will replay the batch, completing the job eventually. Since batchlogs are useful only when multiple partition keys are involved, Cassandra will execute a LOGGED batch as UNLOGGED when it contains modifications to a single partition only, saving the round-trip cost of writing batchlogs to the 2 backup nodes and waiting for their responses before the actual writes to replica nodes begin.

[Figure 2.1] Extending the example from Figure 1.4: A write for a LOGGED batch containing 2 different partition keys, sent with a token-aware load balancing policy and a replication factor of 3 at QUORUM consistency level; remove the batchlog writes and confirmations and you’ll get UNLOGGED batching instead

Now that we know how batches work, it’s easy to understand why it’s ill-advised to use them to import large amounts of data with different partition keys: it essentially introduces a leader, the coordinator node, to an otherwise decentralized system, and puts too much stress on that node by letting it process all the traffic and coordinate writes to many replica nodes, which normally means all nodes in the cluster if partition keys are evenly distributed. Choosing LOGGED or UNLOGGED doesn’t really matter in this case either, as the 2 batchlog writes are clearly not the bottleneck here.

The right way to fix it, then, is to think “decentralized”: we can avoid overloading a particular node by dismantling a batch back into individual statements and sending them directly to the nodes they belong to. This translates to “asynchronous insertions sent with a token-aware load balancing policy”, and it should be the more reliable way to do bulk insertions quickly.

[Figure 2.2] Same setup as Figure 2.1, but with traffic unclogged: muuuuuch cleaner :)
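
A bare-bones sketch of that approach with the DataStax Java driver 4.x follows (the demo.orders table and the hard-coded rows stand in for the parsed CSV); note that firing every insert at once like this is precisely what gets us into trouble at scale, which is what the rest of this post and Part 2 are about:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

public class AsyncBulkInsert {
    public static void main(String[] args) {
        // Stand-in rows; a real job would stream these out of the parsed CSV.
        List<String[]> rows = List.of(
            new String[] {"order-1", "sku-42"},
            new String[] {"order-2", "sku-43"});

        try (CqlSession session = CqlSession.builder().withKeyspace("demo").build()) {
            PreparedStatement ps = session.prepare(
                "INSERT INTO orders (order_id, sku) VALUES (?, ?)");

            // Each insert is routed by the (token-aware) driver to its own replica set;
            // no coordinator has to fan out a giant batch on our behalf.
            List<CompletableFuture<?>> inFlight = new ArrayList<>();
            for (String[] row : rows) {
                inFlight.add(session.executeAsync(ps.bind(row[0], row[1]))
                                    .toCompletableFuture());
            }
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        }
    }
}
```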

Q: Wait, what about UNLOGGED batches for insertions with the same partition key?

A: Interesting idea! In the case where your batches contain mostly the same partition key, combining UNLOGGED batches with a token-aware load balancing policy could potentially speed up data import, since the statements are all going to the same node (which makes the write path similar to Figure 1.4), and Cassandra will actually treat them like a single big write internally to optimize performance. In fact, there’re experiments suggesting that batches exhibiting high locality can outperform async statements. There’re a few catches worth noting though (a sketch of such a single-partition batch follows this list):

  1. If you have a large number of statements with the same partition key that’ll make UNLOGGED batches really worthwhile, inspect your data model design for any potential hot partition issues.
  2. Batches are designed to work as a whole: if any statement in a batch fails, the entire batch fails. Besides the potential inefficiencies this can bring, beware of the danger of retrying failed big batches when applying a retry policy.
  3. Keep the batch size in check. It can’t exceed the maximum size of a single operation the Cassandra cluster supports (in Cosmos, the maximum batch size allowed is merely 100 unless you raise a support ticket), nor should it cause the query to time out. You’ll need testing to get to the ideal size that’ll maximize performance for you.
  4. If you’re using Azure Cosmos Cassandra API, be extra cautious about UNLOGGED batches. As mentioned before, Azure Cosmos charges and limits you by throughput. Unfortunately, since Cosmos evenly distributes the throughput you provisioned to a container among its physical partitions, you’ll be wasting all the throughput you paid for that’s in fact provisioned to other physical partitions when you write only to one physical partition at a time (in other words, you’ll be more likely to face OverloadedExceptions with large batches). To save you the headache of throughput micro-management, token-aware async insertions should be preferred over token-aware UNLOGGED batches for bulk imports with mixed partition keys on Cosmos Cassandra API.
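
And for completeness, the single-partition UNLOGGED batch from the answer above might look roughly like this (again with the DataStax Java driver 4.x and a hypothetical demo.order_lines table):

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

public class SamePartitionBatch {
    public static void main(String[] args) {
        try (CqlSession session = CqlSession.builder().withKeyspace("demo").build()) {
            // Hypothetical table with PRIMARY KEY ((order_id), line_no): every statement
            // below shares the partition key "order-1", so the whole batch lands on the
            // same replica set and is applied as one big write.
            PreparedStatement ps = session.prepare(
                "INSERT INTO order_lines (order_id, line_no, sku) VALUES (?, ?, ?)");

            BatchStatementBuilder batch = BatchStatement.builder(DefaultBatchType.UNLOGGED);
            for (int line = 0; line < 50; line++) {   // keep the batch size modest (see #3)
                batch.addStatement(ps.bind("order-1", line, "sku-" + line));
            }
            session.execute(batch.build());
        }
    }
}
```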

While sending separate insertions asynchronously is generally the right direction for a fix, there’s one practical obstacle: you cannot defy physics. Concretely, if you have a huge amount of data (thousands or even millions of records), then unless you’re willing to temporarily scale out your Cassandra cluster by a fair amount, flooding your database with all those async requests at once is only going to overwhelm your servers. This is especially true when you’re using Cassandra on the public cloud, such as the Azure Cosmos Cassandra API we’ve been talking about or the AWS managed Cassandra service, where the pricing model is typically throughput-based and service vendors actively throttle the traffic rate on the server side (on Azure, Cosmos takes both incoming and internal inter-node traffic into account, meaning it can be overloaded well before incoming traffic reaches the provisioned request units because of data replication etc.; higher consistency level = more throughput required).

A typical way to handle transient failures on Cassandra is to apply the retry pattern (for Azure Cosmos users, Microsoft provides a retry policy through a DataStax driver extension set that can retry OverloadedExceptions after a period of time the server specifies in the returned error messages), but this would be a terribly inefficient solution for bulk imports. As a general rule of thumb, pushing more load into a system that is already overloaded is only going to degrade performance further. Besides, since business requires that data ingestion happen as soon as they call the API, I have no control over when the job can be scheduled to actually run, and I would very much like to avoid degrading overall database performance with large uploads during normal business hours. Some control has to be exerted over the traffic volume before the async insertion idea becomes actually viable. After scratching my head for a few hours at night (my hair: “Danger! Danger!”), I conjured up a plan.

To be continued in Part 2: Dynamic Rate Limiting / Congestion Control.

  1. It’s rather puzzling why no one else ever noticed this issue before, given it’s a shared internal library. If I have to guess, it’s because the ORM library was written by a very reputable senior engineer who happened to be the former architect of my team (judging from his legacy, he was indeed a great architect) before I joined. Many of the internal libraries he wrote and shared across teams have been left somewhat unmaintained since his departure, as they’re battle-tested and people seem rather risk-averse about making major changes to complicated things that are widely shared (and that they don’t own). Moreover, as you’ll see, the issue we’re discussing here is more of a performance bug, which would normally be difficult for QA to catch back in the day when we were still on the internal cloud and had an in-house DBA team, since developers could just ask them to scale out resources whenever there was a performance issue, without going too deep into granular budgeting first. Staying too comfy at home does have its cons! (I’m definitely not hinting at the current COVID–19 work-from-home situation btw, because it’s been miserable… 😔) In the future, I might blog about some other issues I’ve tackled that share similar roots during this grand journey of public cloud migration.
