Cassandra Driver Configuration for improved performance and load balancing

Ravi Varsha
Glassdoor Engineering Blog
14 min readNov 20, 2020
Photo by Christian Englmeier on Unsplash

At Glassdoor we use Cassandra to store various datasets needed by search, machine learning models and other systems. Since most of these systems are built on Java, we use Java drivers for Cassandra, for read, write and delete operations. In this article, we will go over some of the typical, and some unique Cassandra driver configurations we employ in its services for optimal performance.

Search Cassandra Infrastructure

Let’s take one of the important clusters for example

Cassandra Datacenter Overview of Hydra Cluster

In this cluster, Hydra, there are three Cassandra datacenters:

  1. Live: For serving read queries from user facing apps, requiring fastest retrieval
  2. Batch: For serving read queries from internal batch processes that may send requests in bursts. This datacenter might also receive some write requests in form of SSTable data files streamed to it via SSTableLoader that updates the tables.
  3. Offline: For serving predominantly write queries but some read queries which are latency tolerant.

Data written to one dc are replicated to others by Cassandra. Our personalization cluster is created with the three replica in each data center:

CREATE KEYSPACE personalization WITH replication = {‘class’: ‘NetworkTopologyStrategy’, ‘batch’: ‘3’, ‘live’: ‘3’, ‘offline’: ‘3’} AND durable_writes = true;

Data modeling

Typically, we isolate write-heavy queries from read-heavy ones at all stages including data modeling. Writes are exceptionally cheap in Cassandra, while it’s hard to tune reads. For writing we also use SSTableLoader where possible to bulk load external data for faster import.

For organic traffic that we receive, which is sort of evenly spread out through the day, it is important that requests like fetching user preferences from Cassandra tables while ranking search results for a new search should have the least delay and such data should be highly available.

Search also receives queries which are bursty in nature that occur at regular intervals, for example search query using personalization data to target selected audience for job alert emails. Although these may tolerate a slightly higher delay, the systems should be able to handle the load and traffic of abrupt bursts request pattern.

Internal to Glassdoor queries that typically update the system for reconciliation or data analysis may tolerate even higher delay, and less availability, but might have different needs, for example, require higher consistency at the time of querying. Symbolically, the live, batch and offline data center segregation is based on these factors, respectively.

Hence, the schema for tables in these keyspaces are designed with following two primary goals:

  1. Fewer partitions to read the data from.
  2. Data being distributed at the same time but evenly distributed throughout the cluster.

Both are conflicting properties that are balanced by selecting the right partition key designed to consistently hashed throughout the cluster. With this, as we will see later on how drivers make use of this, once the partitioning is complete, we can accurately pinpoint the node which contains the partition key we are looking for, minimizing the time spent searching around especially when we have 1000 keys to search for at once and want to parallelize.

Each datacenter can be independently scaled to meet the performance, availability and consistency requirements once they have been isolated from each type of traffic we discussed before.

In the following sections, we are going to discuss how to use the existing Java drivers to build a performant service on top of Cassandra, how to extend default configurations for these drivers, do efficient retry and load balancing. Current Cassandra version at the time of writing at Glassdoor Search is 3.11.

Datastax Java Driver for Cassandra

We use the Datastax non-enterprise version of the driver. The “cassandra-driver-core” dependency is the main dependency needed to perform basic operations.

<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.8</version>
</dependency>

Datastax Java Driver Dependency in Maven

With our version of Cassandra, we need to use Datastax driver version 3.0+.

