Consistent Hashing: Beyond the basics

Inspired by AWS DynamoDB

Omar Elgabry
OmarElgabry's Blog
13 min read · Nov 24, 2020

--

The classic consistent hashing algorithm addresses the issues with the modular hashing algorithm, where the hash function (the position of key K) is tied to the number of storage nodes, requiring a redistribution of nearly all the keys on every scale-up or scale-down.

# modular hashing, where N is the number of storage nodes
node = hash(key) % N

With consistent hashing, on the other hand, the hashing function is independent of the number of storage nodes. This allows us to dynamically partition the data as we add or remove nodes, and hence, scale incrementally.

The hash space is kept huge and constant. It is often referred to as the “ring”. Each storage node is assigned a random position within this hash space, i.e. on the ring.

Each data item is assigned to a node by hashing the data item’s key to get its position on the ring, and then walking the ring clockwise to find the first node with a position larger than the item’s position.

Thus, each node becomes responsible for the region of the ring between it and its predecessor node. Therefore, adding or removing a node only requires redistributing the keys that fall in its region, and not all the keys as with modular hashing.

Modular vs Consistent hashing
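To make the clockwise walk concrete, here is a minimal Python sketch of a ring with one position per node. The names (ConsistentHashRing, ring_hash) are illustrative, not from the paper.

import bisect
import hashlib

def ring_hash(key: str) -> int:
    # Map any string key to a position on a large, fixed hash space (the "ring").
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

class ConsistentHashRing:
    def __init__(self):
        self.positions = []   # sorted node positions (tokens) on the ring
        self.nodes = {}       # position -> node name

    def add_node(self, node: str):
        pos = ring_hash(node)
        bisect.insort(self.positions, pos)
        self.nodes[pos] = node

    def remove_node(self, node: str):
        pos = ring_hash(node)
        self.positions.remove(pos)
        del self.nodes[pos]

    def get_node(self, key: str) -> str:
        # Walk the ring clockwise: pick the first node position larger than
        # the key's position, wrapping around at the end of the ring.
        pos = ring_hash(key)
        i = bisect.bisect_right(self.positions, pos) % len(self.positions)
        return self.nodes[self.positions[i]]

ring = ConsistentHashRing()
for n in ["node-A", "node-B", "node-C"]:
    ring.add_node(n)
print(ring.get_node("customer:42"))   # only keys in one region move when a node joins or leaves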

And that is the end of it. Well, not really …

Consistent hashing presents some challenges. The most obvious one is around the random assignment of node positions on the ring leading to non-uniform data and load distribution. Moreover, when a node is added or removed, some data has to be copied by retrieving a set of keys from individual nodes, which is usually inefficient and slow.

Inspired by AWS DynamoDB, this article discusses the challenges presented by classic consistent hashing, touching on different scaling aspects such as availability, consistency, performance, and durability. Moreover, it also discusses data versioning and reconciliation, node membership, and failure detection and handling. This, however, does not attempt to explain how DynamoDB works; it is rather a summary of generic ideas and personal notes referencing the published AWS DynamoDB paper. DynamoDB is Amazon’s highly available key-value store.

Mapping Node to T Tokens

T Tokens per node

To solve the non-uniform data and load distribution of classic consistent hashing, we can map each node to T positions on the ring, called “tokens”. When a new node is added, it gets assigned T randomly scattered positions (tokens) on the ring, as shown in the sketch after this list. This has the following benefits:

  • When a node is added, it has a roughly equivalent amount of load compared to the existing available nodes.
  • When a node is removed, or becomes unavailable due to failures, the load handled by this node is distributed back in a reverse process, effectively evenly distributing the load across the remaining available nodes.
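A rough sketch of how the T tokens for a node could be derived, reusing the ring_hash() helper from the earlier sketch (T = 8 and the "node#i" naming are assumptions made for illustration):

T = 8   # tokens per physical node; an assumed value for this example

def token_positions(node: str, tokens: int = T) -> list:
    # Derive T pseudo-random positions for one physical node by hashing
    # "node-A#0", "node-A#1", ... (reusing ring_hash() from the earlier sketch).
    return [ring_hash(f"{node}#{i}") for i in range(tokens)]

# add_node() would then insert all T positions for the node instead of one,
# so the key ranges a node takes over (or gives up) are scattered across the whole ring.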

Availability

— Replication

Replicating data across multiple nodes is crucial to achieving high availability. Each data item is replicated at N nodes. The node that coordinates a write request is in charge of replicating the data items that fall within its range to the N-1 clockwise successor positions (tokens) in the ring. The successor positions may also be determined by another hash function for randomness, and spread across different data centers for higher availability.

Some useful definitions:
  • N is the number of replicas for each data item; a common value is 3.
  • R is the number of replicas that must ack / reply to a read request.
  • W is the number of replicas that must ack / persist a write request.
  • S is the number of nodes in the system.
  • T is the number of tokens (positions) a physical node has on the ring.
  • A key range is the set of keys on the ring associated with a token (and a token is owned by a node).
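Given these definitions, here is a possible sketch of how a coordinator could pick the N distinct physical nodes for a key by walking the ring clockwise, building on the ConsistentHashRing sketch above (the paper refers to such a list as the key's preference list; the code is illustrative only):

N = 3   # replicas per data item

def preference_list(ring: ConsistentHashRing, key: str, n: int = N) -> list:
    # Start just past the key's position and walk clockwise, skipping extra
    # tokens of nodes we already picked, until n distinct nodes are found.
    pos = ring_hash(key)
    start = bisect.bisect_right(ring.positions, pos)
    result = []
    for i in range(len(ring.positions)):
        node = ring.nodes[ring.positions[(start + i) % len(ring.positions)]]
        if node not in result:
            result.append(node)
        if len(result) == n:
            break
    return result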

— Always writable

We all know that in the distributed systems world, where network failures and data conflicts are not uncommon, high consistency and data availability cannot be achieved simultaneously.

Traditional algorithms trade off the availability of the data under failure scenarios, so the data is made unavailable until it is absolutely certain that it is correct.

Instead, we can increase availability by dealing with the uncertainty of the correctness of an answer: changes are allowed to propagate to replicas in the background, and concurrent, disconnected work is tolerated.

The challenge with this approach is that it can lead to conflicting changes that must be detected and resolved. This process of conflict resolution introduces two problems: when to resolve them and who resolves them.

Consistency

— Conflict Resolution: When

The first question is whether conflicts should be resolved during reads or writes. Many traditional data stores resolve conflicts during writes and keep the read complexity simple. That comes at the cost of rejecting writes if the data store cannot reach W nodes, effectively reducing system availability.

Pushing the conflict resolution to the reads ensures that writes are never rejected, and reads won’t be rejected when a data version conflict is detected. It is now a question of who resolves the conflicts and how.

— Conflict Resolution: Who

This can be done by the data store itself or the application client.

The application client is aware of the data schema and the business logic, and therefore, it can decide on the conflict resolution. Using the shopping cart example, when a customer wants to add an item to (or remove from) a shopping cart and the latest version is not available, the item is added to (or removed from) the older version, and the divergent versions are reconciled later. That change is still meaningful and should be preserved. On reads, when conflict versions are detected, the application client can choose to “merge” the conflicting versions and return a single unified shopping cart.

On the other hand, if the data store is handling the conflicts, its choices are rather limited and it can only use simple policies, such as timestamp-based “last write wins” reconciliation logic (i.e. the item with the most recent timestamp value is chosen as the correct version). The service that maintains a customer’s session information is a good example of this use case.

— Conflict Resolution: How

Versions Conflict Reconciliation

Data versioning allows us to detect and resolve conflicting versions and hence ensure data consistency. Each update performed by a node is treated as a new and immutable version of the data. A version consists of a list of (node, counter) pairs, i.e. [(node, counter), …], where node is the node that coordinated the write request.

Most of the time, new versions subsume the previous version and the data store itself can determine the authoritative version.

However, in the presence of failures combined with concurrent updates, version (parallel) branching may happen resulting in conflicting versions of an item. In this case, the multiple branches of data are collapsed into one. A typical example explained earlier is “merging” different versions of a customer’s shopping cart by the application client.
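As a rough illustration of how (node, counter) versions can be compared and reconciled, here is a minimal sketch; plain dictionaries stand in for the version metadata, and the function names are made up for the example:

def descends(a: dict, b: dict) -> bool:
    # True if version `a` subsumes version `b`: a's counter is at least b's for every node.
    return all(a.get(node, 0) >= counter for node, counter in b.items())

def reconcile(a: dict, b: dict):
    if descends(a, b):
        return a              # a subsumes b: the data store can resolve this itself
    if descends(b, a):
        return b              # b subsumes a
    return [a, b]             # parallel branches: a conflict the client has to merge

v1 = {"A": 1}               # write coordinated by node A
v2 = {"A": 2}               # a later write, also via A, subsumes v1
v3 = {"A": 1, "B": 1}       # a concurrent write via B conflicts with v2
print(reconcile(v2, v1))    # {'A': 2}
print(reconcile(v2, v3))    # [{'A': 2}, {'A': 1, 'B': 1}]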

To illustrate the read and write workflows:

Reads

  1. The node coordinating the read request will request the existing versions of an item given its key from all N nodes.
  2. It will then wait for R nodes out of the N nodes to reply.
  3. Return the result. If version conflict is detected, specifically parallel branches that cannot be reconciled by the data store, it will return the conflicting items with their version context info to the client. The client performs an update after reconciling the divergent versions by collapsing the branches into one version.

Writes

  1. The node coordinating the write request will store it locally, generate a new version, and replicate it across N-1 positions (tokens). The client must specify which version it is updating. This is done by passing the context it obtained from an earlier read operation.
  2. Once W nodes out of N respond, the write request is deemed successful.
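A toy sketch of the R/W quorum rule with in-memory stand-ins for the replica nodes (N=3, R=2, W=2 is just a common configuration chosen for the example):

class Replica:
    def __init__(self):
        self.store = {}
    def put(self, key, value):
        self.store[key] = value
        return True
    def get(self, key):
        return self.store.get(key)

N, R, W = 3, 2, 2                       # R + W > N: read and write quorums overlap
replicas = [Replica() for _ in range(N)]

def quorum_write(key, value) -> bool:
    acks = sum(1 for rep in replicas if rep.put(key, value))
    return acks >= W                    # the write is deemed successful once W replicas acked it

def quorum_read(key) -> list:
    replies = [rep.get(key) for rep in replicas[:R]]
    return replies                      # conflicting versions would be handed back to the client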

Performance

Providing consistently high performance for read and write operations is challenging as the performance is limited by the slowest of the R or W replicas.

Some applications require a high level of performance and can trade off durability for performance (i.e. R=1, W=1). For that, each storage node maintains an object buffer in its main memory. Each write operation is stored in the buffer and gets periodically written to the persistent storage. Each read operation first checks if the requested key is present in the buffer. If so, the object is read from the buffer instead of the storage engine.

This will result in lowering latency by a high factor but trades off durability; a server crash can result in missing writes that were queued up in the buffer.
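A minimal sketch of such a write-back buffer, with plain dictionaries standing in for the in-memory buffer and the persistent storage engine (illustrative only):

buffer = {}   # in-memory object buffer; its contents are lost if the node crashes
disk = {}     # stand-in for the persistent storage engine

def buffered_write(key, value):
    buffer[key] = value                  # acknowledge immediately; persisted later by flush()

def flush():
    disk.update(buffer)                  # a periodic writer thread persists the buffered objects
    buffer.clear()

def buffered_read(key):
    return buffer[key] if key in buffer else disk.get(key)   # serve from the buffer first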

Durability

To reduce the durability risk, the coordinator node handling the write request chooses one out of the N-1 replicas to write the data to its persistent storage. And since the coordinator node waits only for W responses, the performance of the write operation is not affected.

Generally, increasing the number of nodes that need to confirm a successful write operation increases durability but also trades off availability; write requests might get rejected if there are not enough alive nodes to reply.

It is important that data items are replicated across data centers to survive failures that happen due to power outages, network failures, and natural disasters.

Nodes Membership

A gossip-based protocol among the nodes propagates membership changes (a node joins or leaves the ring) and maintains an eventually consistent view of membership.

Each node contacts a peer chosen at random every second and the two nodes efficiently reconcile their persisted membership change histories.

Therefore, each storage node is aware of the tokens handled by its peers. This allows each node to forward a key’s read/write operations to the right set of nodes directly.

This obviates the need to maintain a centralized globally consistent view of the failure state.

Failure Detection & Handling

Failure detection and handling avoid failed attempts to read or write, which would have reduced availability and durability even under the simplest of failure conditions.

— Temporary failures: hinted handoff

Node A may consider node B temporarily down if node B does not respond to node A’s messages. Node A then uses alternate nodes to perform the request, and periodically retries B to check for its recovery.

In the absence of client requests to drive traffic between two nodes, neither node really needs to know whether the other is reachable and responsive.

To handle this, a data item that would normally have lived on B will now be sent to node X (a temporary replica of B). The data item sent to replica X will have a hint in its metadata that suggests which node was the intended recipient of the replica (in this case B).

Nodes that receive hinted data items will keep them in a separate local database that is scanned periodically. Upon detecting that B has recovered, X will attempt to deliver the data to B. Once the transfer succeeds, X may delete the data item from its local store.

Using hinted handoff, we ensure that write operations do not fail due to temporary node or network failures.
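A simplified sketch of hinted handoff on the receiving node X; the data structures and the deliver() callback are assumptions made for illustration:

hinted_store = []   # X's separate local database of hinted writes, scanned periodically

def write_with_hint(key, value, intended_node):
    # B is unreachable, so node X keeps the item locally along with a hint naming B.
    hinted_store.append({"key": key, "value": value, "hint": intended_node})

def handoff(recovered_node, deliver):
    # deliver(key, value) stands in for the RPC that sends an item back to the recovered node.
    remaining = []
    for item in hinted_store:
        if item["hint"] == recovered_node and deliver(item["key"], item["value"]):
            continue                     # delivered successfully, so drop the local copy
        remaining.append(item)
    hinted_store[:] = remaining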

— Long-lasting failures: Replica sync

Node B, under certain conditions such as node outages, might become unavailable for long periods of time, and the hinted data items might never make their way back to it.

To handle this, a replica synchronization protocol that uses Merkle trees is used to keep the replicas synchronized and to detect inconsistencies.

Merkle tree traversal

A Merkle tree is a hash tree where the leaves are hashes of the values of individual keys. Parent nodes higher in the tree are hashes of their respective children. The principal advantage of a Merkle tree is that each branch of the tree can be checked independently and in parallel. Moreover, Merkle trees help in reducing the amount of data that needs to be transferred and reduce the number of disk reads performed. Two nodes can traverse their Merkle trees for the key ranges that they host in common to determine if they have any differences.

Each node maintains a separate Merkle tree for each key range (the set of keys associated with a token) it hosts. This allows nodes to compare whether the keys within a key range are up-to-date.
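For illustration, a minimal Merkle tree over one key range might look like the sketch below; SHA-256 and duplicating the last node on odd-sized levels are assumptions of this sketch, not details from the paper:

import hashlib

def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()

def merkle_root(items: dict) -> bytes:
    # Leaves are hashes of individual key/value pairs, sorted by key;
    # parent nodes are hashes of their concatenated children.
    level = [h(f"{k}={v}".encode()) for k, v in sorted(items.items())]
    if not level:
        return h(b"")
    while len(level) > 1:
        if len(level) % 2:                 # duplicate the last node on odd-sized levels
            level.append(level[-1])
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]

a = {"k1": "v1", "k2": "v2"}
b = {"k1": "v1", "k2": "stale"}
print(merkle_root(a) == merkle_root(b))   # False: the two replicas need to sync this key range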

Recap

So, to summarize the workflows for adding and removing nodes, and reads and writes:

Adding Node

  • When a node X joins, it chooses its set of random tokens on the ring.
  • For every key range that is assigned to node X, there may be a number of nodes that are currently in charge of handling keys that fall within that range. Due to the allocation of key ranges to X, these existing nodes no longer have to store those keys, so they transfer them to X.
  • Node X reconciles the membership change histories and maintains an eventually consistent view of membership via the gossip-based protocol discussed above.

Removing Node

  • When a node is removed, or becomes unavailable due to failures, this signifies a permanent departure, and hence, membership changes get propagated to notify other nodes about node removal.
  • The key ranges handled by the removed node are randomly distributed to the remaining nodes, effectively distributing the load evenly across the remaining available nodes.

Reads

  1. Client requests are uniformly assigned to nodes in the ring by a load balancer. Any node can act as a coordinator for a read request.
  2. The node coordinating the read request will send the request to all N nodes of key K, and wait for R out of N nodes.
  3. Gather the data and determine if reconciliation is required (as discussed before). This process is called “read repair” because it repairs replicas that have missed a recent update and relieves the replica sync protocol from having to do it.

Writes

  1. Unlike reads, write requests are coordinated by one of the node replicas of the data item with key K. If the node that received the request is not a node replica for that data item’s key, it will forward the request to one of the N replicas of this given key K. This restriction is due to the fact that these preferred nodes have the added responsibility of creating a new version stamp. If the versioning scheme is based on physical timestamps, any node can coordinate a write request.
  2. Wait for W nodes out of N to respond.

The application client library can be a partition-aware client library that routes requests directly to the appropriate coordinator nodes. In this case, we can achieve lower latency because it skips the extra network hop that is incurred if the request were assigned to a random node by the load balancer.

Challenges

Because the tokens are chosen randomly on the ring, the ranges vary in size. And as nodes join and leave the system, the token set changes, and consequently the ranges change. Therefore, the “Mapping Node to T Tokens” strategy presents the following challenges:

  • When a new node joins the system, the nodes handing off the data in the key ranges to the new node have to scan their local persistence stores to retrieve the appropriate set of data items. Scan operations are highly resource intensive and need to be executed in the background without affecting customer performance. However, running the bootstrapping task at the lowest priority significantly slows down the bootstrapping process, which becomes cumbersome during busy hours.
  • When a node joins or leaves the system, the key ranges handled by some other nodes change, and the Merkle trees for the new ranges need to be recalculated, which is a non-trivial operation to perform on a production system.
  • Finally, there is no easy way to take a snapshot of the entire keyspace due to the randomness in key ranges, and this makes the process of archival complicated. In this case, archiving the entire keyspace requires retrieving the keys from each node separately, which is highly inefficient.

The fundamental issue with this strategy is that data partitioning (partition by tokens) and data placement (storage nodes) are dependent; It is not possible to add or remove nodes without affecting data partitioning.

Mapping Node to T Tokens & Equal sized Q partitions

T Tokens per node & Equal sized Q partitions

In this strategy, the hash space is divided into Q equally sized fixed partitions (or key ranges) and each node is assigned T random tokens. The tokens here do not decide the partitioning, and therefore two consecutive tokens don’t define a key range or partition (they only define node positions on the ring).

With this strategy, we get the following benefits:

  • Decoupling of partitioning and partition placement. A data item is mapped to one of the Q partitions given its key K, while the node responsible for storing that data item is found by walking the ring clockwise from the end of the partition that contains key K to the first token along the way, and then taking the node that owns that token (see the sketch below).
  • Enabling the possibility of changing the placement scheme at runtime. Adding or removing a node doesn’t change the partition (key range) of any data item.

Q is usually set such that Q > S*T, where S is the number of nodes in the system.
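A sketch of how partitioning and placement are decoupled under this strategy, reusing the ring sketch from earlier; Q = 128 and the 128-bit hash space are assumed values chosen for the example:

Q = 128                # number of fixed, equal-sized partitions (assumed value)
HASH_SPACE = 2 ** 128  # size of the ring, matching the 128-bit ring_hash() above

def partition_of(key: str) -> int:
    # The partition depends only on the key, so adding or removing nodes never changes it.
    return ring_hash(key) * Q // HASH_SPACE

def node_for_partition(ring: ConsistentHashRing, partition: int) -> str:
    # Placement: walk clockwise from the end of the partition to the first token
    # along the way, and take the node that owns that token.
    end_of_partition = (partition + 1) * (HASH_SPACE // Q) - 1
    i = bisect.bisect_right(ring.positions, end_of_partition) % len(ring.positions)
    return ring.nodes[ring.positions[i]]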

However, the randomness of the assigned T tokens, and therefore of the key ranges, is still a dilemma: nodes handing data off still scan their local persistence stores, leading to slower bootstrapping; Merkle trees still need to be recalculated; and there is still no easy way of taking snapshots.

Mapping Node to Q/S Tokens & Equal sized Q partitions

Q/S Tokens per node & Equal sized Q partitions

Similar to strategy 2, this strategy divides the hash space into Q equally sized partitions, and data placement is decoupled from data partitioning.

Moreover, each node is assigned Q/S tokens (i.e. T = Q/S). The number of tokens changes as we add or remove nodes.

When a node leaves the system, its tokens are randomly distributed to the remaining nodes such that these properties are preserved. Similarly, when a node joins the system it steals tokens from nodes in the system in a way that preserves these properties.
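A rough sketch of the token-stealing step when a node joins, assuming each node's tokens are tracked in a simple mapping (illustrative only):

import random

def rebalance_on_join(assignment: dict, new_node: str, q: int):
    # assignment maps each node to the list of tokens (partition ids) it owns.
    assignment[new_node] = []
    target = q // len(assignment)            # roughly Q/S tokens per node after the join
    for node, tokens in assignment.items():
        while node != new_node and len(tokens) > target:
            # The joining node "steals" a random token from any node that owns too many.
            assignment[new_node].append(tokens.pop(random.randrange(len(tokens))))

assignment = {"A": list(range(0, 6)), "B": list(range(6, 12))}   # Q = 12, S = 2
rebalance_on_join(assignment, "C", q=12)                         # each node now owns ~Q/S = 4 tokens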

This strategy has the following benefits:

  1. Faster bootstrapping and recovery. Since partition key ranges are fixed, they can be stored in separate files, and therefore, a partition can be relocated as a unit by simply transferring the file, avoiding random accesses needed to locate specific items.
  2. Ease of archival: The partition files can be archived separately. In contrast, with the previous strategies the tokens are chosen randomly, and archiving the stored data requires retrieving the keys from individual nodes separately, which is usually inefficient and slow.

The disadvantage of this strategy is that adding or removing nodes requires us to preserve its properties (i.e. T = Q/S).

Thank you for reading!

Feel free to reach out on LinkedIn or Medium.
