A Word About Highly Available Spark + Cassandra

Jeremiah Edwards
Published in Apteligent
5 min read · Oct 10, 2016

Or why you couldn’t see your crash data for a while…

Yesterday our Platform and Operations teams learned a few valuable lessons about running highly available distributed systems (no simple thing). Unfortunately, we learned these things the hard way, and data was unavailable in our web portal for several hours until we got things working again. As with all major incidents, we'll be doing a full post-mortem in the coming days to find out exactly how we can prevent this type of delay from happening again. Right now, though, I'd like to tell you about how we put out the fire.

What happened?

The root cause of our service outage was the loss of one of the nodes in our Cassandra ring. Cassandra is a core part of the Serving Layer in our lambda architecture, and is essential for displaying aggregate performance metrics.

All of our servers run in the AWS cloud. A consequence of having such flexible infrastructure is that servers can and do just go away. After all…

When the Cassandra host became unavailable, our Spark Streaming jobs stopped being able to write real-time data to the cluster. Every time a microbatch completed, we just saw this:

6/10/06 09:44:27 WARN TaskSetManager: Lost task 24.0 in stage 3.0 (TID 829, <IP ADDRESS>): java.io.IOException: Failed to write statements to crash_stream2.details_pt1m.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:164)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:139)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:139)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1$$anonfun$apply$1.apply(DStreamFunctions.scala:54)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1$$anonfun$apply$1.apply(DStreamFunctions.scala:54)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Why did it happen?

The stack trace above was completely useless and gave us no hint as to what was going on. It turns out that these writes started failing because the Spark-Cassandra connector we use in our streaming jobs could not achieve the consistency level it required to complete the saveToCassandra operations we were making.

OK, so, what does that mean?

An Introduction to Cassandra Write Consistency, Digestible at 4AM When Everything Is on Fire

Cassandra is great. It can handle a massive write workload and not blink an eye. (Well, if it had eyes. Which it doesn’t. But if it did, it wouldn’t blink them. Not even one.) It also has tuneable consistency for both reads and writes. Without getting into all the details, it’s enough to understand that there is a strong relationship between consistency and availability.

If you want to make highly consistent writes (or reads), your system better have enough copies of the data available to achieve that level of consistency. In particular, your Replication Factor and your Desired Consistency have to play nice with each other; otherwise your writes (or reads) will fail.
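To make that relationship concrete, here's a minimal sketch (illustrative only, not our production code) of the arithmetic: how many replicas must acknowledge a write at a given consistency level, and how many lost replicas the write path can survive.

// Illustrative helper: replica acknowledgements required per consistency level,
// and whether writes keep succeeding after some replicas go away.
object ConsistencyMath {

  // Replicas that must acknowledge a write for it to succeed.
  def replicasRequired(consistency: String, replicationFactor: Int): Int =
    consistency match {
      case "ONE"                     => 1
      case "QUORUM" | "LOCAL_QUORUM" => replicationFactor / 2 + 1  // integer division
      case "ALL"                     => replicationFactor
      case other                     => sys.error(s"not covered in this sketch: $other")
    }

  // Writes succeed only while enough replicas are still reachable.
  def writesSurvive(consistency: String, replicationFactor: Int, replicasDown: Int): Boolean =
    replicationFactor - replicasDown >= replicasRequired(consistency, replicationFactor)
}

// QUORUM on an RF=3 keyspace needs 2 acks, so it tolerates one lost replica:
ConsistencyMath.writesSurvive("QUORUM", replicationFactor = 3, replicasDown = 1) // true
// ALL on the same keyspace does not:
ConsistencyMath.writesSurvive("ALL", replicationFactor = 3, replicasDown = 1)    // false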

Seemed like a good idea at the time…

And now, back to our outage…

One important fact about our system is that we don’t really use Cassandra as an authoritative data store. Instead, we use it to cache the results of batch and streaming jobs run on Spark, and then to make that data available to our API.

Because of this, we chose a Replication Factor of 2: good insurance against data loss in case of node failure, and less expensive than running a full RF >= 3 setup.
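For reference, the replication factor is just a property of the keyspace definition. Here's a rough sketch of what an RF=2 keyspace looks like; the SimpleStrategy choice is illustrative rather than our actual schema, the keyspace name is borrowed from the log above, and sc stands in for a SparkContext:

import com.datastax.spark.connector.cql.CassandraConnector

// Sketch only: create a keyspace whose rows are stored on two replicas (RF=2).
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute(
    """CREATE KEYSPACE IF NOT EXISTS crash_stream2
      |WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}""".stripMargin)
}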

Unfortunately, RF=2 does not give you a system that is highly available for writes unless you use a Write Consistency of ONE (meaning the write must be persisted to the commit log and memtable of only a single node in order for the write to succeed).

How Cassandra writes happen

As fate would have it, in our streaming code we mistakenly used the default Cassandra WriteConf, which falls back to a consistency level of LOCAL_QUORUM.

The source of our Doom

val consistencyLevel: ConsistencyLevel  // consistency level for writes, default LOCAL_QUORUM

Unfortunately, when each row is stored on only two nodes (RF=2), quorum-level consistency is the same as requiring that ALL replicas acknowledge a write before it succeeds:

QUORUM = floor(RF / 2) + 1 = floor(2 / 2) + 1 = 2

This setup works fine when all nodes in the Cassandra ring are available, but it’s not a true highly available system: the loss of a single node means that no writes will succeed.

How did we fix it?

Since no data was lost, it was sufficient to add a shiny new Cassandra host to our cluster, and let it replicate data from other nodes. Given the size of each node, however, the time it takes to bootstrap a new node is measured in hours, not minutes — not acceptable when writes are failing.

In order to get the system back online as fast as possible, our team put together a hotfix that changed the write consistency level to ONE and redeployed all of our Spark aggregation jobs.

Here’s some hotfix code:

import com.datastax.driver.core.ConsistencyLevel
import com.datastax.spark.connector.streaming._
import com.datastax.spark.connector.writer.WriteConf

// Emergency WriteConf: keep every setting from the existing Spark config,
// but drop the write consistency level from LOCAL_QUORUM to ONE.
val writeConfReal = WriteConf.fromSparkConf(dStream.context.sparkContext.getConf)
val emergencyWriteConf = new WriteConf(
  writeConfReal.batchSize,
  writeConfReal.batchGroupingBufferSize,
  writeConfReal.batchGroupingKey,
  ConsistencyLevel.ONE,            // the only change: quorum (2 acks) -> ONE (1 ack)
  writeConfReal.parallelismLevel,
  writeConfReal.throughputMiBPS,
  writeConfReal.ttl,
  writeConfReal.timestamp,
  writeConfReal.taskMetricsEnabled)

// Write each microbatch with the relaxed consistency level.
dStream.map(mapErrorSummaryToErrorSummaryCassandra)
  .saveToCassandra(<cassandraKeyspace>, <cassandraTable>, writeConf = emergencyWriteConf)

This let us simultaneously stream live data into Cassandra directly and backfill data that we were unable to load during the outage.
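With a clearer head, there are less verbose ways to express the same change; these are sketches against the connector version we were running, so take the exact spellings with a grain of salt. WriteConf is a case class, so copying a single field does the job, and the connector also exposes the consistency level as a plain Spark property:

import com.datastax.driver.core.ConsistencyLevel
import com.datastax.spark.connector.writer.WriteConf

// Same hotfix, expressed as a case-class copy instead of re-listing every field.
val emergencyWriteConf =
  WriteConf.fromSparkConf(dStream.context.sparkContext.getConf)
    .copy(consistencyLevel = ConsistencyLevel.ONE)

// Or set it once on the SparkConf so every job picks it up without touching code:
//   spark.cassandra.output.consistency.level=ONE

Either way, the trade-off is the same: with RF=2, ONE is the only write consistency level that keeps working when a node disappears.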

What we’ll take away

This outage was a big interruption of service for us and our customers. We still need to work to understand exactly how to prevent this type of problem from happening in the future and to improve our incident response. We’ve done a lot of learning in the last 24 hours, though, and our infrastructure today is more reliable than it was yesterday.

Also, write configs really, really matter, and never trust defaults to do what you want!