(There are tons of drivers for Cassandra as it’s an open source database solution. Here’s a list:

http://cassandra.apache.org/doc/latest/getting_started/drivers.html)

Spring Data for Cassandra

I would highly recommend Spring Data modules and especially ones for Cassandra if your ecosystem aligns with Spring. Spring Data for Cassandra is a wrapper around the Datastax Java drivers, and provides sophisticated support for reactive libraries and blends seamlessly with micro-service architecture.

Among most of the features of Spring Data, the key call out would be:

  1. By default it eliminates almost all the boilerplate code needed to connect to and get started with Cassandra. However, it doesn’t limit us from using any of the low-level driver configurations available from core drivers.
  2. The repository pattern style of data access is super easy to configure. Defining queries is simple. Example: deriving the query from the method name. Integration of custom repository code for data-access is easy too.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
<version>2.0.0.M7</version>
</dependency>

Spring-Data-Cassandra (Reactive) Dependency

Notice the reactive version of the library.

Reactive

Given that the backend service is RESTful with heavy IO, if you were to wait for one call to complete before sending the next request, the poor client would give up in frustration before you managed to assemble a reply. So external service calls, especially complex orchestrations of dependencies between calls, are a good thing to optimize. Functional-Reactive programming libraries offer the promise of asynchronous and reactive calls together with “composability” of the logic driving those operations. We have an opportune moment to make use of them heavily in our applications.

A simplistic “No Java Config” data access requires almost no code to set up access for the cluster with spring-data.

application.yml:

cassandra:
personalization:
cluster-name: personalization
contact-points: 172.X.X.X, 172.X.X.Y
keyspace-name: personalization

Cassandra configuration with spring-data in a spring-boot app

Following is an example of the repository to access user_preferences table via user_id.

@Repository
public interface UserPreferencesRepository extends
ReactiveCassandraRepository<UserPreferences, UserId> {
@Query(value = “SELECT * FROM user_preferences where user_id = ?0 limit ?1”)
Flux<UserPreferences> findByUserId(int userId, int limit);
}
Repository interface for accessing user_preferences tableReactiveCassandraRepository is a Cassandra specific CRUD repository with reactive support.The above query fetches record for only one user_id. What would we do if we had a list of them?A simple solution to it, would be to accept a list of userIds in the argument and use an IN query operator:@Query(value = “SELECT * FROM user_preferences where user_id IN ?0 limit ?1”)
Flux<UserPreferences> findByUserId(List<Integer> userIds, int limit);

Repository method to find users for multiple ids with IN operator

However, using an IN operator on the partition key is a query anti-pattern in Cassandra. The reasons are simple as Cassandra and their drivers are optimized to work in distributed nature and query the nodes directly as much as possible which holds the partition. Coordinator also makes use of a similar hashing algorithm to relay requests to appropriate nodes. With IN queries, the coordinator has no clue but to send ids to all nodes and wait for a response from all of them. This not only queries unnecessary nodes, but also introduces a single point of failure. With a large number of ids in IN query, one would soon see slower performance due to heap pressure building too quickly (because coordinator is holding the ids and waiting for response from all nodes) and long GC pauses.

IN operator in the query where clause could degrade performance heavily.

One optimization we do to speed up the queries and reduce pressure on coordinators, is to send individual queries to nodes directly and in parallel (and asynchronously, of course). This avoids coordinator being the point of failure and retries are much simpler and faster.

@Component
public class UserPreferencesRepository {
//Elastic, boundedElastic() and parallel() are some scheduler pool options
private static final Scheduler scheduler = Schedulers.newElastic(“userPreferencesScheduler”, 1);
private String READ_QUERY = “SELECT * FROM %s WHERE user_id = :user_id limit :limit”;
private PreparedStatement preparedStatement;

//Cassandra template that manages all cassandra operations for querying and translating results.
@Autowired
private ReactiveCassandraTemplate reactiveCassandraTemplate;
public UserPreferencesRepository(@Qualifier(“personalizationLiveSession”) final CassandraSessionFactoryBean session) {
preparedStatement = session.prepare(String.format(READ_QUERY, “user_preferences”)); //Prepare query
}
/* Returns a Flux that could be subscribed to, to retrieve multiple UserPreferences records */
Flux<UserPreferences> findByUserId(List<Integer> userIds, int limit) {
List<Flux<UserPreferences>> readStreams = new ArrayList<>();
for (long userId : userIds) {
Statement readStatement = preparedStatement.bind().set(“user_id”, userId, Long.class)
.set(“limit”, limit, Integer.class) //Bind values
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); // Set consistency level
readStreams.add(reactiveCassandraTemplate.select(readStatement, entity));
}
return Flux
.merge(readStreams) //Combine all streams
.name(“USER_PREFERENCES”) // Tag the reactive stream
.metrics() // Enable metrics for monitoring
.parallel() // Open parallel flux by dividing data on rails comparable to your cpu cores
.runOn(readScheduler) // Schedule on a separate pool
.sequential(); // Merge and convert to regular flux
}
}

Custom repository code to avoid IN operator

Once the repository query is defined in either way, call to get-user-data would be something like:

userPreferencesRepository.findByUserId(userIds, limit);

Data access invocation

Extending Configuration and Capabilities

It may be necessary to extend basic default configuration and programmatically drive the requests and handle the session. We extend AbstractCassandraConfiguration to allow custom overriding. These overrides are taken into account at the time of building an internal “cluster” object by the driver. Once the cluster is built it cannot be changed without rebuilding the whole configuration.

