Bulk ETL from Hadoop to Cassandra

Conductor R&D Team
Published in Conductor R&D · Jul 11, 2017

Cassandra’s distributed architecture enables a high write throughput compared to other NoSQL and SQL database systems. Most benchmarks focus on incremental writes, but data can also be efficiently ingested by Cassandra in bulk. Bulk loading is an attractive option when all the data to be loaded is available ahead of time — for example, as the result of a Hadoop Map/Reduce job.

We found ourselves developing a Hadoop pipeline that generated a large volume of data that we wanted to store in Cassandra as quickly as possible. To accomplish this, we built an ETL process for generating a Cassandra cluster’s data files as part of a Map/Reduce workflow, and then copying them directly into place on the cluster’s filesystem to publish fresh data to our users.

First, some technical background: SSTables are Cassandra’s on-disk format for data. Like HBase’s HFiles, SSTables are a representation of the contents of a table, sorted by row key, with separate files for indexes and Bloom filters over the keyspace. SSTables are numbered by “generation,” to allow Cassandra to distinguish between different versions of a table during long-running operations like compaction.

Cassandra describes its cluster topology as “rings” of nodes that share metadata about key ownership and the cluster topology itself. At any given time, each node in the ring is responsible for one or more ranges of tokens, where a token is effectively a hash of a row key. The cluster’s “partitioner” defines the strategy for calculating tokens from row keys. Tying Cassandra’s partitioning strategy to Hadoop’s concept of output partitions is key to the bulk loading approach we describe below.
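To make that concrete, here’s a minimal sketch (the row key is invented) of hashing a key into the Murmur3 token space with Cassandra’s partitioner API:

import java.nio.ByteBuffer;

import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.utils.ByteBufferUtil;

public final class TokenExample {
    public static void main(final String[] args) {
        // Hash an illustrative row key into the 64-bit Murmur3 token space.
        final ByteBuffer rowKey = ByteBufferUtil.bytes("some-row-key");
        final Token token = Murmur3Partitioner.instance.getToken(rowKey);
        // The resulting LongToken falls somewhere in [-2^63, 2^63 - 1] and
        // determines which node's range owns the row.
        System.out.println(token);
    }
}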

Why did we build our own ingestion process, though? To be sure, support for bulk loading has been present in Cassandra for quite some time, and it’s shipped with various incarnations of a Map/Reduce integration — see CqlBulkOutputFormat in Cassandra 3.x, for example. However, many of these mechanisms involve performing a large volume of incremental writes to the cluster.

The sstableloader utility, for example, streams data through a single node, which then distributes it across the cluster. And while CqlBulkOutputFormat creates a real SSTable fileset as part of generating output, it delegates to the same code path as sstableloader to get the data into your cluster.

We were wary of taking a stream-based approach that required communication with a running cluster. Should an offline Cassandra cluster cause our Map/Reduce pipeline to fail? Was it reasonable to wait minutes or hours to stream a terabyte of data into the cluster? What if this process — or part of the cluster topology — failed in the middle of loading? We knew we only needed to publish fresh data at punctuated intervals, and that we’d be generating a complete data set from scratch each time. Could we leverage the fact that our data would be read-only to make ingestion fast, atomic, and repeatable?

We’ve seen a few blog posts suggesting that a fully “offline” bulk table generation and load pipeline is possible, but none of them gave a concrete description of how to do it. We hope the following explanation is useful to other teams contemplating a similar ETL workflow.

The approach described below breaks the process of bulk loading to Cassandra into two parts. First, we generate a set of SSTable files containing the data we want to publish to each node in our Cassandra cluster as the output of a Map/Reduce job. Then, we copy each of those sets of SSTable files to the local filesystem of its target Cassandra node, and synchronously add it to the “read path.”

The flow of data we were aiming for was that each of the reduce tasks in the final stage of our Hadoop pipeline would produce the SSTable data for a single node in our Cassandra cluster. To make this work, there were a number of steps we had to take.

First, to simplify the partitioning behavior of the cluster, we made each node in the target ring responsible for a single token range by setting the initial_token property. We were using the Murmur3Partitioner implementation, which maps keys to long integer values, so we could partition the token space arithmetically. For example, a five-node Cassandra ring would divide the tokens up as follows:
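With Murmur3 tokens spanning -2^63 through 2^63 - 1, one even split gives each node a fifth of the range:

Node 1: [-9223372036854775808, -5534023222112865485)
Node 2: [-5534023222112865485, -1844674407370955162)
Node 3: [-1844674407370955162, 1844674407370955161)
Node 4: [1844674407370955161, 5534023222112865484)
Node 5: [5534023222112865484, 9223372036854775807]

These boundaries are the same ones the partitioner code below computes.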

With the ring’s token ownership established, we configured our job to match. We set the number of reducers to the number of nodes in the ring.

job.setNumReduceTasks(NUM_CASSANDRA_NODES);

Then we wrote a Map/Reduce Partitioner implementation to route each mapper’s output to a reducer by computing the Cassandra token for its key, and bucketing that token by the ring’s token ownership ranges.

private synchronized LongToken[] getOrCreateTokenBuckets(final int numPartitions) {
    if (tokenBuckets != null) {
        return tokenBuckets;
    }
    /* Doing this math natively on the extreme values of Long causes numerical
       overflow. So do it with BigInteger instead. */
    tokenBuckets = new LongToken[numPartitions];
    final long partitionSize = BigInteger.valueOf(Murmur3Partitioner.MAXIMUM)
            .subtract(BigInteger.valueOf((long) Murmur3Partitioner.MINIMUM.getTokenValue()))
            .divide(BigInteger.valueOf(numPartitions))
            .longValue();
    long tokenUpperBound = (long) Murmur3Partitioner.MINIMUM.getTokenValue() + partitionSize;
    for (int i = 0; i < numPartitions - 1; i++, tokenUpperBound += partitionSize) {
        tokenBuckets[i] = new LongToken(tokenUpperBound);
    }
    tokenBuckets[numPartitions - 1] = new LongToken(Murmur3Partitioner.MAXIMUM);
    return tokenBuckets;
}

@Override
public int getPartition(final ByteBuffer key, final V v, final int numPartitions) {
    final LongToken[] tokenBuckets = getOrCreateTokenBuckets(numPartitions);
    final Token token = Murmur3Partitioner.instance.getToken(key);
    /* This could be rewritten as a binary search for large numbers of nodes / buckets. */
    for (int i = 0; i < numPartitions; i++) {
        if (token.compareTo(tokenBuckets[i]) < 0) {
            return i;
        }
    }
    throw new IllegalStateException();
}

On the reduce side, we hijacked the sort phase of the job and computed the tokens from the keys once again, this time sorting by token in a custom RawComparator implementation to ensure the rows would be processed in the order in which they needed to be written out to the SSTable on disk.

@Override
public int compare(final ByteBuffer k1, final ByteBuffer k2) {
    final Token token1 = Murmur3Partitioner.instance.getToken(k1);
    final Token token2 = Murmur3Partitioner.instance.getToken(k2);
    return token1.compareTo(token2);
}
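Both pieces then need to be registered on the job. The class names below are placeholders for our Partitioner and RawComparator implementations:

// Route map output to reducers by Cassandra token range, and sort each
// reducer's input by token (class names here are illustrative).
job.setPartitionerClass(CassandraTokenPartitioner.class);
job.setSortComparatorClass(Murmur3TokenComparator.class);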

In the reducer itself, we passed the data directly to a custom RecordWriter that delegates to an instance of Cassandra’s own CQLSSTableWriter to actually create the various SSTable files.

public class CqlSSTableRecordWriter extends RecordWriter<NullWritable, Map<String, Object>> {

    private final CQLSSTableWriter writer;

    public CqlSSTableRecordWriter(final CQLSSTableWriter writer) {
        this.writer = requireNonNull(writer);
    }

    @Override
    public void write(final NullWritable key, final Map<String, Object> value)
            throws IOException, InterruptedException {
        writer.addRow(value);
    }

    @Override
    public void close(final TaskAttemptContext context) throws IOException, InterruptedException {
        writer.close();
    }
}
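The writer itself comes from CQLSSTableWriter’s builder. A minimal sketch of constructing one, with a made-up keyspace, table schema, and output directory:

import org.apache.cassandra.io.sstable.CQLSSTableWriter;

// Builds the writer handed to CqlSSTableRecordWriter above. The keyspace,
// table, and directory here are illustrative, not our actual schema.
public final class SSTableWriterFactory {
    public static CQLSSTableWriter create(final String outputDir) {
        return CQLSSTableWriter.builder()
                .inDirectory(outputDir)
                .forTable("CREATE TABLE my_keyspace.my_table (id text PRIMARY KEY, payload text)")
                .using("INSERT INTO my_keyspace.my_table (id, payload) VALUES (?, ?)")
                .build();
    }
}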

CQLSSTableWriter expects to be able to write directly to the local filesystem, but our Map/Reduce job runs on Amazon Elastic Map/Reduce, where local storage is ephemeral. To make our SSTables available beyond the lifecycle of the cluster, we added a step in our workflow to copy the SSTables to a predictable location in Amazon S3.
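One way to do that copy from within the job (a sketch only; the bucket and paths are placeholders) is Hadoop’s own FileSystem API, which resolves s3:// URIs on EMR:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Copies a locally generated SSTable directory to S3 so it outlives the
// ephemeral EMR cluster. The bucket and paths are placeholders.
public final class SSTableUploader {
    public static void upload(final String localDir, final String s3Dir) throws IOException {
        final FileSystem s3 = FileSystem.get(URI.create(s3Dir), new Configuration());
        s3.copyFromLocalFile(new Path(localDir), new Path(s3Dir));
    }
}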

The SSTable generation process above produces sets of files with names like:

N.mc-1-big-CompressionInfo.db
N.mc-1-big-Data.db
N.mc-1-big-Index.db

…where N is the number of the reducer, as well as the number of a node in the Cassandra ring. Each set of files produced by the same reducer needs to get copied to the same node in the ring.

To orchestrate this process, we SSH into each node, and download its corresponding set of SSTables from S3. Then we copy the files into the Cassandra data directory for the target column family, stripping off the reducer number prefix, and assigning a new generation number in the process. We calculate the generation number by taking the maximum generation across all table files in the data directory and adding one. So if the data directory has the following SSTable files:

mc-72-big-CompressionInfo.db
mc-72-big-Data.db
mc-72-big-Digest.db
mc-72-big-Filter.db
mc-72-big-Index.db
mc-72-big-Statistics.db
mc-72-big-Summary.db

…the new generation will be 73.

Once the new SSTables are in place, we can make them “live” by running

nodetool refresh "${keyspace}" "${column_family}"

Or, if the cluster isn’t currently running, Cassandra will pick up the new files automatically when it starts.

It’s worth noting that we’ve wrapped this copy-and-publish process into a shell script that we can run on each node in our Cassandra cluster at the end of each Hadoop job.

This architecture is still new for us, so expect to see updates to this post as we learn more about how it performs and scales. So far, we’ve found that it’s been quite easy to publish updates without impacting reads, and to create mirrors of production data. In particular, we’re curious to see how difficult the process of growing or shrinking a Cassandra cluster is under this architecture compared with a more traditional, dynamically-partitioned cluster.

Want to learn more? Want to have the infrastructure described above available as a library? Drop us a line!
