Design In-memory Streams: Part 2

Tushar Gupta
16 min read · May 14, 2023


This is a continuation of the story Design In-memory Streams, in which we designed an in-memory data streaming service.

Thanks to that design, it is now relatively easy to plug in remote storage, so that the service's components can run in different processes or even on different machines. If you haven’t read the previous story, I highly recommend reading it before proceeding.

For this article, we will use Redis as remote storage. If you’re unfamiliar with Redis, don’t worry: it’s easy to pick up as we go along.

Redis

Redis is an open-source, in-memory data structure store. It is fast and efficient because it keeps data in memory, and it supports persistence to disk as well. It can also act as a message broker through its pub/sub messaging pattern. You can read more about Redis here.

We will explore many Redis components throughout the article, like Redis sets, Redis hashes, Redis lists, Lua scripts, Redis pub/sub, etc. Don’t worry if you are not familiar with them; we will introduce them as we go along.

Note: Redis executes commands on a single thread, yet it is fast because it works on in-memory data structures with optimized algorithms. Serving reads and writes from memory keeps disk I/O, a significant bottleneck in traditional database systems, off the request path; persistence, when enabled, is handled mostly in the background.

Design:

Our design stays the same as the “in-memory” one; we will just extend a few classes and override some methods to integrate Redis.

Redis Data Structures:

Let’s discuss the most common Redis data structures that we will use in this article:

  • Redis sets: A Redis set is an unordered collection of unique strings (members). It allows us to insert, remove and find elements efficiently. We can also use Redis sets to perform common set operations such as intersections, unions, and differences.
  • Redis hash: Redis hashes are record types structured as collections of field-value pairs. We can use hashes to represent basic objects and store groupings of counters, among other things.
  • Redis List: Redis lists are linked lists of string values. We can use them to store sequential data and also run range queries on them.
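Partitions are read with LRANGE, so its indexing rules matter. Both bounds are zero-based and inclusive, and negative indices count from the end. Out-of-range bounds are clamped rather than raising an error. The sketch below models those rules on a plain Python list. The lrange helper is hypothetical and not part of redis-py. It is only a way to reason about offsets without a running server.

```python
def lrange(items, start, stop):
    """Mimic Redis LRANGE on a Python list: inclusive bounds, negative indices allowed."""
    n = len(items)
    # negative indices count from the end of the list, as in Redis
    if start < 0:
        start = max(n + start, 0)
    if stop < 0:
        stop = n + stop
    # an empty or inverted window returns an empty list instead of an error
    if start >= n or start > stop:
        return []
    # a stop index past the end is clamped to the last element
    stop = min(stop, n - 1)
    return items[start:stop + 1]


partition = ["m0", "m1", "m2", "m3"]
print(lrange(partition, 1, 2))    # ['m1', 'm2']  (stop is inclusive)
print(lrange(partition, 0, -1))   # ['m0', 'm1', 'm2', 'm3']
print(lrange(partition, 2, 100))  # ['m2', 'm3']
print(lrange(partition, 10, 20))  # []
```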

KEYS Structure and Definition

Let’s start by discussing the KEYS we will use to represent different components in our Redis integration:

Topics and partitions: Redis provides hash and list data structures, which we can use to build our topics. We will store the metadata (topic name, number of partitions, partition strategy and its metadata) in one hash, and create a list for each partition of the topic. Redis lists let us push one or more values at a time and also support range queries.

Keys:

  • {TOPIC_NAME}: stores a hash of topic metadata, with fields:
    - topic_name: name of the topic
    - partitions: number of partitions
    - strategy: data partition strategy used by the topic
    - strategy_metadata: data used by the data partition strategy to determine the next partition
  • {TOPIC_NAME}::0 : Redis list for the 0th partition of {TOPIC_NAME}
  • {TOPIC_NAME}::1 : Redis list for the 1st partition of {TOPIC_NAME} …
  • {TOPIC_NAME}_partition_change: channel on which changes to the number of partitions are published. Consumer groups consuming from this topic subscribe to this channel.

Consumer group: We will use a Redis set to store the consumer members attached to a consumer group. Redis sets let us add and remove members in O(1) time. Additionally, a Redis hash will store the status of each topic partition in the group, i.e. assigned or unassigned.

Keys:

  • CONSUMER_GROUP::{group_name}: stores a Redis set of the consumer members registered in the group.
  • CONSUMER_GROUP_PARTITIONS::{group_name}: stores a hash of the topic partitions of the consumer group. The field is the topic_partition_id, and the value is “assigned” or “unassigned”, indicating whether that topic partition is currently assigned to a consumer member.
  • {GROUP_NAME}::health: stores whether the group coordinator is alive. If it is not, consumers will not be able to get or commit offsets.

Consumer members: We will use Redis sets to store the assigned partitions to a consumer member. Additionally, a Redis key with an expiry will be stored to monitor the health of consumer members.

Keys:

  • CONSUMER_ASSIGNED_PARTITIONS::{consumer_member_id}: stores a Redis set of the partitions assigned to the consumer member.
  • health::{consumer_member_id}: stores whether the consumer member is alive. The key is set with an expiry and refreshed periodically for each live consumer member. If it is not refreshed before it expires, the group coordinator assumes the member is dead or unresponsive and redistributes its partitions.
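The heartbeat works because Redis deletes a key once its TTL elapses, so "the key exists" doubles as "the member checked in recently". The sketch below imitates that with an in-memory store and a manually advanced clock. FakeClock and ExpiringStore are hypothetical stand-ins for Redis SET key value EX seconds and EXISTS. The sketch uses the 10-second expiry that ping_heart_beat sets in the code further down.

```python
class FakeClock:
    """Manually advanced clock so the example is deterministic."""

    def __init__(self):
        self.now = 0.0

    def advance(self, seconds):
        self.now += seconds


class ExpiringStore:
    """Tiny stand-in for Redis SET ... EX and EXISTS (hypothetical, for illustration only)."""

    def __init__(self, clock):
        self.clock = clock
        self._data = {}  # key -> (value, expires_at or None)

    def set(self, key, value, ex=None):
        expires_at = self.clock.now + ex if ex is not None else None
        self._data[key] = (value, expires_at)

    def exists(self, key):
        entry = self._data.get(key)
        if entry is None:
            return False
        _, expires_at = entry
        if expires_at is not None and self.clock.now >= expires_at:
            del self._data[key]  # drop the expired key lazily, on access
            return False
        return True


def dead_members(store, member_ids):
    """Members whose health::{id} key has expired, i.e. candidates for removal and rebalance."""
    return [m for m in member_ids if not store.exists(f"health::{m}")]


clock = FakeClock()
store = ExpiringStore(clock)
store.set("health::c1", "alive", ex=10)
store.set("health::c2", "alive", ex=10)

clock.advance(6)
store.set("health::c1", "alive", ex=10)  # c1 pings again, c2 stays silent
clock.advance(6)
print(dead_members(store, ["c1", "c2"]))  # ['c2']
```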

Offsets: We will use plain Redis string keys to store the offsets of consumer groups.

Keys:

  • {consumer_group_name}_{topic_name}::{partition_id}: stores the offset of a given consumer group on a particular topic<>partition.
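To keep the naming scheme in one place, the key formats above can be collected into a single helper. This sketch mirrors the f-strings used by RedisTopicClient, RedisConsumerGroupKeys, RedisConsumerKeys, RedisOffsetClient and RedisConsumerMember below. The redis_keys function itself is hypothetical.

```python
def redis_keys(topic, group, member, partition_id):
    """Return every Redis key/channel name used for one topic, group, consumer member and partition."""
    return {
        # topics and partitions
        "topic_metadata": f"{topic}",                               # hash: topic_name, partitions, strategy, ...
        "topic_partition": f"{topic}::{partition_id}",              # list: messages of one partition
        "partition_change_channel": f"{topic}_partition_change",    # pub/sub channel
        # consumer group
        "group_members": f"CONSUMER_GROUP::{group}",                # set of consumer member ids
        "group_partitions": f"CONSUMER_GROUP_PARTITIONS::{group}",  # hash: partition -> assigned/unassigned
        "group_health": f"{group}::health",                         # coordinator heartbeat (expiring key)
        # consumer members
        "member_partitions": f"CONSUMER_ASSIGNED_PARTITIONS::{member}",  # set of assigned partitions
        "member_health": f"health::{member}",                       # member heartbeat (expiring key)
        "remove_partition_channel": f"{member}_remove_partition",   # pub/sub channel
        "stop_consumer_channel": f"{member}_stop_consumer",         # pub/sub channel
        # offsets
        "offset": f"{group}_{topic}::{partition_id}",               # string: committed offset
    }


for name, key in redis_keys("new_topic", "new_group", "c1", 0).items():
    print(f"{name:26} {key}")
```

Listing the keys side by side exposes a possible collision in the offset key. Group "a_b" with topic "c" and group "a" with topic "b_c" both produce a_b_c::{partition_id}. A separator that cannot appear in names would avoid it.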

Implementation

There are a few classes we have to add to make Redis work with our current design (please read the code comments for a better understanding).

Redis Client:

This will be a helper class, which will directly communicate with Redis. This class will be a Singleton and will be used by other classes to communicate with Redis.

We will also use Redis pub/sub and keyspace notifications to listen to specific updates like consumer member added/removed so that we can rebalance. You can read about it here.
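With notify-keyspace-events set to KEA, Redis publishes an event for every write to a key. The channel is named `__keyspace@<db>__:<key>`, and the message is the command name, for example sadd or srem. RedisClient.add_sub below subscribes to that channel for a given key. The sketch fakes the dispatch step without a server. FakePubSub is hypothetical. Its message dicts imitate the shape redis-py passes to a pattern handler (type, pattern, channel, data). fnmatchcase only approximates Redis glob matching.

```python
from fnmatch import fnmatchcase


def keyspace_channel(key_name, db=0):
    """Channel on which Redis announces writes to key_name (as built in RedisClient.add_sub)."""
    return f"__keyspace@{db}__:{key_name}"


class FakePubSub:
    """Hypothetical in-process stand-in for pattern subscriptions; no Redis involved."""

    def __init__(self):
        self.handlers = {}  # pattern -> callback

    def psubscribe(self, **patterns):
        self.handlers.update(patterns)

    def deliver(self, channel, data):
        # call every handler whose glob pattern matches, with a redis-py style message dict
        for pattern, handler in self.handlers.items():
            if fnmatchcase(channel, pattern):
                handler({"type": "pmessage", "pattern": pattern, "channel": channel, "data": data})


events = []
pubsub = FakePubSub()
pubsub.psubscribe(**{keyspace_channel("CONSUMER_GROUP::new_group"): lambda msg: events.append(msg["data"])})

# what Redis would publish after SADD / SREM on the group's member set
pubsub.deliver("__keyspace@0__:CONSUMER_GROUP::new_group", "sadd")
pubsub.deliver("__keyspace@0__:CONSUMER_GROUP::other_group", "sadd")  # different key: ignored
pubsub.deliver("__keyspace@0__:CONSUMER_GROUP::new_group", "srem")
print(events)  # ['sadd', 'srem']
```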

import os
from threading import Lock

import redis

REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = 6379


class RedisClient:
    """
    Interacts with redis to perform operations.
    This is a singleton, since we need only one redis connection per process
    """

    instance = None
    lock = Lock()

    def __new__(cls):
        """
        Make sure only one instance of redis client is created
        """
        with cls.lock:
            if cls.instance is None:
                cls.instance = super(RedisClient, cls).__new__(cls)
        return cls.instance

    def __init__(self):
        # Python calls __init__ on every RedisClient() call, even when __new__ returns the existing
        # instance, so only create the connection and the pub/sub thread the first time
        with self.lock:
            if getattr(self, '_initialized', False):
                return

            # create redis connection (decode_responses=True returns str instead of bytes)
            self.redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, encoding="utf-8",
                                                  decode_responses=True)

            # enable key-space notifications
            self.redis_client.config_set('notify-keyspace-events', 'KEA')

            # start redis pub sub in thread
            self.redis_pub_sub = self.redis_client.pubsub()
            self.redis_pub_sub.run_in_thread(daemon=True)
            self._initialized = True

    def call_lua(self, script_name, keys, args):
        """
        Call lua script
        :param script_name: name of script in lua scripts folder
        :param keys: keys for lua script
        :param args: args for lua script
        :return: value returned by the script
        """
        lua_file_path = os.path.join(os.path.dirname(__file__), 'lua_scripts', f'{script_name}.lua')
        with open(lua_file_path, "r") as f:
            lua = f.read()
        operation = self.redis_client.register_script(lua)
        return operation(keys=keys, args=args)

    def set_key(self, key, value, exp=None):
        """
        set key in redis
        :param key:
        :param value:
        :param exp: expiry in seconds (None means the key never expires)
        :return:
        """
        return self.redis_client.set(key, value, ex=exp)

    def add_to_list(self, list_name, values):
        """
        append data to end of list
        :param list_name:
        :param values:
        :return:
        """
        return self.redis_client.rpush(list_name, values)

    def get_range_list_data(self, list_name, start_index, end_index):
        """
        query range data from list (both indexes are inclusive)
        :param list_name:
        :param start_index:
        :param end_index:
        :return:
        """
        return self.redis_client.lrange(list_name, start_index, end_index)

    def add_to_hash(self, hash_key, field, value, method='hset'):
        """
        add field<>value to hash
        :param hash_key:
        :param field:
        :param value:
        :param method: hset for overriding/creating fields or hsetnx for only creating fields
        :return:
        """
        if method == 'hset':
            self.redis_client.hset(hash_key, field, value)
        elif method == 'hsetnx':
            self.redis_client.hsetnx(hash_key, field, value)
        else:
            raise Exception("No such method defined")

    def add_multiple_to_hash(self, hash_key, hash_map):
        """
        add multiple values to hash
        :param hash_key:
        :param hash_map:
        :return:
        """
        # hset(mapping=...) replaces the deprecated hmset
        self.redis_client.hset(hash_key, mapping=hash_map)

    def increment_hash_value(self, hash_key, field, amount):
        """
        increment a field in hash
        :param hash_key:
        :param field:
        :param amount:
        :return:
        """
        self.redis_client.hincrby(hash_key, field, amount)

    def add_to_set(self, set_name, values):
        """
        add member to a set
        :param set_name:
        :param values:
        :return:
        """
        return self.redis_client.sadd(set_name, values)

    def publish_to_channel(self, channel, msg):
        """
        publish data to redis channel
        :param channel:
        :param msg:
        :return:
        """
        self.redis_client.publish(channel, msg)

    def get_key(self, key):
        """
        Get key from redis
        :param key:
        :return:
        """
        return self.redis_client.get(key)

    def get_hash(self, hash_name):
        """
        get hash from redis
        :param hash_name:
        :return:
        """
        return self.redis_client.hgetall(hash_name)

    def get_hash_key(self, hash_name, key):
        """
        get a single field from hash
        :param hash_name:
        :param key:
        :return:
        """
        return self.redis_client.hget(hash_name, key)

    def get_hash_len(self, hash_name):
        """
        get number of fields in hash
        :param hash_name:
        :return:
        """
        return self.redis_client.hlen(hash_name)

    def get_set_members(self, key):
        """
        Get set members of redis set
        :param key:
        :return:
        """
        return self.redis_client.smembers(key)

    def add_sub(self, key_name, function):
        """
        add a keyspace-notification subscription to redis pub sub
        :param key_name: key name for subscription
        :param function: call back function of sub
        :return:
        """
        print(f"adding sub for key {key_name}")
        self.redis_pub_sub.psubscribe(**{f'__keyspace@0__:{key_name}': function})

    def add_channel_sub(self, channel_name, function):
        """
        add sub to channel
        :param channel_name:
        :param function: callback function
        :return:
        """
        self.redis_pub_sub.psubscribe(**{channel_name: function})

    def check_key_exists(self, key):
        """
        check if key in redis exists or not
        :param key:
        :return:
        """
        return self.redis_client.exists(key) == 1

Topics and Partitions:

Topic: We will create a topic using a Redis hash with the fields topic_name, partitions (the number of partitions) and strategy (the data partition strategy).

We will also store data partition strategy metadata, which will help us to decide the next partition to push data to.
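The strategy metadata is what makes round-robin work across processes: the last partition used has to live in Redis, not in a Python object. Below is a hypothetical RoundRobinStrategy that follows the interface add_data_to_topic relies on (deserialize, get_next_partition, serialize). The real strategy classes come from Part 1, so the JSON field name last_partition is an assumption.

```python
import json


class RoundRobinStrategy:
    """Hypothetical round-robin partition strategy whose state round-trips through a JSON string."""

    def __init__(self, last_partition=-1):
        self.last_partition = last_partition

    @classmethod
    def deserialize(cls, metadata):
        # metadata is the strategy_metadata field of the topic hash ("{}" for a new topic)
        return cls(json.loads(metadata).get("last_partition", -1))

    def get_next_partition(self, no_of_partitions):
        self.last_partition = (self.last_partition + 1) % no_of_partitions
        return self.last_partition

    def serialize(self):
        return json.dumps({"last_partition": self.last_partition})


# each publish reads the metadata, picks a partition and writes the metadata back
metadata = "{}"
chosen = []
for _ in range(5):
    strategy = RoundRobinStrategy.deserialize(metadata)
    chosen.append(strategy.get_next_partition(3))
    metadata = strategy.serialize()
print(chosen)  # [0, 1, 2, 0, 1]
```

In add_data_to_topic, the metadata is read in Python and only the write happens inside the Lua script. Two producers publishing at the same moment can therefore read the same metadata and pick the same partition. For round-robin this only skews the distribution slightly. Moving the selection into the script would remove the race.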

Partitions: We will create partitions using Redis lists. A Redis list supports both appending data and querying ranges of it, which makes it a natural fit for a partition.

from broker.broker import Broker
from broker.partition_strategy.base_partition_strategy import TopicDataPartitionStrategy
from broker.topic import Data
from configs.redis_client import RedisClient


class RedisTopicClient(Broker):
    redis_client = RedisClient()

    @classmethod
    def get_topic_partition_key(cls, topic_name, partition_id):
        """
        Stores data for a particular topic and partition
        :param topic_name:
        :param partition_id:
        :return:
        """
        return f'{topic_name}::{partition_id}'

    @classmethod
    def get_topic_metadata_key(cls, topic_name):
        """
        stores topic metadata, like name, no_of_partitions, partition_strategy etc
        :param topic_name:
        :return:
        """
        return f'{topic_name}'

    @classmethod
    def get_topic_metadata(cls, topic_name):
        return cls.redis_client.get_hash(cls.get_topic_metadata_key(topic_name))

    @classmethod
    def attach_to_topic(cls, topic_name, consumer_group, **kwargs):
        """
        Add partition change sub to consumer group and return the number of partitions
        :param topic_name:
        :param consumer_group:
        :param kwargs:
        :return:
        """
        # add channel sub, so that the consumer group is notified if the partitions of this topic change
        cls.redis_client.add_channel_sub(f'{cls.get_topic_metadata_key(topic_name)}_partition_change',
                                         kwargs.get("change_detect_func"))

        return cls.get_no_of_topic_partitions(topic_name)

    @classmethod
    def create_topic(cls, topic_name, partitions, strategy):
        """
        Create a topic in redis. Topic metadata like no of partitions, name, strategy is saved in the metadata key
        :param topic_name:
        :param partitions:
        :param strategy:
        :return:
        """
        cls.redis_client.add_multiple_to_hash(cls.get_topic_metadata_key(topic_name),
                                              {"topic_name": topic_name, "partitions": partitions,
                                               "strategy": strategy})

    @classmethod
    def get_no_of_topic_partitions(cls, topic_name):
        """
        get no of partitions from topic metadata
        :param topic_name:
        :return:
        """
        return int(cls.redis_client.get_hash_key(cls.get_topic_metadata_key(topic_name), "partitions"))

    @classmethod
    def add_data_to_topic(cls, topic_name, data, data_class=Data):
        """
        add data to topic. we first use the strategy metadata to determine the partition the data needs to be
        added to, then add the data there and update the strategy metadata (this is required, as in case of the
        round robin data partition strategy we need to update the last partition used)
        :param topic_name:
        :param data: data object
        :param data_class: data class to use for serialization
        :return:
        """
        # get metadata
        metadata = cls.redis_client.get_hash(cls.get_topic_metadata_key(topic_name))

        # make partition strategy object and get the next partition
        partition_strategy = TopicDataPartitionStrategy.get_partition_strategy_class(metadata.get('strategy')). \
            deserialize(metadata.get('strategy_metadata', "{}"))
        next_partition = partition_strategy.get_next_partition(int(metadata['partitions']))

        # insert data into the partition list and update the strategy metadata in one lua call
        cls.redis_client.call_lua("add_data_to_topic", [cls.get_topic_partition_key(topic_name, next_partition),
                                                        cls.get_topic_metadata_key(topic_name),
                                                        "strategy_metadata", partition_strategy.serialize()],
                                  [data_class.serialize(data)])

    @classmethod
    def poll_data(cls, topic_name, partition, limit, offset, data_class=Data):
        """
        get up to `limit` items starting at `offset` from the Redis list. use data class for deserialize
        :param topic_name:
        :param partition:
        :param limit: max number of items to return
        :param offset: index of the first item to return
        :param data_class:
        :return:
        """
        # LRANGE bounds are inclusive, so the window is [offset, offset + limit - 1]
        return [data_class.deserialize(k) for k in
                cls.redis_client.get_range_list_data(cls.get_topic_partition_key(topic_name, partition), offset,
                                                     offset + limit - 1)]

    @classmethod
    def add_partition(cls, topic_name):
        """
        add a partition to topic metadata and publish the change to the channel, so that attached groups can rebalance
        :param topic_name:
        :return:
        """
        print("adding_partition")
        cls.redis_client.increment_hash_value(cls.get_topic_metadata_key(topic_name), "partitions", 1)

        # publish to channel to notify consumer groups consuming from topic
        cls.redis_client.publish_to_channel(f'{cls.get_topic_metadata_key(topic_name)}_partition_change', topic_name)
Redis Consumer Group Keys

This is a helper class that returns KEYS related to the consumer group.

class RedisConsumerGroupKeys:
    """
    Provides keys for redis consumer group coordinator
    """

    @classmethod
    def get_partitions_assigned_to_consumers_key(cls, consumer_member_id):
        # partitions assigned to a consumer member
        return f'CONSUMER_ASSIGNED_PARTITIONS::{consumer_member_id}'

    @classmethod
    def get_consumer_group_key(cls, group_name):
        # consumer members registered in the group
        return f'CONSUMER_GROUP::{group_name}'

    @classmethod
    def get_consumer_group_partitions_key(cls, group_name):
        # overall partitions in a group
        return f'CONSUMER_GROUP_PARTITIONS::{group_name}'

    @classmethod
    def get_group_coordinator_health_key(cls, group_name):
        # heartbeat of the group coordinator
        return f'{group_name}::health'

Redis Consumer Keys

This is a helper class that returns KEYS related to consumer members.

class RedisConsumerKeys:
    """
    Provides redis consumer keys
    """

    @classmethod
    def get_consumer_health_key(cls, consumer_member_id):
        """
        consumer heart beat will be set on this key
        :param consumer_member_id:
        :return:
        """
        return f'health::{consumer_member_id}'

Redis offset client

Redis offset client is used by the group coordinator to save offsets of the consumer groups for a given topic<>partition in Redis.

from configs.redis_client import RedisClient
from configs.redis_consumer_group_keys import RedisConsumerGroupKeys


class RedisOffsetClient:
    """
    Interacts with redis to save offsets
    """

    redis_client = RedisClient()

    @classmethod
    def get_offset_key(cls, consumer_group_name, topic_name, partition_id):
        return f'{consumer_group_name}_{topic_name}::{partition_id}'

    @classmethod
    def commit_offset(cls, consumer_group, topic_name, partition, offset):
        # check group leader health
        if not cls.redis_client.check_key_exists(
                RedisConsumerGroupKeys.get_group_coordinator_health_key(group_name=consumer_group)):
            print("unable to commit, as group coordinator is down")
            return False
        cls.redis_client.set_key(cls.get_offset_key(consumer_group, topic_name, partition), offset)
        return True

    @classmethod
    def get_offset(cls, consumer_group, topic_name, partition):
        # check group leader health
        if not cls.redis_client.check_key_exists(
                RedisConsumerGroupKeys.get_group_coordinator_health_key(group_name=consumer_group)):
            print("unable to get offset as group coordinator is down")
            return None

        # check if existing offset is there, if not return 0
        if not cls.redis_client.check_key_exists(cls.get_offset_key(consumer_group, topic_name, partition)):
            return 0
        off = cls.redis_client.get_key(cls.get_offset_key(consumer_group, topic_name, partition))
        return int(off)
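The offset client follows three rules. With no coordinator heartbeat, there is no offset traffic. A missing offset key means "start from 0". Otherwise the stored string is parsed as an int. The sketch below restates that logic over a plain dict so the three cases are easy to see. DictOffsetStore is a hypothetical stand-in for the Redis keyspace, with no expiry.

```python
class DictOffsetStore:
    """Hypothetical in-memory version of RedisOffsetClient's logic (no Redis, no expiry)."""

    def __init__(self):
        self.keys = {}

    @staticmethod
    def offset_key(group, topic, partition):
        return f"{group}_{topic}::{partition}"

    def coordinator_alive(self, group):
        return f"{group}::health" in self.keys

    def commit_offset(self, group, topic, partition, offset):
        if not self.coordinator_alive(group):
            return False  # refuse to commit while the coordinator is down
        self.keys[self.offset_key(group, topic, partition)] = str(offset)  # Redis stores strings
        return True

    def get_offset(self, group, topic, partition):
        if not self.coordinator_alive(group):
            return None  # distinct from 0, so callers do not restart from the beginning
        return int(self.keys.get(self.offset_key(group, topic, partition), 0))


store = DictOffsetStore()
print(store.commit_offset("g", "t", 0, 5))  # False: no heartbeat yet
store.keys["g::health"] = "alive"           # coordinator heartbeat
print(store.get_offset("g", "t", 0))        # 0: nothing committed yet
store.commit_offset("g", "t", 0, 5)
print(store.get_offset("g", "t", 0))        # 5
```

In the Redis version, GET already returns None for a missing key. The separate EXISTS check in get_offset could therefore be folded into a single round trip.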

Redis Consumer Member

This class extends ConsumerMember (please check the previous story for the base implementation of the ConsumerMember class) and adds a few methods that listen on Redis pub/sub for remove_partition and stop_consumer requests from the group coordinator.

from broker.redis_topic import RedisTopicClient
from configs.redis_client import RedisClient

# ConsumerMember is the base class from Part 1; its import path is not shown in this article


class RedisConsumerMember(ConsumerMember):
    redis_client = RedisClient()

    def __init__(self, consumer_group_name, msg_to_consume=10, max_polling_concurrency=5, handler_class=None):
        super(RedisConsumerMember, self).__init__(consumer_group_name, msg_to_consume, max_polling_concurrency,
                                                  handler_class)
        self.topic_client = RedisTopicClient
        self.add_channel_sub()

    def remove_partitions_listener(self, *args, **kwargs):
        """
        Partition remove listener, this will be called when the group coordinator makes a request to the consumer
        to revoke partitions
        :param args: args[0] is the redis pub/sub message dict
        :param kwargs:
        :return:
        """
        # message data is a comma separated list of topic_partition ids; drop empty entries
        topic_partition_ids = [pid for pid in (args[0].get('data') or '').split(',') if pid]
        if topic_partition_ids:
            self.notify_partition_removed(topic_partition_ids=topic_partition_ids)

    def add_channel_sub(self):
        """
        add channel subs
        :return:
        """
        # add sub for remove partition channel
        self.redis_client.add_channel_sub(f'{self.id}_remove_partition', self.remove_partitions_listener)
        # add sub for stop consumer channel
        self.redis_client.add_channel_sub(f'{self.id}_stop_consumer', self.stop_consumer_gracefully)
Redis Group Coordinator

This class extends GroupCoordinator (please check the previous story for the base implementation of the GroupCoordinator class) and overrides a few methods to get and save data in Redis instead of in memory.
We will use Redis pub/sub here to listen for changes to the consumer members and to the topic partitions. You can read about it here.

import threading
import time

from broker.redis_topic import RedisTopicClient
from configs.redis_client import RedisClient
from configs.redis_consumer_group_keys import RedisConsumerGroupKeys

# GroupCoordinator, PartitionTopic, PartitionStatus, RedisOffsetClient and RedisConsumerKeys come from the
# project's own modules (Part 1 and the helper classes above); their import paths are not shown here


class RedisGroupCoordinatorClient(GroupCoordinator):
    redis_client = RedisClient()

    def __init__(self, name, partition_assignment_strategy='EqualAssignmentStrategy', is_leader=False):
        super(RedisGroupCoordinatorClient, self).__init__(name, partition_assignment_strategy, is_leader)
        self.offsetClient = RedisOffsetClient
        if not self.is_leader:
            return
        # rebalance whenever the group's member set changes (keyspace notification)
        self.redis_client.add_sub(RedisConsumerGroupKeys.get_consumer_group_key(self.name),
                                  self.notify_consumer_members_changed)
        # keep the group coordinator heartbeat alive in the background
        threading.Thread(target=self.group_coordinator_health, daemon=True).start()

    def notify_consumer_members_changed(self, *args, **kwargs):
        print("consumer_member_changed")
        self.rebalance_partitions()

    @classmethod
    def get_assigned_partitions(cls, consumer_member_id):
        """
        Get assigned partitions for a consumer from redis
        :param consumer_member_id:
        :return:
        """
        partitions = cls.redis_client.get_set_members(RedisConsumerGroupKeys.get_partitions_assigned_to_consumers_key(
            consumer_member_id))
        return partitions

    def register_consumer_member(self, consumer_member_id):
        """
        Register consumer member to the group
        :param consumer_member_id:
        :return:
        """
        return self.redis_client.add_to_set(RedisConsumerGroupKeys.get_consumer_group_key(self.name),
                                            consumer_member_id)

    def notify_partitions_changed(self, topic_name):
        """
        Listener in case the partitions of a topic change, to assign new partitions to the nodes
        """
        for i in range(RedisTopicClient.get_no_of_topic_partitions(topic_name)):
            topic_partition = PartitionTopic(topic_name, i)

            # add new partitions to the group as unassigned (hsetnx keeps the status of existing ones)
            self.redis_client.add_to_hash(RedisConsumerGroupKeys.get_consumer_group_partitions_key(self.name),
                                          str(topic_partition), 'unassigned', method='hsetnx')
        self.rebalance_partitions()

    def partition_change_detect_func(self, *args):
        """
        called when the partitions of any topic the group is subscribed to change, so that the group can
        accommodate the new partitions and re-balance
        :param args: args[0] is the redis pub/sub message dict
        :return:
        """
        topic_name = args[0].get('data')
        print(f"Topic partition changed {topic_name}")
        self.notify_partitions_changed(topic_name)

    def get_consumer_nodes_in_group(self):
        """
        Get consumer nodes in the group
        :return:
        """
        members = self.redis_client.get_set_members(RedisConsumerGroupKeys.get_consumer_group_key(self.name))
        return members

    def mark_partitions_unassigned(self, consumer_id, topic_partition_ids):
        """
        Remove multiple partitions from consumer and mark them unassigned
        :param consumer_id:
        :param topic_partition_ids:
        :return:
        """
        print(f"removing partitions from {consumer_id} {topic_partition_ids}")
        self.redis_client.call_lua('remove_x_partitions_from_consumer',
                                   keys=[RedisConsumerGroupKeys.get_partitions_assigned_to_consumers_key(consumer_id),
                                         RedisConsumerGroupKeys.get_consumer_group_partitions_key(self.name)],
                                   args=topic_partition_ids)

        return True

    def get_unassigned_partitions(self):
        """
        Get unassigned partitions of group
        :return:
        """
        all_partitions = self.redis_client.get_hash(RedisConsumerGroupKeys.get_consumer_group_partitions_key(self.name))
        return [k for k, v in all_partitions.items() if v == 'unassigned']

    def check_for_unassigned_partitions(self):
        """
        Keep checking for unassigned partitions, and trigger a re-balance if found
        :return:
        """
        while True:
            unassigned = self.get_unassigned_partitions()
            if unassigned:
                print("found unassigned partitions...")
                self.rebalance_partitions()
            time.sleep(5)

    def add_partition_mapping(self, consumer_member_id, topic_partition):
        """
        add partition mapping for a topic_partition to a consumer and mark it assigned
        :param consumer_member_id:
        :param topic_partition:
        :return:
        """
        return self.redis_client.call_lua("add_partition_mapping",
                                          [RedisConsumerGroupKeys.get_consumer_group_partitions_key(self.name),
                                           RedisConsumerGroupKeys.get_partitions_assigned_to_consumers_key(
                                               consumer_member_id),
                                           topic_partition], [])

    def notify_remove_partition(self, consumer_id, partition_ids):
        """
        Notify consumer to remove partitions. It publishes a request on {consumer_id}_remove_partition with the
        partitions to remove; the consumer listening on that channel will process the request.
        :param consumer_id:
        :param partition_ids:
        :return:
        """
        self.redis_client.publish_to_channel(f"{consumer_id}_remove_partition", ','.join(partition_ids))

    def ping_heart_beat(self, consumer_id):
        """
        update heart beat of consumer; the key expires after 10 seconds unless refreshed
        :param consumer_id:
        :return:
        """
        return self.redis_client.set_key(RedisConsumerKeys.get_consumer_health_key(consumer_id), "alive", 10)

    def partitions_len_in_group(self):
        """
        Get no of partitions in the group
        :return:
        """
        return self.redis_client.get_hash_len(RedisConsumerGroupKeys.get_consumer_group_partitions_key(self.name))

    def attach_to_topic(self, topic_name):
        """
        Subscribe to partition changes of the topic and add its partitions to the group as unassigned
        """
        no_of_partitions = RedisTopicClient.attach_to_topic(topic_name, self.name,
                                                            change_detect_func=self.partition_change_detect_func)
        for k in range(no_of_partitions):
            topic_partition = PartitionTopic(topic_name, k)
            # the status lives in the group's partitions hash, with the topic partition id as field
            self.redis_client.add_to_hash(RedisConsumerGroupKeys.get_consumer_group_partitions_key(self.name),
                                          str(topic_partition), PartitionStatus.UNASSIGNED.value, method='hsetnx')

        self.rebalance_partitions()

    def notify_consumer_member_removed(self, consumer_member_id, send_stop_sig=False):
        """
        Remove consumer member from the group, i.e. mark all its assigned partitions unassigned and remove it from
        the consumer members set
        :param consumer_member_id:
        :param send_stop_sig:
        :return:
        """
        print(f"removing the consumer --- {consumer_member_id}")
        return self.redis_client.call_lua("mark_partitions_unassigned_of_consumer",
                                          [RedisConsumerGroupKeys.get_partitions_assigned_to_consumers_key(
                                              consumer_member_id),
                                           RedisConsumerGroupKeys.get_consumer_group_partitions_key(self.name),
                                           RedisConsumerGroupKeys.get_consumer_group_key(self.name),
                                           consumer_member_id
                                           ], [])

    def check_consumer_health(self):
        """
        Monitors the state of consumers
        """
        while True:
            for cid in list(
                    self.redis_client.get_set_members(RedisConsumerGroupKeys.get_consumer_group_key(self.name))):
                if not self.redis_client.check_key_exists(RedisConsumerKeys.get_consumer_health_key(cid)):
                    with self.rebalance_lock:
                        print("consumer not alive, removing from group")
                        self.notify_consumer_member_removed(cid)
            time.sleep(2)

    def group_coordinator_health(self):
        """
        Refresh the coordinator heartbeat; the key expires after 10 seconds, so refresh well within that window
        """
        while True:
            self.redis_client.set_key(RedisConsumerGroupKeys.get_group_coordinator_health_key(self.name), "alive", 10)
            time.sleep(3)
Lua scripts:

Lua scripts are small programs that are executed inside the Redis server. Redis uses the Lua scripting language to provide an easy and flexible way to manipulate data stored in Redis.

Redis runs a Lua script atomically: no other command is executed while the script runs, so the commands inside it cannot interleave with commands from other clients. This is useful when you need to execute multiple commands atomically, such as when updating multiple keys or checking the existence of multiple keys before performing an operation. You can read more about Lua here.

We will create a few Lua scripts to perform different operations in Redis:

  • Add to topic:
-- This script adds data to a topic partition and updates strategy_metadata as well
-- KEYS : [{topic_name}::{partition_id}, {topic_name}, strategy_metadata, strategy_metadata_value]
-- ARGV : [topic data]
redis.call('RPUSH', KEYS[1], ARGV[1])
redis.call('HSET', KEYS[2], KEYS[3], KEYS[4])
  • Add Partition mapping to consumer
-- This script assigns a partition to a consumer and marks it assigned in the group partitions hash
-- KEYS: [CONSUMER_GROUP_PARTITIONS::{group_name}, CONSUMER_ASSIGNED_PARTITIONS::{consumer_member_id}, topic_partition_id]
local partition = KEYS[3]
redis.call('SADD', KEYS[2], partition)
redis.call('HSET', KEYS[1], partition, 'assigned')
  • Remove consumer from group
-- This script removes all partition_ids from a consumer, marks them unassigned and removes the consumer member from the group
-- KEYS : [CONSUMER_ASSIGNED_PARTITIONS::{consumer_member_id}, CONSUMER_GROUP_PARTITIONS::{group_name}, CONSUMER_GROUP::{group_name}, consumer_member_id]
local count = redis.call('SCARD', KEYS[1])
if count > 0 then
    -- HSET needs at least one field/value pair, so only run it when the consumer held partitions
    local fields = {}
    local popped = redis.call('SPOP', KEYS[1], count)
    for _, value in ipairs(popped) do
        table.insert(fields, value)
        table.insert(fields, 'unassigned')
    end
    redis.call('HSET', KEYS[2], unpack(fields))
end
redis.call('SREM', KEYS[3], KEYS[4])
  • Remove “X” partitions from the consumer
-- This script removes x number of partitions from a consumer and marks them unassigned
-- KEYS : [CONSUMER_ASSIGNED_PARTITIONS::{consumer_member_id}, CONSUMER_GROUP_PARTITIONS::{group_name}]
-- ARGV : [partition_ids to remove]
local members = ARGV
if #members == 0 then
    -- SREM and HSET both fail without arguments
    return 0
end
redis.call('SREM', KEYS[1], unpack(members))
local fields = {}
for _, value in ipairs(members) do
    table.insert(fields, value)
    table.insert(fields, 'unassigned')
end
redis.call('HSET', KEYS[2], unpack(fields))
return 1
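Each script is a handful of set and hash operations, so its effect can be stated as a model. The group hash and the member sets must always agree on who owns what. The Python below mirrors the three ownership scripts on dicts and sets and checks that invariant after each step. The function names are hypothetical, and nothing in the project calls them.

```python
def add_partition_mapping(group_partitions, member_sets, member, partition):
    # SADD CONSUMER_ASSIGNED_PARTITIONS::{member} partition + HSET group hash partition 'assigned'
    member_sets.setdefault(member, set()).add(partition)
    group_partitions[partition] = "assigned"


def remove_x_partitions(group_partitions, member_sets, member, partitions):
    # SREM from the member set, then mark each partition unassigned in the group hash
    if not partitions:
        return 0
    member_sets.get(member, set()).difference_update(partitions)
    for p in partitions:
        group_partitions[p] = "unassigned"
    return 1


def remove_consumer(group_partitions, member_sets, group_members, member):
    # pop every partition the member holds, mark them unassigned, then SREM the member from the group
    for p in member_sets.pop(member, set()):
        group_partitions[p] = "unassigned"
    group_members.discard(member)


def consistent(group_partitions, member_sets):
    """A partition is 'assigned' in the hash exactly when some member set contains it."""
    owned = set().union(*member_sets.values())
    return all((status == "assigned") == (p in owned) for p, status in group_partitions.items())


group_partitions = {"p0": "unassigned", "p1": "unassigned", "p2": "unassigned"}
member_sets, group_members = {}, {"c1", "c2"}
add_partition_mapping(group_partitions, member_sets, "c1", "p0")
add_partition_mapping(group_partitions, member_sets, "c1", "p1")
add_partition_mapping(group_partitions, member_sets, "c2", "p2")
assert consistent(group_partitions, member_sets)
remove_x_partitions(group_partitions, member_sets, "c1", ["p1"])
assert consistent(group_partitions, member_sets)
remove_consumer(group_partitions, member_sets, group_members, "c2")
assert consistent(group_partitions, member_sets)
print(group_partitions)  # {'p0': 'assigned', 'p1': 'unassigned', 'p2': 'unassigned'}
```

The model also shows why the empty case needs care. Python happily accepts a loop over zero partitions, but in Redis an HSET with no field/value pairs is an error.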

Start Scripts:

To run the GroupCoordinator, Consumer Members, and BrokerClient, we will use start scripts as entry points. These scripts take command-line arguments such as consumer_group_name and msg_to_consume as configuration.
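The start scripts read their configuration as key=value pairs from sys.argv. The sketch below is a standalone version of that parsing. parse_cli_config is hypothetical. It splits each argument on the first = only, so values may themselves contain =, and it rejects arguments without one. It also converts a given set of numeric keys to int. Which keys are numeric is an assumption based on the consumer's constructor parameters.

```python
def parse_cli_config(argv, int_keys=("msg_to_consume", "max_polling_concurrency")):
    """Turn ['a=1', 'b=x'] into a dict; keys listed in int_keys are converted to int."""
    config = {}
    for arg in argv:
        key, sep, val = arg.partition("=")
        if not sep or not key:
            raise ValueError(f"expected key=value, got {arg!r}")
        config[key] = int(val) if key in int_keys else val
    return config


print(parse_cli_config(["consumer_group_name=new_group", "msg_to_consume=10"]))
# {'consumer_group_name': 'new_group', 'msg_to_consume': 10}
```

In a start script this would be called as parse_cli_config(sys.argv[1:]).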

  • GroupCoordinator: This script starts the GroupCoordinator for a particular group. (NOTE: only one group coordinator should run per group.)
import os
import signal
import sys
import time

# add the project root to the python path, so that it can find the modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from group_coordinator.group_coordinator import RedisGroupCoordinatorClient


class RunGroupCoordinator:

    def __init__(self):
        self.is_active = True
        self.group_coordinator_config = self.get_config()
        # the docker command passes consumer_group_name=..., while the coordinator's first parameter is `name`
        if 'consumer_group_name' in self.group_coordinator_config:
            self.group_coordinator_config['name'] = self.group_coordinator_config.pop('consumer_group_name')
        self.group_coordinator_ref = RedisGroupCoordinatorClient(**self.group_coordinator_config,
                                                                 is_leader=True)
        signal.signal(signal.SIGINT, self.signal_handler)

    @classmethod
    def get_config(cls):
        # parse key=value command line args into a dict
        group_coordinator_config = {}
        for arg in sys.argv[1:]:
            key, val = arg.split("=", 1)
            group_coordinator_config[key] = val
        return group_coordinator_config

    def signal_handler(self, signum, frame):
        print("stopping")
        time.sleep(2)
        self.is_active = False
        sys.exit(0)

    def start_group_coordinator(self):
        while self.is_active:
            x = str(input("action\n"))
            if x == "attach_to_topic":
                topic_name = str(input('topic_name : '))
                self.group_coordinator_ref.attach_to_topic(topic_name)
                print("Attached to topic")


RunGroupCoordinator().start_group_coordinator()
  • Consumer Member: This script starts a consumer member with the defined config (we pass consumer_group_name, msg_to_consume, and max_polling_concurrency as command-line args).
import os
import signal
import sys
import time

# add the project root to the python path, so that it can find the modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from consumer.consumer_member import RedisConsumerMember
from group_coordinator.group_coordinator import RedisGroupCoordinatorClient

# numeric settings arrive as strings on the command line
INT_CONFIG_KEYS = ('msg_to_consume', 'max_polling_concurrency')


class RunConsumer:

    def __init__(self):
        # handler on SIGINT to handle stopping consumer
        signal.signal(signal.SIGINT, self.signal_handler)
        self.is_active = True
        self.consumer_ref, self.group_coordinator_ref = None, None
        self.consumer_member_config = self.get_config()
        self.consumer_group_name = self.consumer_member_config.get('consumer_group_name')
        if not self.consumer_group_name:
            print("no consumer group defined")
            sys.exit(1)

    @classmethod
    def get_config(cls):
        # parse key=value command line args into a dict
        redis_consumer_member_config = {}
        for arg in sys.argv[1:]:
            key, val = arg.split("=", 1)
            redis_consumer_member_config[key] = int(val) if key in INT_CONFIG_KEYS else val
        return redis_consumer_member_config

    def signal_handler(self, signum, frame):
        """
        whenever SIGINT is received, i.e. on ctrl+c, we stop the consumer
        :param signum:
        :param frame:
        :return:
        """
        if self.consumer_ref is not None:
            self.consumer_ref.stop_consumer_gracefully()
        time.sleep(2)  # buffer time for all threads to end
        self.is_active = False

    def start_consumer(self):
        # start consumer in a blocking main thread
        self.group_coordinator_ref = RedisGroupCoordinatorClient(self.consumer_group_name)
        self.consumer_ref = RedisConsumerMember(**self.consumer_member_config)
        print("starting consumer")
        while self.is_active:
            time.sleep(0.5)  # idle without spinning the CPU; the work happens in background threads
        sys.exit(0)


RunConsumer().start_consumer()
  • BrokerClient: This script starts a broker client and exposes an interface to create a topic or add data to a topic in Redis.
import os
import sys

# add the project root to the python path, so that it can find the modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from broker.redis_topic import RedisTopicClient, Data


class RunBroker:

    @classmethod
    def start_broker_client(cls):
        while True:
            x = str(input('Select action \n'))
            if x == 'create_topic':
                topic_name = str(input('topic_name : '))
                topic_partitions = int(input('number of partitions : '))
                strategy = str(input('strategy : '))
                RedisTopicClient.create_topic(topic_name, topic_partitions, strategy)
                print("Created Topic \n")
            elif x == 'push_data_in_topic':
                topic_name = str(input('topic_name : '))
                data = str(input('data : '))
                RedisTopicClient.add_data_to_topic(topic_name, Data(data, None))
                print("Inserted Data to topic \n")
            elif x == 'exit':
                sys.exit(0)


RunBroker.start_broker_client()

Docker Files:

To put it simply, Docker is a platform that allows developers to create and run applications in containers. These containers are self-contained environments that hold all the dependencies needed to run the application. This makes it easier to build, package, and share applications without worrying about compatibility issues. You can read more about Docker here.

We will be using Docker to create images for GroupCoordinator, Consumer members, and Broker Client. This means we can build the image once and run multiple containers on the same image. For example, we can run the container for consumers in different processes or on different machines with a remote connection to Redis.

  • GroupCoordinator Docker file
FROM python:3
WORKDIR /app
COPY . .
RUN pip3 install -r requirements.txt
ENTRYPOINT ["python3", "start_scripts/start_group_coordinator.py"]
  • ConsumerMember Docker file
FROM python:3
WORKDIR /app
COPY . .
RUN pip3 install -r requirements.txt
ENTRYPOINT ["python3", "start_scripts/start_consumer.py"]
  • BrokerClient Docker file
FROM python:3
WORKDIR /app
COPY . .
RUN pip3 install -r requirements.txt
ENTRYPOINT ["python3", "start_scripts/start_broker.py"]

Testing:

Let’s build and run our Docker images:

  • Redis: We can start Redis locally using Docker.
docker run -d --name redis-stack-server -p 6379:6379 redis/redis-stack-server:latest
  • GroupCoordinator:
docker build -t group_leader_image -f docker/GroupCoordinatorDockerFile .
docker run -it -e REDIS_HOST=host.docker.internal group_leader_image consumer_group_name=new_group
  • ConsumerMember:
docker build -t consumer_image -f docker/ConsumerDockerFile .
docker run -e PYTHONUNBUFFERED=1 -e REDIS_HOST=host.docker.internal consumer_image consumer_group_name=new_group
  • BrokerClient:
docker build -t broker_client_image -f docker/BrokerClientDockerFile .
docker run -it -e REDIS_HOST=host.docker.internal broker_client_image

Executing Broker Client:

The broker client prompts for an action: create_topic or push_data_in_topic. We will create a topic named “new_topic” and insert some data into it.

Executing Group Coordinator:

The group coordinator prompts for an action as well; attach_to_topic attaches the group to a topic. We will attach it to the “new_topic” we created above.

A rebalance happens once we run a consumer member, so all unassigned partitions get assigned. If we run multiple consumer members, the partitions are distributed according to the partition assignment strategy.

Executing Consumer Member:

As soon as we run a consumer member, it gets partitions assigned by the group coordinator and consumes the data we pushed into “new_topic”.

We can also check the keys created in Redis, for example with redis-cli --scan.
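A small script can also summarize the keyspace by type. The sketch below assumes a redis-py client created with decode_responses=True, as in RedisClient. It uses scan_iter, which iterates with SCAN and so avoids the server-blocking behaviour of KEYS * on a large keyspace. summarize_keys is a hypothetical helper. FakeClient only imitates the two redis-py methods it calls, so the example runs without a server.

```python
from collections import defaultdict


def summarize_keys(client, match="*"):
    """Group key names by Redis type, e.g. {'hash': [...], 'list': [...]}."""
    by_type = defaultdict(list)
    for key in client.scan_iter(match=match):
        by_type[client.type(key)].append(key)
    return {t: sorted(keys) for t, keys in sorted(by_type.items())}


class FakeClient:
    """Minimal stand-in exposing only scan_iter and type (match is ignored here)."""

    def __init__(self, types):
        self.types = types

    def scan_iter(self, match="*"):
        return iter(self.types)

    def type(self, key):
        return self.types[key]


fake = FakeClient({"new_topic": "hash", "new_topic::0": "list", "CONSUMER_GROUP::new_group": "set"})
print(summarize_keys(fake))
# {'hash': ['new_topic'], 'list': ['new_topic::0'], 'set': ['CONSUMER_GROUP::new_group']}

# against a real server (requires the redis package and a running Redis):
# import redis
# print(summarize_keys(redis.Redis(host="localhost", port=6379, decode_responses=True)))
```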

Conclusion:

That’s it. There might be some edge cases that I missed or was too lazy to fix. Feel free to point them out or fix them.

I hope this has been a good learning experience. You can find the complete code here.

Feel free to drop any questions, comments, or suggestions and if you enjoyed the article please support and follow for more.