@Configuration
@EnableCassandraRepositories
public class CassandraConfig extends AbstractCassandraConfiguration {
@Override
protected SchemaAction getSchemaAction() {
return SchemaAction.CREATE_IF_NOT_EXISTS;
}
}

Extending Cassandra Configuration

Remember we mentioned our infrastructure on data centers: live, batch and offline. However, Cassandra has a different overview of data centers, at least in the way the driver support is provided. If you see one of the load balancing strategies, this may become clearer:

String localDc = “batch” //or live, or offline
final DCAwareRoundRobinPolicy dcAwareRoundRobinPolicy = DCAwareRoundRobinPolicy
.builder()
.withLocalDc(localDc)
.withUsedHostsPerRemoteDc(1)
.build();

Configuring the datacenter to direct requests to while building the cluster.

The driver accepts a “local-dc” and number of remote dcs to use as fallback. This basically allows only one data center as the primary data center among the ones configured for the keyspace, as far as CRUD operations are concerned. The other data center(s) are considered ‘remote’ from it although they may exist on the same AZ, and will be used only for fallback.

This doesn’t help us completely. What we want is the driver to programmatically send requests to one or more data centers based on certain parameters in the request. It means that the same service should be able to connect to both data centers and route traffic to them ‘independently’, and without one type of operation interfering with the other. We would still need the ability to fallback on a different data center, if the DC we wish to use as primary is overloaded or unavailable.

Traffic isolation, session isolation and fallback:

To support this, we deviate from the cluster concept slightly differently. Instead of having one cluster object for the entire keyspace, we create multiple of them — one for each data center. A cluster is closely connected to session. It is important to also separate the session, as otherwise the threads shared by a single session potentially interferes with requests from different data centers. Further, we may even create different cluster and session objects, one for read, one for write and one for delete operation within a data center. Example, a bulk delete slowing down reads because they share the same session and connection pool could be avoided completely.

The read, write and delete requests isolation will become a great improvement on performance when slowness and number of connections used by one kind of operation — especially during bulk requests — interferes with the other type.

The following configuration highlights the cluster creation in such a way

@Configuration
@EnableReactiveCassandraRepositories(basePackages = “com.glassdoor.personalization.live”,
reactiveCassandraTemplateRef = “personalizationLiveCassandraTemplate”)
public class PersonalizationLiveConfig extends AbstractCassandraConfiguration {
/** Cluster with contact points and primary datacenter 'live' configured */
@Bean(“personalizationLiveCluster”)
public CassandraClusterFactoryBean personalizationLiveCluster() {
CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
cluster.setContactPoints(…);
cluster.setClusterName(…);
cluster.setLoadBalancingPolicy(DCAwareRoundRobinPolicy.builder()
.withLocalDc(“live”)
.withUsedHostsPerRemoteDc(2)
.build());
return cluster;
}
/** Factory to create and configure a Cassandra Session with support for executing CQL and initializing the database schema (a.k.a. keyspace). */
@Bean(“personalizationLiveSession”)
public CassandraSessionFactoryBean personalizationLiveSession(CassandraConverter cassandraConverter) {
CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
session.setConverter(cassandraConverter);
session.setCluster(personalizationLiveCluster().getObject());
session.setKeyspaceName(getKeyspaceName());
session.setSchemaAction(SchemaAction.CREATE_IF_NOT_EXISTS);
return session;
}
/** Primary implementation of Cassandra operations, that executes core Cassandra workflow for the 'live' session */
@Bean(“personalizationLiveCassandraTemplate”)
public ReactiveCassandraTemplate personalizationLiveCassandraTemplate(
@Qualifier(“personalizationLiveSession”) final CassandraSessionFactoryBean session) {
final DefaultBridgedReactiveSession reactiveSession = new DefaultBridgedReactiveSession(
session.getObject());
return new ReactiveCassandraTemplate(reactiveSession, cassandraConverter());
}
}

Personalization Live Cluster

The above java configuration created a “cluster” object dedicated for ‘live’ data center of personalization keyspace. The reactive cassandra template reference and base-package location is sufficient now to create repositories for querying them.

Similarly, we would have dedicated configuration for offline:

