I find it useful to implement concepts from scratch before looking up a proper implementation. This approach has the benefit of helping me appreciate the challenges and design decisions. Looking at the optimal solution has the effect of making a good implementation seem trivial, which almost always isn’t the case.
Not surprisingly, what I come up with on this first attempt is usually far from a well-thought-out, optimized implementation. If you’re an expert in distributed systems and can implement consistent hashing in your sleep, this post doesn’t have much new to offer you. If you’ve heard of consistent hashing but haven’t really understood it deeply, you may find some value in following along.
The problem
Conceptually, our task is to allocate an infinite set of items X to a finite set of resources Y.
Caching systems as a motivating example
As a concrete example, you may want to develop a caching system that allocates arbitrary URLs (set X) to a finite set of caching servers (set Y). Setting aside the real-world complexities of building a caching system, the high-level goal is to decrease latency by caching results that have already been requested.
Load will eventually increase
As the number of requests increases, there comes a point where serving the cached results from a single server becomes the bottleneck and adding more servers can help with latency.
Allocations need to be consistent
We just need to make sure that requests for the same URL always go to the same server, which ensures that the cached results are only fetched and stored on one server.
How do we do this? We need a function that takes a URL and the number of available servers as input, and returns a server ID. Here’s the function signature, assuming that the server ID is an integer:
def allocate(url: str, server_count: int) -> int:
    ...
If we identify servers using a zero-based index, one possible implementation is to allocate URLs as follows:
from typing import Callable

def allocate_modulo(
    url: str,
    server_count: int,
    hash_fn: Callable[[str], int],
) -> int:
    return hash_fn(url) % server_count
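One subtlety worth noting before picking a hash function: many hash implementations return signed integers, but Python’s % operator always yields a result in [0, server_count) for a positive modulus, so negative hash values are handled correctly. A quick check with a hypothetical constant hash function:

```python
from typing import Callable

def allocate_modulo(
    url: str,
    server_count: int,
    hash_fn: Callable[[str], int],
) -> int:
    return hash_fn(url) % server_count

def negative_hash(url: str) -> int:
    # hypothetical hash that always returns a negative value
    return -7

# Python's % is non-negative for a positive modulus: -7 % 3 == 2
assert allocate_modulo('example.com', 3, hash_fn=negative_hash) == 2
```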
Just pick a “good” hash function
What’s a good choice for the hash function? At the risk of vastly oversimplifying a fascinating topic, let’s just say that it suffices to use a hash function that uniformly distributes the input space (all possible URLs) to the output space. We’ll use MurmurHash3 here, which maps strings to signed 32-bit integers (i.e. -2³¹ to 2³¹-1):
import functools

import mmh3

SEED = 0

def hash_mmh3(key: str, seed: int = SEED) -> int:
    return mmh3.hash(key, seed=seed)

assert hash_mmh3(key='x') == 1050319643

allocate_modulo_mmh3 = functools.partial(
    allocate_modulo,
    hash_fn=hash_mmh3,
)
Let’s look at a toy example with 6 websites and 3 servers.
urls = [
    'wikipedia.org',
    'instagram.com',
    'twitter.com',
    'google.com',
    'youtube.com',
    'gmail.com',
]
We can see how this allocation works under the proposed approach with the following helper function:
import pandas as pd
from typing import List

def allocate_urls(
    urls: List[str],
    server_count: int,
) -> pd.DataFrame:
    return pd.DataFrame(
        dict(
            url=url,
            hash_value=hash_mmh3(url),
            server_index=allocate_modulo_mmh3(
                url=url,
                server_count=server_count,
            ),
        )
        for url in urls
    )
Applying this to our set of URLs:
df1 = allocate_urls(urls=urls, server_count=3)
df1
Are we distributing the URLs evenly?
We can see that URLs are assigned to all three servers. However, it’s hard to judge whether the allocation is uniform given such a small sample. Instead of using a list of actual URLs, let’s define a function that generates random strings as a stand-in for real-world URLs (we’ll call these pseudo URLs):
import string

import numpy as np

def random_str() -> str:
    return "".join(np.random.choice(
        # select from lowercase letters
        list(string.ascii_lowercase),
        # generate a string that's between 6 and 19
        # characters long (randint excludes the upper bound)
        size=np.random.randint(6, 20),
        replace=True,
    ))
We can now generate 10k random strings and allocate them across 3 servers:
allocate_urls(
    urls=[random_str() for _ in range(10_000)],
    server_count=3,
).server_index.value_counts(normalize=True)
The allocation appears close to uniform:
0 0.3354
2 0.3334
1 0.3312
Looks like we have found an approach that allocates URLs uniformly, so what’s the problem? Well, we may need to scale the number of servers up or down due to changes in load, failures, or maintenance.
Adding a single server
What happens to the allocations if we just add one server?
df2 = allocate_urls(urls, server_count=4)
df2
You might’ve noticed that most URLs no longer map to their previously allocated servers. This is not great: it seems that each time we add or remove a server, most URLs get reallocated, which requires them to be fetched again and defeats the purpose of caching. This issue might not have been easy to spot in our previous table, so let’s add a helper function to aid with visualization:
def after(
    df_before: pd.DataFrame,
    df_after: pd.DataFrame,
) -> pd.DataFrame:
    df_after = df_before.merge(
        df_after,
        on='url',
        suffixes=['_before', '_after'],
    )
    idx_before = df_after.server_index_before
    idx_after = df_after.server_index_after
    return df_after.assign(same=idx_before == idx_after)
Applying it to the two Pandas dataframes we created earlier:
df_after = after(df_before=df1, df_after=df2)
df_after
Only 2 / 6 URLs are (consistently) allocated to the same server as before:
assert df_after.same.sum() == 2
Was this just a particularly bad example?
You may object at this point: maybe this was a particularly bad example and we don’t run into this issue in general. We can generate many pseudo URLs and compute the fraction that map to the same servers.
def same_allocation(
    servers_before: int,
    servers_after: int,
    url_count: int = 100,
    repeat: int = 100,
) -> float:
    urls_list = [
        [random_str() for _ in range(url_count)]
        for _ in range(repeat)
    ]
    return np.mean([
        after(
            df_before=allocate_urls(
                urls=urls,
                server_count=servers_before,
            ),
            df_after=allocate_urls(
                urls=urls,
                server_count=servers_after,
            ),
        ).same.mean()
        for urls in urls_list
    ])
Applying this to our example of going from 3 to 4 servers, we see that roughly 25% of pseudo URLs have a consistent allocation:
assert np.allclose(
    same_allocation(servers_before=3, servers_after=4),
    0.25,
    atol=1e-2,
)
That didn’t really work… 😢
This is not an accident! In fact, we can compute the probability of consistent allocation as follows:

p(consistent allocation)
= p(0 = 0) + p(1 = 1) + p(2 = 2)
= 3 x (1/3 x 1/4) = 1/4

where p(i = i) means the probability of a random URL mapping to server i under both mod 3 and mod 4. E.g. the probability of allocation to server 1 under mod 3 is 1/3, and independently of that, the probability of allocation to server 1 under mod 4 is 1/4 (the two residues are independent because the hash is uniform).
In general, going from n to n+1 servers leaves only 1/(n+1) of allocations consistent on average.
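We can also verify the 1/(n+1) figure exactly, without any hashing or sampling: a uniform hash is (approximately) uniform over the residues modulo lcm(m, n), so counting the residues in one full period where the mod-m and mod-n allocations agree gives the probability directly. A small sketch (the helper name is mine):

```python
from math import gcd

def exact_consistency(m: int, n: int) -> float:
    # a uniform hash is uniform over residues mod lcm(m, n);
    # count the residues in one period where the mod-m and
    # mod-n allocations agree
    period = m * n // gcd(m, n)
    matches = sum(r % m == r % n for r in range(period))
    return matches / period

assert exact_consistency(3, 4) == 1 / 4
assert exact_consistency(4, 5) == 1 / 5
assert exact_consistency(5, 6) == 1 / 6
```

The same helper also makes it easy to probe non-consecutive server counts (e.g. pairs where both counts are even).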
assert np.allclose(
    same_allocation(servers_before=4, servers_after=5),
    1 / 5,
    atol=1e-2,
)

assert np.allclose(
    same_allocation(servers_before=5, servers_after=6),
    1 / 6,
    atol=1e-2,
)
Let’s check this against empirical results:
import matplotlib.pyplot as plt

pd.DataFrame(
    dict(
        n=n,
        empirical=same_allocation(n, n + 1),
        theoretical=1 / (n + 1),
    )
    for n in range(1, 10 + 1)
).set_index('n').plot()
_ = plt.ylabel('Fraction of consistent allocations')
(If you’re curious: even for positive integers m and n with n > m + 1, i.e. not consecutive, there’s a simple closed form. The probability of consistent allocation when going from m to n servers is gcd(m, n)/n, which reduces to 1/n whenever m and n are coprime. See if you can verify the pattern when both m and n are even using the function above.)
Can we do better?
More allocation consistency would be much better. How do we accomplish that? Well, that’s where consistent hashing comes in. The goal is to minimize inconsistent allocations as we add or remove servers, while still distributing the load uniformly. This means that if we’re adding a server to a system that already has n servers, we want roughly 1/(n+1) of the URLs to move to the new server.
Consistent hashing
What if we make each server responsible for random ranges of the hash output space? Let’s say we have 3 servers. One strategy for getting random ranges is to map each server to a random point in the hash output space, say by hashing the unique ID of the server.

Once we have these points, we can make each server responsible for URLs whose hashes land between the server’s own hash value A and (but not including) the next server’s hash value B, i.e. the minimum hash value among all server hashes with B > A. For instance, server 1 is responsible for the range between 1 and 2 (exclusive of 2) in this figure:
The conceptual ring
You might’ve noticed that the range between the beginning of the hash output space and server 1 is not covered. This is easy to fix: we can just make server 3 responsible for that range. This approach is effectively the same as treating the entire hash output space as a ring by connecting the two ends:
Is this really a good approach? To answer that question, let’s see what happens when we add a new server:
Note that a part of the range that server 1 was previously responsible for is now owned by server 4. This is great: reallocation is contained to a single server. However, there’s an issue that you might’ve noticed in our example: some servers (e.g. server 3) are responsible for a much larger portion of the space than others. We’ll dive deeper into that issue in part 2 of this series. For now, let’s implement this idea.
Implementation
First, let’s define a dataclass to represent servers:
from dataclasses import dataclass

ServerId = str

@dataclass(frozen=True)
class Server:
    server_id: ServerId
    seed: int = SEED

    @property
    @functools.lru_cache()
    def hash_value(self) -> int:
        return hash_mmh3(key=self.server_id, seed=self.seed)

    def __lt__(self, other: "Server") -> bool:
        return self.hash_value < other.hash_value
What’s important to note at this point is that each server is identified by a unique ServerId (which is just a string in our example). The rest may seem a bit too elaborate, but I promise that there’s a point to the additional bells and whistles that we’ll see later (well, mostly in the upcoming parts). So, don’t worry if it’s not all that obvious right now.
Roadmap
We now need a way to capture some state and a few methods to leverage or mutate that state:
- At all points, we need to know the current set of servers that are part of the ring.
- We need a method to add a new server identified by a ServerId, and update #1. Let’s call this method add_server.
- We need a method to remove a server given a ServerId, which we can call remove_server.
- Finally, we need a method for allocating a new key, which could be a URL, but we’ll use the more general term key. If it helps, you can treat keys and URLs interchangeably for the rest of this post. Let’s call this method key_lookup; it takes a key (just a string) and returns the ServerId that the key is allocated to given the current state.
Obviously, there are many ways to go about this, but here’s my high-level design:
from typing import Set

class ConsistentHashing:
    def __init__(self, server_ids: Set[str], seed: int = SEED):
        ...

    def key_lookup(self, key: str) -> ServerId:
        ...

    def add_server(self, server_id: ServerId) -> None:
        ...

    def remove_server(self, server_id: ServerId) -> None:
        ...
Initialization
My assumption is that it’ll be convenient to initialize an object that captures the initial set of server IDs, and then add, remove, and allocate over the lifetime of the object. So, let’s start with __init__ (personally, I think a mutable dataclass with __post_init__ would’ve been slightly nicer here, but I decided not to go down that path in case some readers are not very familiar with the syntax):
class ConsistentHashing:
    def __init__(self, server_ids: Set[str], seed: int = SEED):
        assert len(server_ids) > 0
        self._seed = seed
        self._server_ids = server_ids
        self._server_allocations = None
        self._allocate_servers()

    def _allocate_servers(self):
        """
        Map servers to the ring given the
        current set of server IDs.
        """
        self._server_allocations = {
            server_id: Server(
                server_id=server_id,
                seed=self._seed,
            )
            for server_id in self._server_ids
        }
First, note that we aren’t allowing for initialization with no servers (we could have, but this choice makes the implementation slightly easier and makes it very clear to the user that the goal is to allocate keys to servers).
Second, and really the crux of this step, is storing a dictionary that maps ServerIds to Server objects. (This may be a good time to take a quick look at the hash_value property in Server from earlier.) _server_allocations is going to be the main internal data structure enabling most of the upcoming functionality.
Key lookup (i.e. allocating URLs/keys to servers)
Let’s see how we can now use this data structure for allocating a new key:
def key_lookup(self, key: str) -> ServerId:
    """
    Allocate `key` to a server.
    """
    key_hash = hash_mmh3(key=key, seed=self._seed)
    # compute the distance between the hash value of
    # the input key and each server's hash
    distances = (
        (key_hash - server_obj.hash_value, server_id)
        for server_id, server_obj
        in self._server_allocations.items()
    )
    # find the server with the smallest non-negative distance;
    # assign None if no such server exists,
    # which means that the server with the largest hash is
    # responsible for this key
    closest = min((
        (dist, server_id)
        for dist, server_id in distances
        if dist >= 0
    ), default=None)
    return (
        # get the ID of the server with the largest hash value
        max(self._server_allocations.values()).server_id
        if closest is None
        else closest[1]
    )
The algorithm is simple:
- Use the same hash function to compute the key’s hash key_hash
- Compute the distance between key_hash and all server hashes
- Find the server that the key maps to
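To make the wrap-around case concrete, here is the same lookup rule on a toy ring with made-up server hashes (the numbers and names below are hypothetical, not real MurmurHash3 outputs):

```python
# toy server ring: made-up hash values (hypothetical, not mmh3 output)
server_hashes = {'a': 10, 'b': 40, 'c': 75}

def toy_lookup(key_hash: int) -> str:
    # distance from the key's hash back to each server at or below it
    candidates = [
        (key_hash - h, server_id)
        for server_id, h in server_hashes.items()
        if key_hash - h >= 0
    ]
    if candidates:
        # closest server at or below the key's hash
        return min(candidates)[1]
    # wrap-around: no server hash <= key_hash, so the server
    # with the largest hash owns the key
    return max(server_hashes, key=server_hashes.get)

assert toy_lookup(50) == 'b'  # 40 is the closest server hash at or below 50
assert toy_lookup(5) == 'c'   # wraps around to the largest hash, 75
assert toy_lookup(75) == 'c'  # a server owns its own hash value
```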
Adding/removing servers
The rest is mostly bookkeeping: when we add or remove servers, we just need to make sure that we update the state.
Putting it all together:
from typing import Set

class ConsistentHashing:
    def __init__(self, server_ids: Set[str], seed: int = SEED):
        assert len(server_ids) > 0
        self._seed = seed
        self._server_ids = server_ids
        self._server_allocations = None
        self._allocate_servers()

    def key_lookup(self, key: str) -> ServerId:
        """
        Allocate `key` to a server.
        """
        key_hash = hash_mmh3(key=key, seed=self._seed)
        # compute the distance between the hash value of
        # the input key and each server's hash
        distances = (
            (key_hash - server_obj.hash_value, server_id)
            for server_id, server_obj
            in self._server_allocations.items()
        )
        # find the server with the smallest non-negative distance;
        # assign None if no such server exists,
        # which means that the server with the largest hash is
        # responsible for this key
        closest = min((
            (dist, server_id)
            for dist, server_id in distances
            if dist >= 0
        ), default=None)
        return (
            # get the ID of the server with the largest hash value
            max(self._server_allocations.values()).server_id
            if closest is None
            else closest[1]
        )

    def add_server(self, server_id: ServerId) -> None:
        if server_id in self._server_allocations:
            raise ValueError(
                f"server_id={server_id} already exists."
            )
        self._server_ids.add(server_id)
        self._add_server(server_id=server_id)

    def remove_server(self, server_id: ServerId) -> None:
        if server_id not in self._server_allocations:
            raise ValueError(
                f"server_id={server_id} does not exist."
            )
        if len(self._server_allocations) == 1:
            raise ValueError(
                "only one server left; cannot delete"
            )
        self._server_ids.remove(server_id)
        self._remove_server(server_id=server_id)

    def _allocate_servers(self):
        """
        Map servers to the ring given the
        current set of server IDs.
        """
        self._server_allocations = {
            server_id: Server(
                server_id=server_id,
                seed=self._seed,
            )
            for server_id in self._server_ids
        }

    def _add_server(self, server_id: ServerId) -> None:
        self._server_allocations[server_id] = Server(
            server_id=server_id,
            seed=self._seed,
        )

    def _remove_server(self, server_id: ServerId) -> None:
        del self._server_allocations[server_id]
Let’s use it
Here’s an example with 3 servers:
ch = ConsistentHashing(server_ids={'1', '2', '3'})
Say we want to allocate two URLs:
url_ex1 = 'google.com'
url_ex2 = 'wikipedia.org'
We see that these URLs are allocated to servers 1 and 3 respectively:
assert ch.key_lookup(url_ex1) == '1'
assert ch.key_lookup(url_ex2) == '3'
The following picture depicts this allocation:
I’ll offer another way of thinking about consistent hashing conceptually that may be helpful. When looking up a key, you can think of placing it on the ring based on its hash value. Then, you traverse the ring counter-clockwise until you hit the first server, and return that server. In practice, this amounts to a linear search (or a binary search, as we’ll see in part 3), but I’ve personally found this mental model very useful.
What if we add a new server “4”?
ch.add_server('4')
How does the allocation change?
assert ch.key_lookup(url_ex1) == '4'
assert ch.key_lookup(url_ex2) == '3'
We see that google.com is reallocated to server 4, while wikipedia.org is still allocated to server 3, as before.
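As a final sanity check, we can measure how many keys move when a server joins the ring. The sketch below is self-contained: it re-implements the lookup rule with an MD5-based stand-in for MurmurHash3 (so it doesn’t depend on the mmh3 package), and the helper names are mine:

```python
import hashlib
from typing import List

def stable_hash(s: str) -> int:
    # deterministic stand-in for MurmurHash3 (32-bit unsigned)
    return int.from_bytes(hashlib.md5(s.encode()).digest()[:4], 'big')

def lookup(key: str, server_ids: List[str]) -> str:
    key_hash = stable_hash(key)
    # servers at or below the key's hash, by distance
    candidates = [
        (key_hash - stable_hash(s), s)
        for s in server_ids
        if key_hash - stable_hash(s) >= 0
    ]
    if candidates:
        return min(candidates)[1]
    # wrap-around: the server with the largest hash owns the key
    return max(server_ids, key=stable_hash)

keys = [f'key-{i}' for i in range(10_000)]
before = {k: lookup(k, ['1', '2', '3']) for k in keys}
after = {k: lookup(k, ['1', '2', '3', '4']) for k in keys}

# any key that moved must have moved to the new server '4'
assert all(after[k] == '4' for k in keys if before[k] != after[k])

moved = sum(before[k] != after[k] for k in keys) / len(keys)
print(f'{moved:.1%} of keys moved to the new server')
```

Contrast this with the modulo scheme, where roughly 3/4 of keys would have been reallocated when going from 3 to 4 servers.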
The road ahead
We have glossed over many details:
- You might have noticed that the complexity of key_lookup is O(N), where N is the number of servers. Can we do better?
- We alluded to the fact that we can’t really control how much of the hash output space is covered by each server. What if we had servers with higher capacity and wanted to assign more of the space to them?
- We don’t really have an easy way to target specific servers. What if we know a server is overloaded and need to reallocate some of its keys?
- How close did we get to our goal of minimizing the number of reallocations?
We will dig into some of those questions in part 2.
All the code can be found in this notebook.