Distributed Hash Table-Kubernetes-Docker-Python

Sourav Das
Mar 29, 2024


A distributed hashmap spans multiple nodes/servers, with keys stored across the system. It keeps key-value pairs in memory to improve the performance and scalability of applications by reducing the need to retrieve data from slower storage systems, such as databases or file systems.

In this blog we will follow the Master-Slave architecture; you can check the entire code on GitHub.

Read More on the Hash Ring here

Architecture

The Distributed Hashmap is built with Master-Slave Architecture, where a master node is responsible for maintaining the consistent hash ring. All requests are accepted by this master server and then forwarded to respective data nodes.

Distributed Map API

Users interact with the entire cluster through these APIs. There are internal APIs as well, which the nodes use to communicate among themselves.

Master Node

from flask import Flask, request

app = Flask(__name__)

class DistributedMap:

    def __init__(self, config):
        self.config = config
        self.ring = ConsistentHashRing()
        self.autoscaler = create_autoscaler(self.config.autoscaler_type, self)
        self.load_balancer = RingLoadBalancer(self.config.cache_size, self.ring, self.autoscaler)

    @app.get("/register-node/")
    def register_node(self):
        # a data node calls this endpoint to join the ring
        node = request.get_json()
        self.ring.register_new_node(node)

    @app.post("/save/<key>")
    def put(self, key):
        # forward the value in the request body to the data node that owns the key
        value = request.get_json()
        node = self.load_balancer.resolve_node(key)
        node.put(key, value)

    @app.get("/retrieve/<key>")
    def get(self, key):
        node = self.load_balancer.resolve_node(key)
        return node.get(key)

    @app.get("/contains/<key>")
    def has(self, key):
        node = self.load_balancer.resolve_node(key)
        return node.has(key)

    @app.post("/remove/<key>")
    def remove(self, key):
        node = self.load_balancer.resolve_node(key)
        return node.remove(key)

    @app.get("/status/")
    def status(self):
        return self.load_balancer.status()

Data Node

The data node APIs are internal to the cluster, accessed only by the master node or other data nodes to serve requests.

from flask import Flask, request

app = Flask(__name__)

class DataNode(object):

    def __init__(self, config):
        self.config = config
        self.data = {}

    @app.get("/copy-keys/")
    def copy_keys(self, target_node):
        # copy keys to the target node
        pass

    @app.post("/save/<key>")
    def put(self, key):
        # store the value sent in the request body
        value = request.get_json()
        self.data[key] = value

    @app.get("/retrieve/<key>")
    def get(self, key):
        return self.data[key]

    @app.get("/contains/<key>")
    def has(self, key):
        return key in self.data

    @app.post("/remove/<key>")
    def remove(self, key):
        return self.data.pop(key)

Using the Distributed Map

Let’s create a test case that puts some load on the cluster. The cluster starts with 2 nodes, one master node and one data node. For now the cluster scales out when a node holds more than 10 keys.

> kubectl get pods
NAME READY STATUS RESTARTS AGE
ihs-datanode-0 1/1 Running 1 (35m ago) 5d2h
ihs-master-deploy-5db48697b7-jbk5b 1/1 Running 1 (35m ago) 5d2h

Testing Up Scaling

Let’s add 20 key-value pairs to the cluster and watch how it handles them.

for i in range(20):
    # generate a random string key
    key = random_str(3)
    # call the save API of the master node
    call_put_api(key, i)
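
The random_str and call_put_api helpers are not shown above; a minimal sketch, assuming the master service is reachable at a MASTER_URL such as the one exposed by minikube later in this post, and that it exposes the /save/<key> endpoint from the master node API:

import random
import string

import requests

MASTER_URL = "http://localhost:8000"  # assumed address of the exposed master service

def random_str(length: int) -> str:
    # build a random lowercase string of the given length
    return "".join(random.choices(string.ascii_lowercase, k=length))

def call_put_api(key: str, value) -> None:
    # send the value as JSON to the master node's save endpoint
    requests.post(f"{MASTER_URL}/save/{key}", json=value).raise_for_status()
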
> kubectl get pods
NAME READY STATUS RESTARTS AGE
ihs-datanode-0 1/1 Running 1 (1h ago) 1h
ihs-datanode-1 1/1 Running 0 45s
ihs-master-deploy-686446bf5c-l455v 1/1 Running 0 96s

> masternode logs
Running balance
getting metrics
datanode-0 load : 0.5
datanode-1 load : 0.6
Underloaded servers : []
Overloaded servers : []
Balance completed
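
The per-node load values in these logs are a normalized measure of how full each node is. A minimal sketch of such a metric, assuming load is simply the number of stored keys divided by the node's configured key capacity (the actual metric in the repo may differ):

def node_load(num_keys: int, capacity: int) -> float:
    # e.g. 5 keys on a node with capacity 10 gives a load of 0.5
    return num_keys / capacity

def classify(load: float, low: float = 0.2, high: float = 0.8) -> str:
    # thresholds are illustrative; nodes outside them become candidates for merge/split
    if load > high:
        return "overloaded"
    if load < low:
        return "underloaded"
    return "balanced"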

After adding the keys, the cluster scaled to 3 nodes to accommodate the load, and the master node logs show the load being balanced between the two data nodes. Let’s continue the test by adding 100 more keys.

> pods with load
NAME LOAD READY STATUS RESTARTS AGE
ihs-datanode-0 0.5 1/1 Running 1 (85m ago) 1h
ihs-datanode-1 0.4 1/1 Running 0 11m
ihs-datanode-10 0.6 1/1 Running 0 2m26s
ihs-datanode-11 0.4 1/1 Running 0 2m6s
ihs-datanode-12 0.6 1/1 Running 0 106s
ihs-datanode-13 0.5 1/1 Running 0 86s
ihs-datanode-14 0.3 1/1 Running 0 66s
ihs-datanode-2 0.7 1/1 Running 0 5m6s
ihs-datanode-3 0.4 1/1 Running 0 4m46s
ihs-datanode-4 0.6 1/1 Running 0 4m26s
ihs-datanode-5 0.5 1/1 Running 0 4m6s
ihs-datanode-6 0.7 1/1 Running 0 3m46s
ihs-datanode-7 0.7 1/1 Running 0 3m26s
ihs-datanode-8 0.6 1/1 Running 0 3m6s
ihs-datanode-9 0.6 1/1 Running 0 2m46s

After adding the data the cluster scales to 15 data nodes, distributing the data across them. I have also included the respective load on each data node. This shows how a large data set is spread across the cluster; in a real deployment each node could hold 2-4 GB of data before it triggers an upscale.

Now let’s delete the data to see how it scales down the cluster.

Removing all the Data

Once the data is removed, the cluster starts down scaling and terminating nodes; below is a snapshot of the last few nodes being scaled down. The cluster always keeps at least 1 data node active.

> kubectl get pods -w
NAME READY STATUS RESTARTS AGE
ihs-datanode-0 1/1 Running 0 18m
ihs-datanode-1 1/1 Running 0 15m
ihs-datanode-2 1/1 Running 0 15m
ihs-datanode-3 1/1 Running 0 14m
ihs-datanode-4 1/1 Terminating 0 14m
ihs-master-deploy-686446bf5c-k7nww 1/1 Running 0 18m
ihs-datanode-4 0/1 Terminating 0 15m
ihs-datanode-4 0/1 Terminating 0 15m
ihs-datanode-4 0/1 Terminating 0 15m
ihs-datanode-3 1/1 Terminating 0 15m
ihs-datanode-3 0/1 Terminating 0 15m
ihs-datanode-3 0/1 Terminating 0 15m
ihs-datanode-3 0/1 Terminating 0 15m
ihs-datanode-2 1/1 Terminating 0 16m
ihs-datanode-2 0/1 Terminating 0 16m
ihs-datanode-2 0/1 Terminating 0 16m
ihs-datanode-2 0/1 Terminating 0 16m
ihs-datanode-1 1/1 Terminating 0 17m
ihs-datanode-1 0/1 Terminating 0 17m
ihs-datanode-1 0/1 Terminating 0 17m
ihs-datanode-1 0/1 Terminating 0 17m

# finally
> kubectl get pods
NAME READY STATUS RESTARTS AGE
ihs-datanode-0 1/1 Running 0 20m
ihs-master-deploy-686446bf5c-k7nww 1/1 Running 0 20m

Consistent Hash Ring

A self-balancing binary search tree is used to implement the hash ring, for the following reasons:

  • Search is O(log n)
  • Adding & Removing Node is O(log n)
  • Finding Siblings is O(log n)

“PyTreeMap” is a Python library that implements a self-balancing binary search tree; the tree data structure behaves as follows.

ring = TreeMap()
for i in range(0, 20, 5):
    ring.put(i, "datanode-" + str(i))

# [0, 5, 10, 15]
print(ring.key_set())
# 15 is the max key in the tree
print(ring.last_key())
# 0 is the min key present in the tree
print(ring.first_key())
# 10 is the next higher key present in the tree
print(ring.higher_key(9))
# 5 is the next lower key present in the tree
print(ring.lower_key(9))
# 5 is itself or the next lower key present in the tree
print(ring.floor_key(9))
# 10 is itself or the next lower key present in the tree
print(ring.floor_key(10))

# /masternode/main/ring.py
class ConsistentHashRing(Generic[T]):

    def __init__(self):
        self.bst = TreeMap()
        self.__nodes_in_ring = {}
        self.__free_nodes = {}
        self.__blocked_nodes = {}

I have created a wrapper around the tree data structure so that other functionality can be added; more details can be found on GitHub here.
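
The load balancer code later in this post calls wrapper methods such as floor_hash, last_hash, and get_active_node. They are not shown in the snippet above; a minimal sketch of how they could delegate to the underlying TreeMap, assuming node hashes are the tree keys and data node handles are the values:

class ConsistentHashRing(Generic[T]):

    def floor_hash(self, key_hash: int):
        # largest node hash <= key_hash, or None if the key hash is below every node
        return self.bst.floor_key(key_hash)

    def last_hash(self):
        # largest node hash in the ring, used to wrap around
        return self.bst.last_key()

    def get_active_node(self, node_hash: int) -> T:
        # the data node registered at the given position on the ring
        return self.bst.get(node_hash)

    def put_active_node(self, node_hash: int, node: T) -> None:
        # place a data node at the given position on the ring
        self.bst.put(node_hash, node)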

You can also read this blog to understand more about the ring here.

Load Balancer

We need an algorithm to distribute the keys across all the nodes so that space is used efficiently, and when the space fills up the load balancer can trigger up-scaling of more data nodes.

The full load balancer code is available on GitHub.

Resolve Node for Key

ring = ConsistentHashRing()

# find the node where the key could be present
def resolve_node(key: str) -> DataNode:
    # hash the key to find the nearest node to that hash
    key_hash = stable_hash(key)
    # find the nearest node lower/equal to the key hash
    key_hash = ring.floor_hash(key_hash)
    # if no lower nearest node found, then save at the last node
    if key_hash is None:
        key_hash = ring.last_hash()
    return ring.get_active_node(key_hash)

key = "ABCD"
node = resolve_node(key)
value = node.get(key)

The above algorithm demonstrates how a node is found from the hash of a key, and the node is then accessed to get the actual value. This step helps distribute lookups across different nodes.
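
The stable_hash helper used above is not shown in the snippet; a minimal sketch, assuming any deterministic hash that stays stable across processes is acceptable (Python's built-in hash() is randomized per process, so a digest-based hash is used instead):

import hashlib

def stable_hash(key: str) -> int:
    # derive a deterministic integer from the MD5 digest of the key,
    # so the same key always maps to the same position on the ring
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)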

Splitting the data in node

We need to split the data in a node at times to prevent the node from becoming overloaded and crashing. To do so we need to find a key between two nodes and move half the data into the other node.

Note: In Kubernetes, up scaling/down scaling the number of replicas of a StatefulSet happens in LIFO order.

async def split_node(self, node):
    # get the free node with the min instance no, so that down scaling has the least effect
    free_node = ring.get_free_node_with_min_instance_no()
    try:
        # check if the node is reachable
        reachable = await free_node.health_check()
        if not reachable:
            return
        # calculate the hash at which the new node should be placed in the ring
        mid_hash = await node.calculate_mid_key()
        # copy all the keys from the calculated hash in node to the new node
        response = await node.copy_keys(free_node, mid_hash)
        if response["success"]:
            # add the node in the ring if the copy is successful
            ring.put_active_node(mid_hash, free_node)
            # remove the copied keys from the node
            await node.compact_keys()
            return True
        else:
            LOGGER.error(f"moving data failed for: {node.name()} -> {free_node.name()}")
    except Exception as e:
        LOGGER.exception("Exception while adding node, ", e)
    return False

When a data node's capacity is about to be exhausted, the master node signals the autoscaler to increase the replica count. After the new node comes up, the master selects the data node whose data should be split. Once the data transfer is complete, the new node is inserted into the ring to serve requests.
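
calculate_mid_key is an internal data node API; a rough sketch of how the midpoint hash could be chosen, assuming the data node can hash every key it currently holds (the actual implementation in the repo may differ):

def calculate_mid_key(self) -> int:
    # sort the ring positions (hashes) of all keys stored on this node
    hashes = sorted(stable_hash(key) for key in self.data)
    # return the median hash; keys at or above this position
    # will be handed over to the new node placed there
    return hashes[len(hashes) // 2]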

Merging a Node

If a node doesn't hold enough data, it is sometimes better to free up that node so that its resources can be utilized somewhere else.

async def merge_node_with_previous(self, node_hash):
    # the first node in the ring has no previous node to merge into
    if node_hash == ring.first_hash():
        return
    # get the previous node (the next lower position on the ring)
    previous_hash = ring.lower_hash(node_hash)
    previous_node = ring.get_active_node(previous_hash)
    # get the node to be merged
    node = ring.get_active_node(node_hash)
    try:
        # move all data to the previous node
        response = await node.copy_keys(previous_node, node_hash, None)
        # remove the node from the ring
        ring.free_up_node(node)
        # compact keys if required
        await node.compact_keys()
        return True
    except Exception as e:
        LOGGER.exception("Exception while merging node", e)
        return False

The node is merged with the previous node because in a consistent hash ring the keys are served by the nearest lower node.

When should we Split or Merge Nodes ?

We split and merge nodes for the following reasons; the balance routine below implements this:

  • Split - when a node is overloaded
  • Merge - when a node is underloaded
  • If no free node is available - up scale / increase the number of pods
  • If more free nodes remain - down scale

async def balance(self):
    # find the overloaded & underloaded nodes in the ring
    overloaded_nodes, underloaded_nodes = await get_node_state()
    # upscale if there are more overloaded nodes than free nodes + underloaded nodes
    is_upscale_req = len(overloaded_nodes) - (len(underloaded_nodes) + ring.no_of_free_nodes())
    if is_upscale_req > 0:
        await up_scale()

    awaits = []
    # free up underloaded nodes first
    # least loaded nodes first
    for node_hash in underloaded_nodes:
        awaits.append(merge_node(node_hash))
    # wait for all merges to complete
    await asyncio.gather(*awaits)
    awaits.clear()
    # once the nodes are free, split the overloaded nodes
    # maximum loaded node first
    for node_hash in reversed(overloaded_nodes):
        awaits.append(split_node(node_hash))
    # wait for all the nodes to be split
    await asyncio.gather(*awaits)
    # if any free nodes are left, then down scale
    await down_scale()

Why are async & await used?

Since all the split and merge calls are API calls on the respective data nodes, we can execute them concurrently instead of waiting for each call to finish sequentially.
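
As a tiny, self-contained illustration of the pattern (not taken from the repo): three simulated data-node calls of one second each finish in about one second when run with asyncio.gather, instead of three seconds when awaited one after another.

import asyncio
import time

async def fake_node_call(name: str) -> str:
    # stand-in for an HTTP call to a data node
    await asyncio.sleep(1)
    return f"{name} done"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_node_call(f"datanode-{i}") for i in range(3)))
    print(results, f"took {time.perf_counter() - start:.1f}s")  # ~1.0s, not ~3.0s

asyncio.run(main())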

Overall, this is how overloaded & underloaded nodes are split & merged in the ring to balance the load across the whole cluster.

Autoscaler

In this distributed map, for simplicity we used the Kubernetes Python client to access the Kubernetes API from inside the master node pod and change the replica count; a better approach would be to use Horizontal Pod Autoscaling with a custom metrics server.

Up Scaling

Whenever the load balancer finds that some nodes are overloaded and there are not enough free nodes to split the load, it asks the Kubernetes cluster to add more replicas.

from kubernetes import client, config

config.load_incluster_config()
api = client.AppsV1Api()

# up scale to the given instance no
def upscale(self, instance_no: int):
    # read the current replica count
    data = self.api.read_namespaced_stateful_set_status("ihs-datanode", "default")
    LOGGER.info("k8s status : {}", data.status)
    # if instance_no is more than the current replicas, then upscale
    if instance_no > data.status.current_replicas:
        body = {'spec': {'replicas': data.status.current_replicas + 1}}
        # use the k8s statefulset patch api to update the replica count
        self.api.patch_namespaced_stateful_set_scale("ihs-datanode", "default", body)
        LOGGER.info("Up scaling: no of nodes: {}", data.status.current_replicas + 1)
    else:
        LOGGER.info("Up scaling: not required: {}, instance_no {}", data.status.current_replicas, instance_no)
    return

Down Scaling

The load balancer down scales replicas when more than 1 node is free; it finds the latest node, frees it up if required, and then down scales it.

from kubernetes import client, config

config.load_incluster_config()
api = client.AppsV1Api()

# down scale to the given instance no
def downscale(self, instance_no: int):
    # get the replica count from the k8s api
    data = self.api.read_namespaced_stateful_set_status("ihs-datanode", "default")
    # if the current replica count is more than instance_no then ask k8s to down scale
    if data.status.current_replicas > 1 and (data.status.current_replicas - 1) >= instance_no:
        body = {'spec': {'replicas': data.status.current_replicas - 1}}
        # use the patch api to update the replica count
        self.api.patch_namespaced_stateful_set_scale("ihs-datanode", "default", body)
    return

Special Case for Down Scaling

For a StatefulSet, Kubernetes follows a specific startup and termination order for pods to maintain stability and consistency. Pods are started and terminated sequentially, allowing sufficient time for each pod to initialize or gracefully shut down.

So, a problem arises when we want to remove nodes from our hash ring: if a node is not the latest node, we cannot down scale it directly. One solution to that problem is as follows:

  • Find the latest node from the ring.
  • If the latest node is in the ring:
  • move the data from the latest node to a free node,
  • replace the latest node with the free node in the ring,
  • down scale the latest node.
async def free_node_to_down_scale(self):
    # only down scale if there is a free node
    if ring.no_of_free_nodes() > 1:
        # get the latest node from the ring
        max_node_from_ring, ring_node_key = ring.get_active_node_with_max_instance_no()
        # get the latest free node
        max_free_node = ring.get_free_node_with_max_instance_no()
        # if the latest node is in the ring
        if max_node_from_ring.instance_no() > max_free_node.instance_no():
            # get the oldest free node
            free_node = ring.pop_free_node_with_min_instance_no()
            try:
                # copy data to the oldest free node
                response = await max_node_from_ring.copy_keys(free_node, ring_node_key, "")
                if response["success"]:
                    # if the copy is successful, replace the latest node with the free node
                    ring.put_active_node(ring_node_key, free_node)
                    # free up the latest node
                    ring.free_up_node(max_node_from_ring)
                    return True
            except Exception as e:
                LOGGER.exception("Failed to remove from ring: {}, {} ", max_node_from_ring.name(), ring_node_key, e)
        else:
            # the latest node is a free node
            ring.remove_blocked_or_free_node(max_free_node.instance_no())
            # down scale the node
            autoscaler.downscale(max_free_node.instance_no())
    return False

Building Docker Images

Building the Image

We need to build two images, one for the master node, and one for the data node.

Master Node Dockerfile

FROM python:3
ENV PYTHONUNBUFFERED 1
ADD ./ /home/app/masternode
WORKDIR /home/app/masternode
RUN pip install -r requirements.txt
WORKDIR /home/app
EXPOSE 8080
ENTRYPOINT python masternode/main/server.py
# command to build the image
minikube image build -t ihs-master:latest -f ./Dockerfile .

Data Node Dockerfile

FROM python:3
ENV PYTHONUNBUFFERED 1
ADD ./ /home/app
WORKDIR /home/app
RUN pip install -r requirements.txt
EXPOSE 8080
ENTRYPOINT python main/server.py
minikube image build -t ihs-datanode:latest -f ./Dockerfile .

The images are built inside the minikube cluster and used when the pods are deployed.

Deploying to Kubernetes

Master Node Pod

# master deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ihs-master-deploy
  labels:
    type: restapi
spec:
  selector:
    matchLabels:
      app: ihs-master
  replicas: 1
  template:
    metadata:
      name: ihs-master-tmpl
      labels:
        app: ihs-master
    spec:
      containers:
        - name: ihs-master
          image: ihs-master:latest
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8080
          env:
            - name: PORT
              value: "8080"
            - name: CACHE_SIZE
              value: "2"
            - name: MASTER_NODE
              value: "true"
            - name: AUTOSCALER_TYPE
              value: "K8S"
          readinessProbe:
            initialDelaySeconds: 5
            periodSeconds: 5
            httpGet:
              path: /
              port: 8080
      serviceAccountName: k8s-api-access
---
# master service
apiVersion: v1
kind: Service
metadata:
  name: ihs-master-svc
spec:
  type: LoadBalancer
  selector:
    app: ihs-master
  ports:
    - protocol: "TCP"
      port: 8000
      targetPort: 8080

Result

> kubectl get pods

NAME READY STATUS RESTARTS AGE
ihs-master-deploy-5db48697b7-jbk5b 1/1 Running 1 (1m ago) 1m

> kubectl get services

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
ihs-master-svc LoadBalancer 10.104.118.134 <pending> 8000:32625/TCP 1m

Data Node Statefulset

# datanode statefulset
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: ihs-datanode
  labels:
    type: restapi
spec:
  serviceName: "ihs-datanode"
  selector:
    matchLabels:
      app: ihs-datanode
  replicas: 1
  template:
    metadata:
      name: ihs-datanode-tmpl
      labels:
        app: ihs-datanode
    spec:
      containers:
        - name: ihs-datanode
          image: ihs-datanode:latest
          imagePullPolicy: IfNotPresent
          resources:
            requests:
              memory: 32Mi
            limits:
              memory: 256Mi
          ports:
            - containerPort: 8080
          env:
            - name: PORT
              value: "8080"
            - name: MASTER_NODE_URL
              value: "http://ihs-master-svc:8000"
            - name: INSTANCE_INDEX
              valueFrom:
                fieldRef:
                  fieldPath: metadata.labels['apps.kubernetes.io/pod-index']
            - name: APPLICATION_DETAILS
              value: >
                {
                  "application_name": "datanode-$(INSTANCE_INDEX)",
                  "application_uris": [
                    "http://ihs-datanode-$(INSTANCE_INDEX).ihs-datanode.default.svc.cluster.local:8080"
                  ],
                  "application_id": "node-$(INSTANCE_INDEX)"
                }

---
# data node service
apiVersion: v1
kind: Service
metadata:
  name: ihs-datanode
spec:
  clusterIP: "None"
  selector:
    app: ihs-datanode
  ports:
    - protocol: "TCP"
      port: 8080
      targetPort: 8080

Result

> kubectl get pods

NAME READY STATUS RESTARTS AGE
ihs-datanode-0 1/1 Running 1 (1m ago) 1m
ihs-master-deploy-5db48697b7-jbk5b 1/1 Running 1 (1m ago) 1m

> kubectl get services

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
ihs-datanode ClusterIP None <none> 8080/TCP 1m
ihs-master-svc LoadBalancer 10.104.118.134 <pending> 8000:32625/TCP 1m

Kubernetes Cluster Role for the K8S Autoscaler

A cluster role is required for the master node, so that the autoscaler can signal the k8s cluster to increase or decrease the number of data-node replicas.

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pods-list
rules:
  - apiGroups: ["*"]
    resources: ["statefulsets", "pods", "statefulsets/status", "statefulsets.apps", "statefulsets/scale"]
    verbs: ["get", "patch", "*", "update", "patch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-api-access
  namespace: default
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pods-list
subjects:
  - kind: ServiceAccount
    name: k8s-api-access
    namespace: default
roleRef:
  kind: ClusterRole
  name: pods-list
  apiGroup: rbac.authorization.k8s.io

Once the deployment is done, you can expose the master service URL to your host machine and call the put/get APIs to access the map. The deployment will auto scale itself depending on the number of keys stored in the cluster.

minikube service ihs-master-svc --url
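
For example, assuming the URL printed by the command above is stored in MASTER_URL, a quick interaction with the map could look like this (the endpoint paths follow the master node API shown earlier):

import requests

MASTER_URL = "http://127.0.0.1:51234"  # replace with the URL printed by minikube

# store a value
requests.post(f"{MASTER_URL}/save/user-42", json={"name": "Sourav"})

# check the key exists and read it back
print(requests.get(f"{MASTER_URL}/contains/user-42").text)
print(requests.get(f"{MASTER_URL}/retrieve/user-42").json())

# check the cluster status, then remove the key
print(requests.get(f"{MASTER_URL}/status/").text)
requests.post(f"{MASTER_URL}/remove/user-42")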

Conclusion

In conclusion, although implementing a distributed map in Kubernetes offers benefits such as scalability, performance, and availability, it also poses challenges such as complexity, data consistency, and network overhead that require careful consideration during design and implementation.

Advantages:

  1. Scalability: A distributed map in Kubernetes allows for horizontal scaling, meaning you can add or remove nodes dynamically to handle varying workloads.

Disadvantages:

  1. Data Partitioning: Distributing data across multiple nodes introduces complexities related to data partitioning and distribution, which can impact cache efficiency and performance.
  2. Network Overhead: Distributed caching systems rely on network communication between nodes, which can introduce network overhead and latency, especially in geographically distributed deployments.
  3. Consistency Trade-offs: Achieving strong consistency guarantees in distributed caching systems often requires trade-offs in terms of performance and availability, as ensuring consistency across distributed nodes can be challenging.