@Configuration
@EnableReactiveCassandraRepositories(basePackages = “com.glassdoor.personalization.offline”,
reactiveCassandraTemplateRef = “personalizationOfflineCassandraTemplate”)
public class PersonalizationOfflineConfig extends AbstractCassandraConfiguration {
@Bean(“personalizationOfflineCluster”)
public CassandraClusterFactoryBean personalizationOfflineCluster() {
CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
cluster.setContactPoints(…);
cluster.setClusterName(…);
cluster.setLoadBalancingPolicy(DCAwareRoundRobinPolicy.builder()
.withLocalDc(“offline”)
.withUsedHostsPerRemoteDc(0)
.build());
return cluster;
}

Personalization Offline Cluster

Notice the ‘usedHostsPerRemoteDc’ being 0 for offline, and 2 for live. This enables that even if the live cluster was heavily loaded, it can fallback on a remote dc (which is offline for this keyspace), and use up to 2 nodes in the remote dc to query without overloading it. 2 is chosen keeping the consistency level of LOCAL_QUROM in most of our use cases for reads, and 3 replicas in each datacenter. We will shortly talk about retry policy with reduced consistency level.

With this we have effectively achieved control of requests over each data center as if it were separate ‘clusters’, and have the ability to route traffic to each independently, while still being able to fallback on other data centers if necessary. We can effectively decide where writes and reads should go just by selecting the appropriate CassandraTemplate or directly the Repository associated with it.

Apart from the above, there are many other configurations to support our traffic pattern. Some of the many options available from the drivers are given below, all of which can be configured at the time of creation of the cluster.

Load balancing

We use a DC-aware, token-aware and latency-aware round-robin load-balancing policy.

DCAwareRoundRobinPolicy: determines which nodes belong to local datacenter and remote dc. The driver then sends requests only to the local dc and can use the remote dc as a fallback.

LatencyAwarePolicy: This policy collects the latencies of queries to each host, and will exclude the worst-performing hosts from query plans. The LatencyAwarePolicy is added on top of DCAwareRoundRobinPolicy.

This is further wrapped with TokenAwarePolicy: finds the partition key for the request and hashes it with the same algorithm as used in cluster. Then it sends the request to a node responsible for the token (chosen randomly among the replica for that partition).

By default, note that the fallback to remote dc is disabled! Hence, forcing all requests to only the first datacenter in the list (considered as primary) with no fallback to other data centers whatsoever. For fallback on a remote DC, explore/override and more cautiously with the “allowRemoteDCsForLocalConsistencyLevel()” option in the builder.

protected LoadBalancingPolicy loadBalancingPolicy(List<String> datacenters,
int usedHostsPerRemoteDc) {
if (CollectionUtils.isEmpty(datacenters)) {
return DCAwareRoundRobinPolicy.builder().build(); //Default
}
String primaryDatacenter = datacenters.get(0);
final DCAwareRoundRobinPolicy dcAwareRoundRobinPolicy = DCAwareRoundRobinPolicy
.builder() // DC-aware Round-robin lb.
.withLocalDc(
primaryDatacenter) // Forces to try all the local dc nodes before attempting remote dc
.withUsedHostsPerRemoteDc(
usedHostsPerRemoteDc) // Override it based on offline/online dc and consistency level desired
.build();
final LatencyAwarePolicy latencyAwarePolicy = LatencyAwarePolicy
.builder(dcAwareRoundRobinPolicy) // Latency-aware
.withExclusionThreshold(3.0) // hosts that are 3-times slower will be excluded
.withScale(100,
TimeUnit.MILLISECONDS) // how quickly the score given to older latencies decreases over time
.withRetryPeriod(10, TimeUnit.SECONDS) // slow host penalization period
.withUpdateRate(100,
TimeUnit.MILLISECONDS) // how often the minimum average latency is recomputed for all hosts
.withMininumMeasurements(
50) // No. of datapoints to collect before penalizing slow host (to prevent skewing the calculation)
.build();
return new TokenAwarePolicy(latencyAwarePolicy, ReplicaOrdering.NEUTRAL);
}

Load balancing strategies

Retry Policy

There are multiple ways to retry on a failed query. At search we use a consistency down graded retry attempts in hope to return at least result from one node. This is to be helpful in a case where availability of response is important sacrificing consistency.

The following policy example attempts to downgrade the consistency level to LOCAL_ONE on read timeout and retry configured number of times.

So note this will not honor the originally desired consistency level of the query. A different approach could be to gradually downgrade one level and keep retries until the limit reaches, but this example doesn’t do that.

public class DowngradedCLRetryPolicy implements RetryPolicy {

private static final ConsistencyLevel DEFAULT_DOWNGRADED_CL = ConsistencyLevel.LOCAL_ONE;

@Override
public RetryDecision onReadTimeout(Statement stmnt, ConsistencyLevel cl, int requiredResponses,
int receivedResponses, boolean dataReceived, int readRetryCount) {
if (dataReceived) {
return RetryDecision.ignore();
} else if (readRetryCount < readAttempts) {
return RetryDecision.retry(DEFAULT_DOWNGRADED_CL);
} else {
return RetryDecision.rethrow();
}
}
}

Retry policies

Speculative Executions

At times when a node is experiencing higher latencies (due to heavy load or long GC pauses), the following policy helps to pre-emptively start a second execution of the query against another node, before the first node has replied or error out. It uses the first response from the queries, and cancels (discards!) the result from other queries. The goal of these speculative executions is to improve overall latency at high percentiles.

In the example below, we use PercentileSpeculativeExecutionPolicy to determine speculative executions based on required latency percentile. The latency tracker attached will be used to store latencies over a sliding time window interval and will be used by the policy to determine which hosts are consistently slower than others.

protected SpeculativeExecutionPolicy speculativeExecutionPolicy() {
PercentileTracker tracker = ClusterWidePercentileTracker
.builder(HIGHEST_EXPECTED_LATENCY)
.build(); // this will collect statistics of each host in cluster
PercentileSpeculativeExecutionPolicy percentileSpeculativeExecutionPolicy =
new PercentileSpeculativeExecutionPolicy(
tracker,
99.0, // percentile that a request’s latency must fall into to be considered slow
2); // maximum number of speculative executions
return percentileSpeculativeExecutionPolicy;}

Speculative executions to counteract slow queries

NOTE of caution: Under heavy load, this feature may worsen the situation with additional requests if not properly monitored and disabling speculative execution may require bouncing of instances or recreating cluster objects with the mechanism disabled.

(Monitoring is also required to understand the percentage of requests falling into this category. [Eg: cluster.getMetrics().getErrors().getSpeculativeExecutions()]. Understand the flipside of more individual requests & throughput, and the tcp stream exhaustion issues if this happens frequently due to the large number of such speculative exhaustion: https://docs.datastax.com/en/developer/java-driver/3.6/manual/speculative_execution/#stream-id-exhaustion)

Query Options

Options related to defaults for individual queries, especially the consistency level.

protected QueryOptions queryOptions() {
return new QueryOptions()
.setMetadataEnabled(true)
.setDefaultIdempotence(true)
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
}

Query options and consistency level defaults

Cassandra health monitoring

Either with reactive or non-reactive operations, it is easier to implement a custom health indicator. We use a simple query on the required tables in our keyspaces.

@Component
public class CassandraHealthIndicator extends AbstractHealthIndicator { private final CassandraOperations cassandraOp;

@Autowired public CassandraHealthIndicator(CassandraOperations cassandraOp){
this.cassandraOp = cassandraOp;
}
@Override protected void doHealthCheck(Builder builder) throws Exception {
try {
this.cassandraOp.execute("SELECT now() FROM personalization.user_preferences LIMIT 1");
builder.up().build();
} catch (DataAccessException ex) {
builder.down(ex).build();
}
}
}

Health check basics

Conclusion

Eklutna Lake, AK. Captured by Ravi Varsha

We have seen how to create and work with custom configuration for interacting with Cassandra to support robust greater performance, availability and delegating load. We also talked about Spring Data support, and need for customizing at various levels to support business needs without risking bottlenecks. We have seen an anti-pattern that could result in common use cases and suggested a way to avoid it. We briefly touched on data center management options which is a key concern in many platforms as being able to control directing requests to appropriate data centers is very essential to manage traffic and balance load on data centers.

The options on load balancing, retry policies and speculative executions make for a smarter client that could help service scale even under extreme load.

Some stats: One of our Cassandra cluster is grouped as a ring of ten nodes or less for each data center — live, batch, offline per the example we reviewed here with each instance of type i3.xlarge and data load (size) of 1/3 to 1/2 the total volume on the instance storage of~1TB . Offline and live data centers receive anywhere between 10K to 20K QPS through the day for this use case, with latency being 1ms - 3ms and less than a millisecond, respectively. Batch data center receives a peak of around 50K QPS, with latency under 2–3 milliseconds with consistency level of quorum or higher. With local-one consistency they are around a millisecond with still room for higher load. Writes per second are much smaller than reads; their latency averages around 300–500 microseconds.

--

--