Design In-memory Streams

Tushar Gupta
21 min read · May 14, 2023

Data streaming has become the backbone of scalable systems because it lets us process large volumes of real-time data, which then feed analytics or event-driven systems.

Today we will design an in-memory data streaming service, taking inspiration from Kafka. I believe this will not only be helpful for interviews but will also help us improve as developers.

Later in this article, we will integrate remote storage into this design so that we can run consumers in different processes or on different machines.

Kafka in brief and terminology:

Although our design will not be completely based on Kafka, it shares some similarities. So let's start with the basic terminology:

  • Topics: Kafka groups similar types of data together in topics. Producers publish data to topics, and consumers consume data from topics. For example, to capture and analyze user behavior on a website, you can create a topic named “user_behavior” and publish data to it. Other analytics services can consume data from this topic for further processing.
  • Partitions: The topic stores data in partitions. Partitions are a way to horizontally split data within a topic. Each partition represents an ordered, immutable sequence of records. Data within a partition is processed in order, so partitions are the basic units of parallelism in Kafka. Increasing the number of partitions can increase parallel data consumption from a topic, provided that the data is equally distributed among them.
  • Brokers: Brokers are responsible for storing and replicating the topic partitions, handling consumer requests for fetching messages, and serving producer requests for writing messages. Brokers act as intermediaries between producers and consumers, with producers publishing data to topics via brokers and consumers consuming data from topics via brokers.
  • Producers: Producers are clients responsible for pushing data to topics.
  • Consumers: Consumers retrieve data from topics and process it. Each consumer can be assigned multiple topic partitions, but within a consumer group a single topic partition cannot be assigned to more than one consumer, which preserves the ordering guarantee within a partition.
  • Consumer Groups: A consumer group is a collection of consumers working together to consume data from topics in parallel. For example, suppose a consumer group has five consumers and is attached to a topic “X” that has five partitions. In that case, each consumer can be assigned one topic partition so that all consumers can process data in parallel.
  • Offsets: In Kafka, offsets are used to get the required data from a given topic partition, since we always need the next unprocessed data. Offsets are maintained per group for all topic partition IDs, and consumers use them to get the next set of unprocessed data. Once data is processed, the updated offset is committed. For example, if a consumer group “CG” has four consumers attached to the topic “X” with four partitions, and one consumer dies, its partition will be assigned to one of the remaining alive consumers. The consumer will then use the offset to retrieve the next set of unprocessed data for that partition, as the previous consumer might have processed some data, and we don’t want to process it again.

This basic understanding of Kafka's components is enough to follow the rest of the article. If you want to go deeper, refer to the official documentation.

Problem Statement:

  1. The user can create multiple topics and specify the number of partitions and the data partitioning strategy for each.
  2. The user can choose any underlying storage implementation for each topic and can define custom serialization/deserialization for the data.
  3. The user can publish data to topics, and the defined partitioning strategy will be used to distribute data among partitions.
  4. The user can create multiple consumer groups and specify a name and a partition assignment strategy for each.
  5. The user can create multiple consumers and specify the consumer group each one is attached to.
  6. Consumer groups can be attached to topics to consume data.
  7. Multiple consumers should be able to consume data from topics in parallel.
  8. If multiple partitions are assigned to a consumer, it should be able to process data from all of them in parallel.
  9. When a consumer is added to or removed from the group, partitions need to be rebalanced accordingly.
  10. If a consumer dies, its partitions need to be rebalanced among the available consumers based on the defined partition assignment strategy.
  11. If the partitions of any topic change, the group should rebalance to accommodate the new partitions.
  12. The group needs to maintain offsets for every topic partition it consumes, and if partitions are rebalanced, it must ensure that no data is consumed again.
  13. A partition cannot be assigned to two consumers at the same time.
  14. Retry capabilities are needed when data processing fails, and the order of data consumption must be maintained within any given partition.

From an interview perspective, this may seem like too many requirements, and it is, but they fit together nicely, so just stay with me.

Let’s start by decoupling the requirements and defining individual components, with each component being responsible for a few features:

  • SerialStorage: This class provides wrappers over different storage data structures, such as arrays or linked lists, that the Topic can utilize. It exposes APIs for pushing and polling data from these structures.
  • Topic: The Topic is responsible for maintaining Partitions (instances of SerialStorage) and using the defined partitioning strategy to push data into the relevant Partitions. It is also responsible for returning ranged data from any Partition.
  • Topic Partition ID: This references the partition of a topic. For example, T1<>P1 references the first partition of the topic T1.
  • Broker: The Broker stores topics/partitions and acts as the intermediary through which consumers poll data and producers push data.
  • Group Coordinator: Each Consumer Group has exactly one Group Coordinator, which manages the group membership and partition assignments for all Consumers in the group. It maintains the Consumer nodes and the assigned/unassigned partitions of every topic the group is attached to. It exposes APIs for adding/removing Consumer nodes and balances partitions between all connected Consumer nodes. It also monitors the health of the Consumers and triggers rebalancing if any Consumer becomes unavailable or unresponsive. It communicates with Consumers via the GroupCoordinatorClient.
  • GroupCoordinatorClient: This class acts as an intermediary between Consumer nodes and the Group Coordinator. All communications between them are done via this class.
  • ConsumerMember: The Consumer is responsible for pulling data from the Broker for a given topic<>partition and processing it. It uses the OffsetClient to get the offset for which data needs to be processed and commit the updated offset after processing it.
  • OffsetClient: This class is responsible for maintaining offsets of a given Consumer Group for all Topic Partition IDs.

Class Diagram

Here is the class diagram, which gives an overview of our design. Refer to the implementation and code comments below to understand certain fields and classes.

Implementation:

We will divide the implementation into different components. (Please read the code comments for a better understanding.)

Storage Classes

Storage classes will be wrappers over data structures, which will help maintain data in serial order.

As you can see from the requirements, we will always append data to a topic partition and never delete or update it. We also need to keep in mind that reads are ranged queries defined by an offset (where to start) and a limit (how many records to return).

  • Data Class: Used to serialize and deserialize the data pushed into storage classes. We can define our own data classes by extending BaseData.
  • Serial Storage: Used to store serialized data objects and provide APIs to add and query them. We can define our own storage classes by extending SerialStorage, like the ArrayStorage and LLStorage (linked list) classes.

import json
import threading


class BaseData:
    """
    Base class of data objects. Defines serialize/deserialize methods for data
    """

    def __init__(self, ttl=None):
        self.ttl = ttl

    @classmethod
    def serialize(cls, data, encoding_scheme='utf-8'):
        raise NotImplementedError("No serialize method defined")

    @classmethod
    def deserialize(cls, data, encoding_scheme='utf-8'):
        raise NotImplementedError("No deserialize method defined")


class Data(BaseData):

    def __init__(self, msg, ttl=None):
        super(Data, self).__init__(ttl)
        self.msg = msg

    @classmethod
    def serialize(cls, data, encoding_scheme='utf-8') -> bytes:
        """
        Serialize data into bytes
        :param data: Data object to serialize
        :param encoding_scheme: encoding scheme of data
        """
        return json.dumps({"msg": data.msg, "ttl": data.ttl}).encode(encoding_scheme)

    @classmethod
    def deserialize(cls, data, encoding_scheme='utf-8'):
        """
        Deserialize data and return the data object from bytes/string
        :param data: serialized payload (bytes or str)
        :param encoding_scheme: encoding scheme used while serializing
        """
        if isinstance(data, bytes):
            data = data.decode(encoding_scheme)
        data = json.loads(data)
        return cls(msg=data['msg'], ttl=data['ttl'])


class SerialStorage:
    """
    Interface for serial storage
    """

    def __init__(self, data_class=Data):
        self.data_class = data_class

    def add_data(self, data, ttl=None):
        pass

    def get_range_data(self, offset, limit):
        pass


class LLNode:

    def __init__(self, data):
        self.data = data
        self.next = None


class ArrayStorage(SerialStorage):

    def __init__(self, data_class=Data):
        super(ArrayStorage, self).__init__(data_class)
        self.data = []
        self.lock = threading.Lock()  # to make serial storage thread safe

    def add_data(self, data, ttl=None):
        with self.lock:
            self.data.append(self.data_class.serialize(data))

    def get_range_data(self, offset, limit):
        # Callers pass the start position first and the record count second,
        # so the parameters are named offset and limit to match.
        return [self.data_class.deserialize(_data) for _data in self.data[offset:offset + limit]]


class LLStorage(SerialStorage):

    def __init__(self, data_class=Data):
        super(LLStorage, self).__init__(data_class)
        self.manage_data = {}  # additional map of offset -> node, so that range queries can be fast
        self.head = None
        self.tail = None
        self.data_len = 0  # maintains current length of LL
        self.lock = threading.Lock()  # to make serial storage thread safe

    def add_data(self, data, ttl=None):
        with self.lock:
            curr_node = LLNode(self.data_class.serialize(data))
            if not self.head:
                self.head = self.tail = curr_node
            else:
                self.tail.next = curr_node
                self.tail = curr_node
            self.manage_data[self.data_len] = curr_node  # add to map as well for fast access of nodes
            self.data_len += 1

    def get_range_data(self, offset, limit):
        """
        Returns at most `limit` records starting at position `offset`
        :param offset: position of the first record to return
        :param limit: maximum number of records to return
        """
        data = []
        curr = self.manage_data.get(offset)  # uses map to go to node faster instead of traversing
        for _ in range(limit):
            if not curr:
                # if no further data available, stop
                break
            data.append(self.data_class.deserialize(curr.data))
            curr = curr.next
        return data
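
To make the ranged-read semantics concrete, here is a minimal stand-alone sketch. `AppendOnlyLog` is a hypothetical stand-in for a single partition, not one of the storage classes from this design; it only assumes that records are serialized on write and that a read takes a start offset and a maximum count.

```python
import json


class AppendOnlyLog:
    """Hypothetical stand-in for one partition: an append-only list of serialized records."""

    def __init__(self):
        self._records = []

    def append(self, msg):
        # Store bytes, mirroring the serialize-on-write step of the storage classes.
        self._records.append(json.dumps({"msg": msg}).encode("utf-8"))

    def read(self, offset, limit):
        # Return at most `limit` records starting at position `offset`.
        chunk = self._records[offset:offset + limit]
        return [json.loads(raw.decode("utf-8"))["msg"] for raw in chunk]


log = AppendOnlyLog()
for i in range(5):
    log.append(f"event-{i}")

first_page = log.read(offset=0, limit=2)
second_page = log.read(offset=2, limit=2)
past_the_end = log.read(offset=4, limit=10)  # fewer records than the limit remain
```

A read that runs past the end simply returns whatever is left, which is what lets a consumer poll repeatedly without first asking how long the partition is.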

Topics and Data Partition Strategies:

  • Topic: Topics will be defined using the name, number of partitions, data partition strategy, and storage class. The topic will insert data into the relevant partition, and we can query the same using partition_id, limit, and offset.
  • Data Partition Strategy: It exposes a function that returns the next partition into which the topic should insert data. We can build any custom partition strategy by extending TopicDataPartitionStrategy, like the Round Robin partition strategy.

import json

# LLStorage comes from the storage classes defined earlier.


class TopicDataPartitionStrategy:

    def __init__(self, **kwargs):
        pass

    def get_next_partition(self, no_of_partitions):
        pass

    @classmethod
    def get_partition_strategy_class(cls, class_name):
        if class_name == 'RRPartitionStrategy':
            return RRPartitionStrategy

    def serialize(self):
        raise NotImplementedError("No Default implementation")

    @classmethod
    def deserialize(cls, obj_str):
        raise NotImplementedError("No Default implementation")


class RRPartitionStrategy(TopicDataPartitionStrategy):
    """
    Round robin partition strategy: Uses each partition equally
    """

    def __init__(self, **kwargs):
        super(RRPartitionStrategy, self).__init__()
        # index of the partition used by the previous push; -1 means nothing pushed yet
        self.last_used = int(kwargs.get('last_used', -1))

    def get_next_partition(self, no_of_partitions):
        to_use = (self.last_used + 1) % no_of_partitions
        self.last_used = to_use
        return to_use

    def serialize(self):
        return json.dumps({"last_used": self.last_used})

    @classmethod
    def deserialize(cls, obj_str):
        return RRPartitionStrategy(**json.loads(obj_str))


class Topic:

    def __init__(self, topic_name, partitions, strategy, storage_type=LLStorage, storage_metadata=None):
        """
        Stores data in partitions
        :param topic_name: Name of the topic
        :param partitions: No of partitions to use
        :param strategy: Partitioning strategy to use
        :param storage_type: Serial Storage Class to use
        """
        self.topic_name = topic_name
        self.partitions = [storage_type() for _ in range(partitions)]
        self.partition_strategy = strategy
        self.storage_type = storage_type

    def push_data(self, data, ttl=None):
        """
        Get next partition from partition strategy and push the data
        """
        next_partition = self.partition_strategy.get_next_partition(len(self.partitions))
        print("pushing in partition", next_partition)
        self.partitions[next_partition].add_data(data, ttl)

    def poll_data(self, partition, offset, limit):
        # offset: where to start reading; limit: maximum number of records
        return self.partitions[partition].get_range_data(offset, limit)

    def add_partition(self):
        self.partitions.append(self.storage_type())
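
As a quick illustration of how round robin spreads records, the sketch below pushes seven messages into three partitions. `RoundRobin` is a hypothetical, stripped-down chooser that uses the same modulo arithmetic as the round robin strategy; the partitions are plain lists rather than storage objects.

```python
class RoundRobin:
    """Hypothetical minimal chooser using the same arithmetic as the round robin strategy."""

    def __init__(self):
        self.last_used = -1  # nothing used yet

    def next_partition(self, no_of_partitions):
        # Advance by one and wrap around at the number of partitions.
        self.last_used = (self.last_used + 1) % no_of_partitions
        return self.last_used


strategy = RoundRobin()
partitions = [[] for _ in range(3)]
for i in range(7):
    target = strategy.next_partition(len(partitions))
    partitions[target].append(f"m{i}")
```

Each partition stays internally ordered (m0, m3, m6 in the first one), but there is no ordering guarantee across partitions, which is why ordering-sensitive data is usually routed by something other than pure round robin.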

Broker Class

  • Broker: It is responsible for managing topics and exposing APIs to create and manipulate them. It also handles pushing data to and polling data from topics. Additionally, the broker maintains the set of consumer groups attached to each topic. When a topic changes, for example when its number of partitions changes, the broker notifies all attached consumer groups so they can rebalance their consumers accordingly.

from collections import defaultdict


class Broker:
    topics = {}
    topics_subscription = defaultdict(set)  # topic name -> names of attached consumer groups

    @classmethod
    def create_topic(cls, topic_name, partitions, strategy: TopicDataPartitionStrategy, storage_type=LLStorage):
        """
        Create new topic
        :param storage_type: storage class to use
        :param topic_name: Topic name
        :param partitions: Number of partitions
        :param strategy: Data Partition Strategy
        """
        if cls.topics.get(topic_name):
            raise Exception("topic already exists")
        cls.topics[topic_name] = Topic(topic_name, partitions, strategy, storage_type=storage_type)

    @classmethod
    def add_data_to_topic(cls, topic_name, data):
        cls.topics[topic_name].push_data(data)

    @classmethod
    def poll_data(cls, topic_name, partition, offset, limit):
        return cls.topics[topic_name].poll_data(partition, offset, limit)

    @classmethod
    def attach_to_topic(cls, topic_name, consumer_group, **kwargs):
        # whenever a consumer group wants to attach to a topic, we store the subscription
        cls.topics_subscription[topic_name].add(consumer_group)
        return cls.get_no_of_topic_partitions(topic_name)

    @classmethod
    def get_no_of_topic_partitions(cls, topic_name):
        return len(cls.topics[topic_name].partitions)

    @classmethod
    def add_partition(cls, topic_name):
        cls.topics[topic_name].add_partition()
        cls.notify_subscribers(topic_name)

    @classmethod
    def add_subscription(cls, topic_name, consumer_group):
        cls.topics_subscription[topic_name].add(consumer_group)

    @classmethod
    def remove_subscription(cls, topic_name, consumer_group):
        # discard() so that removing an unknown group is a no-op instead of a KeyError
        cls.topics_subscription[topic_name].discard(consumer_group)

    @classmethod
    def notify_subscribers(cls, topic_name):
        # imported inside the method, presumably to avoid a circular import
        from consumer.in_memory_group_coordinator_helper import GroupCoordinatorClient
        for consumer_group in cls.topics_subscription[topic_name]:
            GroupCoordinatorClient.topic_partitions_changed(consumer_group, topic_name)
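
The subscribe-and-notify flow can be sketched in isolation as follows. `TinyBroker` and its callback signature are assumptions made for this example only; the Broker in this design notifies groups through the GroupCoordinatorClient rather than through plain callbacks.

```python
from collections import defaultdict


class TinyBroker:
    """Hypothetical broker that only tracks partition counts and subscribers."""

    def __init__(self):
        self.partition_counts = {}            # topic -> number of partitions
        self.subscribers = defaultdict(list)  # topic -> callbacks to notify

    def create_topic(self, name, partitions):
        self.partition_counts[name] = partitions

    def subscribe(self, name, callback):
        # Attaching returns the current partition count, like attach_to_topic.
        self.subscribers[name].append(callback)
        return self.partition_counts[name]

    def add_partition(self, name):
        self.partition_counts[name] += 1
        # Tell every attached group that the topic changed.
        for callback in self.subscribers[name]:
            callback(name, self.partition_counts[name])


seen = []
broker = TinyBroker()
broker.create_topic("user_behavior", 2)
initial = broker.subscribe("user_behavior", lambda topic, count: seen.append((topic, count)))
broker.add_partition("user_behavior")
```

The subscriber learns about the third partition without polling the broker, which is the property the group coordinator relies on to pick up new partitions.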

Offset client

  • PartitionTopic: This class defines an individual partition of a topic. We will assign one or more instances of it to consumers so that they can consume data from those partitions.
  • Offset client: It maintains the offsets of each consumer group for all the topic<>partitions it is consuming from.

import enum


class PartitionStatus(enum.Enum):
    UNASSIGNED = 'unassigned'
    ASSIGNED = 'assigned'


class PartitionTopic:

    def __init__(self, topic_name, partition):
        self.topic_name = topic_name
        self.partition = partition

    def __str__(self):
        return f'{self.topic_name}::{self.partition}'

    def __hash__(self):
        return hash(self.__str__())

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.__str__() == other.__str__()
        return False


class OffsetClient:
    offsets = {}  # offset key -> next offset to read

    @classmethod
    def get_offset_key(cls, consumer_group, topic_name, partition):
        return f'{consumer_group}_{topic_name}_{partition}'

    @classmethod
    def commit_offset(cls, consumer_group, topic, partition, offset):
        cls.offsets[cls.get_offset_key(consumer_group, topic, partition)] = offset

    @classmethod
    def get_offset(cls, consumer_group, topic, partition):
        # a group that has never committed starts from the beginning of the partition
        return cls.offsets.get(cls.get_offset_key(consumer_group, topic, partition), 0)
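
The following sketch shows why offsets are kept per group and per topic partition: when one consumer stops, another consumer of the same group can resume exactly where it left off. The `commit` and `fetch` helpers and the tuple key are hypothetical simplifications for this example, not the OffsetClient API.

```python
offsets = {}  # (group, topic, partition) -> next offset to read


def commit(group, topic, partition, offset):
    offsets[(group, topic, partition)] = offset


def fetch(group, topic, partition):
    # Groups that never committed start at the beginning.
    return offsets.get((group, topic, partition), 0)


records = ["a", "b", "c", "d", "e"]  # contents of partition 0 of topic "X"

# Consumer C1 of group "CG" processes two records, commits, then dies.
start = fetch("CG", "X", 0)
processed_by_c1 = records[start:start + 2]
commit("CG", "X", 0, start + len(processed_by_c1))

# Consumer C2 of the same group takes over the partition.
resume = fetch("CG", "X", 0)
processed_by_c2 = records[resume:]

# A different group is unaffected and still starts from the beginning.
other_group_start = fetch("CG2", "X", 0)
```

A tuple key is used here because a string key joined with underscores can collide when group or topic names themselves contain underscores.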

Partition Assignment Strategy

PartitionAssignment Strategy: This class is responsible for distributing partitions among consumers based on the total partitions the consumer group is consuming from and the number of consumers present in it.

It calculates how many partitions should be assigned to each consumer. We can define custom partition assignment strategies by extending the PartitionAssignmentStrategy class.

from collections import defaultdict


class PartitionAssignmentStrategy:
    """
    Provides a consumer_member_id<>number_of_topic_partition_ids mapping.
    It basically tells us how many partitions are to be assigned to each consumer
    """

    def __init__(self, consumer_member_ids):
        """
        :param consumer_member_ids: we sort consumer member ids so that the assignment stays stable
        when several consumers tie on the number of partitions
        """
        self.consumer_member_ids = sorted(consumer_member_ids)

    @classmethod
    def get_partition_strategy(cls, class_name):
        # GroupCoordinator resolves a strategy name to its class through this method.
        # The lookup itself is an assumed implementation, mirroring
        # TopicDataPartitionStrategy.get_partition_strategy_class.
        if class_name == 'EqualAssignmentStrategy':
            return EqualAssignmentStrategy
        raise ValueError(f"Unknown partition assignment strategy: {class_name}")

    def get_partitions(self, no_of_partitions):
        # same name as the method subclasses implement and GroupCoordinator calls
        raise NotImplementedError("No Default Implementation")


class EqualAssignmentStrategy(PartitionAssignmentStrategy):
    """
    This will split the partitions equally among all consumer members, e.g.
    consumers: 5 and partitions: 5, each consumer gets one partition
    consumers: 5 and partitions: 7, two consumers get two partitions and the rest get one
    consumers: 6 and partitions: 5, five consumers get one partition and one stays unassigned
    """

    def __init__(self, consumer_member_ids):
        super(EqualAssignmentStrategy, self).__init__(consumer_member_ids)

    def get_partitions(self, no_of_partitions):
        print("Len partition, len consumer_members", no_of_partitions, len(self.consumer_member_ids))
        if not len(self.consumer_member_ids):
            return {}
        _equal = no_of_partitions // len(self.consumer_member_ids)
        _rem = no_of_partitions % len(self.consumer_member_ids)
        sol = defaultdict(int)
        for ix, k in enumerate(self.consumer_member_ids):
            # the first _rem consumers (in sorted order) take one extra partition
            sol[k] += _equal + (1 if ix < _rem else 0)
        return sol
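
The three cases from the docstring can be checked with a small stand-alone function. `equal_split` is a hypothetical helper written for this example; it applies the same quotient-and-remainder rule as the equal assignment strategy.

```python
def equal_split(consumer_ids, no_of_partitions):
    """Return {consumer_id: number_of_partitions} using quotient and remainder."""
    ids = sorted(consumer_ids)  # stable order so ties always resolve the same way
    if not ids:
        return {}
    base, extra = divmod(no_of_partitions, len(ids))
    # The first `extra` consumers take one partition more than the rest.
    return {cid: base + (1 if ix < extra else 0) for ix, cid in enumerate(ids)}


five = ["c1", "c2", "c3", "c4", "c5"]
five_consumers_five_partitions = equal_split(five, 5)
five_consumers_seven_partitions = equal_split(five, 7)
six_consumers_five_partitions = equal_split(five + ["c6"], 5)
```

Note that the strategy only decides how many partitions each consumer should hold; choosing which concrete partitions move is left to the group coordinator.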

Defining GroupCoordinator

  • Group Coordinator: Each consumer group has one group coordinator who manages the group membership and partition assignments for all consumers in the group. It exposes APIs for consumers to join/leave the group and handles the rebalancing of partitions when the group membership changes. The group coordinator also monitors the health of the consumers and triggers rebalancing if any consumer becomes unavailable or unresponsive.
  • Rebalancing: Rebalancing means distributing all of the group's topic_partition_ids among the available consumers.
    An example use case to understand it better:
    - Let’s say our consumer group is attached to 2 topics (T1, T2), each having 2 partitions (P1, P2). So the group has 4 topic_partitions overall (T1<>P1, T1<>P2, T2<>P1, T2<>P2). Let’s say the consumer group has 2 consumers (C1, C2). One way to rebalance is to assign T1<>P1 and T2<>P1 to C1, and T1<>P2 and T2<>P2 to C2. Since each consumer gets two of the four partitions, there are 4C2 = 6 possible assignments of this kind.

Rebalancing Edge Cases: There are a few things that we must keep in mind while rebalancing:

  1. When a new consumer member joins a consumer group, the current partitions are rebalanced so that the new consumer receives some of them. For example, let’s say a consumer group has two consumer members (C1 and C2) and is attached to only one topic T1 with three partitions. Currently, one partition is assigned to C1, and two partitions are assigned to C2.
  2. If a new consumer “C3” joins the group, the coordinator will rebalance the partitions according to the assigned policy. If the policy is the equal assignment policy, the coordinator will remove one partition from C2 and assign it to C3.
  3. However, simply removing a partition from a consumer may cause problems if the consumer is still processing data from that partition and has not committed the offset yet. If the partition is taken from C2 and handed to C3 while C2 is in the middle of a polling cycle, C3 will start from a stale offset and the same data will be processed more than once.
  4. To handle this, the coordinator first asks C2 to revoke the partition. Once C2 has revoked it, C2 notifies the coordinator so that the partition is marked unassigned, and a later rebalance can assign it to C3 (we regularly monitor for unassigned partitions and rebalance when we find any). This ensures that each partition is assigned to only one consumer at any given time, preventing duplicate processing of data.
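
The revoke-then-reassign handshake from the steps above can be sketched as a tiny state machine. `MiniCoordinator` and its method names are hypothetical and much simpler than the coordinator in this design; the only assumption is that a partition becomes assignable again only after its current owner confirms the revoke.

```python
class MiniCoordinator:
    """Hypothetical coordinator tracking one owner per partition."""

    def __init__(self, partitions):
        self.owner = {p: None for p in partitions}  # None means unassigned
        self.pending_revoke = set()

    def assign(self, partition, consumer):
        if self.owner[partition] is not None:
            raise RuntimeError(f"{partition} is still owned by {self.owner[partition]}")
        self.owner[partition] = consumer

    def request_revoke(self, partition):
        # Only a request: the current owner keeps the partition for now.
        self.pending_revoke.add(partition)

    def confirm_revoke(self, partition, consumer):
        # Called by the consumer after it has committed its offset.
        if self.owner[partition] == consumer:
            self.owner[partition] = None
            self.pending_revoke.discard(partition)

    def unassigned(self):
        return sorted(p for p, o in self.owner.items() if o is None)


coord = MiniCoordinator(["T1::0", "T1::1", "T1::2"])
coord.assign("T1::0", "C1")
coord.assign("T1::1", "C2")
coord.assign("T1::2", "C2")

coord.request_revoke("T1::2")            # C3 joined, C2 must give one up
owner_while_pending = coord.owner["T1::2"]

early_assign_rejected = False
try:
    coord.assign("T1::2", "C3")          # too early: C2 has not confirmed yet
except RuntimeError:
    early_assign_rejected = True

coord.confirm_revoke("T1::2", "C2")      # C2 committed its offset and let go
free_after_confirm = coord.unassigned()
coord.assign("T1::2", "C3")
```

Because assignment is refused until the revoke is confirmed, there is no window in which both C2 and C3 read from the same partition.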

import random
import threading
import time
from collections import defaultdict

# Broker, OffsetClient, PartitionStatus, PartitionTopic, PartitionAssignmentStrategy and
# GroupCoordinatorClient are the classes defined in the other sections of this article.


class GroupCoordinator:
    """
    Manages consumers assigned to the group,
    constantly monitors the consumers and reassigns partitions if any consumer is killed.
    Rebalances partitions between consumers attached to the group
    """

    def __init__(self, name, partition_assignment_strategy='EqualAssignmentStrategy', is_leader=False):
        """
        :param name: Consumer group name
        :param partition_assignment_strategy: Partition assignment strategy
        :param is_leader: Specify if the group coordinator is leader or not, only the leader can rebalance
        partitions to avoid race conditions.
        """
        self.name = name
        self.rebalance_lock = threading.Lock()  # helps prevent race conditions during partition rebalances
        self.partition_assignment_strategy = PartitionAssignmentStrategy.get_partition_strategy(
            partition_assignment_strategy)
        self.broker_client = Broker
        self.is_leader = is_leader
        self.offsetClient = OffsetClient
        self.partitions = {}  # overall topic_partitions assigned to the group
        self.heart_beat = {}  # heart beat tracker of all consumers
        self.consumer_nodes = set()  # consumer ids of consumer nodes assigned to the group
        self.consumer_nodes_mapping = defaultdict(set)  # partition mappings assigned to consumer nodes

        GroupCoordinatorClient.register_group_coordinator(name, self)

        if not self.is_leader:
            return

        # start check on unassigned partitions
        t1 = threading.Thread(target=self.check_unassigned_partitions, daemon=True)
        t1.name = "unassigned_partition"
        t1.start()

        # start check on consumer group health
        t2 = threading.Thread(target=self.check_consumer_health, daemon=True)
        t2.name = "consumer_healths"
        t2.start()

    def notify_consumer_members_changed(self, *args, **kwargs):
        """
        Call this method if consumer members are changed.
        """
        self.rebalance_partitions()

    def register_consumer_member(self, consumer_id):
        """
        Register consumer member to the group
        """
        print(f"registering_consumer_member {consumer_id}")
        # Assumption: treat registration as the first heart beat, so the health check
        # never looks up a consumer that has not pinged yet.
        self.heart_beat[consumer_id] = time.time()
        self.consumer_nodes.add(consumer_id)
        self.notify_consumer_members_changed()

    def mark_partitions_unassigned(self, consumer_id, topic_partition_ids):
        """
        Remove partitions from consumer and mark them unassigned
        :param topic_partition_ids: partitions to remove
        :param consumer_id: consumer id to remove partitions from
        """
        print(f"removing_partition_from {consumer_id} {topic_partition_ids}")
        with self.rebalance_lock:
            for topic_partition_id in topic_partition_ids:
                self.consumer_nodes_mapping[consumer_id].remove(topic_partition_id)
                self.partitions[topic_partition_id] = PartitionStatus.UNASSIGNED.value

    def get_assigned_partitions(self, consumer_id):
        """
        Get assigned partitions for a consumer member
        """
        return self.consumer_nodes_mapping[consumer_id]

    def print_curr_assignment(self):
        for cnsr_id in self.consumer_nodes:
            for p in self.consumer_nodes_mapping[cnsr_id]:
                print(f'{p} in {cnsr_id}')

    def add_partition_mapping(self, consumer_member_id, topic_partition):
        """
        Add partition mapping to consumer and mark it assigned.
        Here we can directly add the partition without notifying the consumer, since the consumer fetches its
        updated partitions in every poll request, so this new partition will be returned in the next
        get_assigned_partitions request.
        """
        self.partitions[topic_partition] = PartitionStatus.ASSIGNED.value
        self.consumer_nodes_mapping[consumer_member_id].add(topic_partition)

    def notify_partitions_changed(self, topic_name):
        """
        Listener in case partitions in a topic are changed, to assign new partitions to the nodes
        """
        for partition_id in range(self.broker_client.get_no_of_topic_partitions(topic_name)):
            curr = PartitionTopic(topic_name, partition_id)
            if curr not in self.partitions:
                self.partitions[curr] = PartitionStatus.UNASSIGNED.value
        self.rebalance_partitions()

    def get_unassigned_partitions(self):
        """
        Get unassigned partitions in the group
        """
        return {topic_partition_id for topic_partition_id, status in self.partitions.items() if
                status == PartitionStatus.UNASSIGNED.value}

    def check_unassigned_partitions(self):
        """
        Check unassigned partitions and trigger a re-balance if found any.
        Note: consumer nodes may not have any partition, if no of consumers > no of partitions
        """
        while True:
            with self.rebalance_lock:
                unassigned = self.get_unassigned_partitions()
            # rebalance_partitions takes the (non-reentrant) lock itself, so call it after releasing
            if unassigned:
                print(f"Found unassigned partitions, {unassigned}")
                self.rebalance_partitions()
            time.sleep(5)

    def get_consumer_nodes_in_group(self):
        return self.consumer_nodes

    def partitions_len_in_group(self):
        return len(self.partitions)

    def notify_remove_partition(self, consumer_id, partition_ids):
        """
        Notify consumer member to remove partition.
        Note: We cannot just blindly remove a partition from a consumer, since the consumer might still be
        processing data from that partition and might not have committed the offset yet. If we removed the
        partition and assigned it to another consumer, the same data could be processed multiple times.
        """
        GroupCoordinatorClient.notify_remove_partition(consumer_id, partition_ids)

    def rebalance_partitions(self):
        """
        Re-balances the partitions between all assigned consumers
        """
        if not self.is_leader:
            print("not a leader")
            return
        if not self.partitions_len_in_group():
            return
        print("REBALANCING")
        with self.rebalance_lock:
            members = self.get_consumer_nodes_in_group()
            if not members:
                print("no consumer member attached to group")
                return

            partitions_assignment = self.partition_assignment_strategy(members).get_partitions(
                self.partitions_len_in_group())
            print(f'Partitions assignments should be {partitions_assignment}')

            for consumer_id, no_of_partitions in partitions_assignment.items():
                curr_assigned_partitions_to_node = self.get_assigned_partitions(consumer_id)
                _len_curr_assigned_partitions_to_node = len(curr_assigned_partitions_to_node)
                print(f"curr_assigned_partition_to_{consumer_id}_are_{curr_assigned_partitions_to_node}")

                # If consumer has less than required partitions, assign any unassigned partition to the consumer
                while _len_curr_assigned_partitions_to_node < no_of_partitions:
                    unassigned_partitions = self.get_unassigned_partitions()
                    if not unassigned_partitions:
                        print("No unassigned partitions left")
                        break
                    print(f"unassigned_partitions_are_{unassigned_partitions}")
                    curr_unassigned_partition = unassigned_partitions.pop()
                    self.add_partition_mapping(consumer_id, curr_unassigned_partition)
                    _len_curr_assigned_partitions_to_node += 1

                # If consumer has more than required partitions, ask it to give up the extra ones so they can
                # be distributed accordingly. The consumer revokes them asynchronously.
                if _len_curr_assigned_partitions_to_node > no_of_partitions:
                    print(f"removing extra partitions from {consumer_id}")
                    no_of_partitions_to_remove = _len_curr_assigned_partitions_to_node - no_of_partitions
                    partitions_to_remove = random.sample(list(self.get_assigned_partitions(consumer_id)),
                                                         no_of_partitions_to_remove)
                    threading.Thread(target=self.notify_remove_partition,
                                     args=(consumer_id, partitions_to_remove)).start()

        self.print_curr_assignment()
        print("Rebalancing Completed")

    def notify_consumer_member_removed(self, consumer_member_id):
        """
        Remove consumer member from the group, i.e. mark its partitions unassigned and delete mappings
        :param consumer_member_id: member id to remove
        """
        with self.rebalance_lock:
            for topic_partition_id in self.consumer_nodes_mapping[consumer_member_id]:
                self.partitions[topic_partition_id] = PartitionStatus.UNASSIGNED.value
            del self.consumer_nodes_mapping[consumer_member_id]
            self.consumer_nodes.remove(consumer_member_id)
            self.heart_beat.pop(consumer_member_id, None)

    def attach_to_topic(self, topic_name):
        """
        Get partitions of the topic and mark them unassigned so that rebalance can assign them to relevant
        consumers
        """
        if not self.is_leader:
            print("Not a leader")
            return
        print(f"attaching to topic {topic_name}")
        with self.rebalance_lock:
            for partition_id in range(self.broker_client.attach_to_topic(topic_name, self.name)):
                topic_partition = PartitionTopic(topic_name, partition_id)
                if topic_partition in self.partitions:
                    continue
                self.partitions[topic_partition] = PartitionStatus.UNASSIGNED.value

        self.rebalance_partitions()

    def check_consumer_health(self):
        """
        Check consumer health from the continuous heart beat they are sending.
        If we do not receive a heart beat for some time, we will assume the consumer to be dead and
        remove it from the group.
        """
        threshold = 10  # seconds without a heart beat before a consumer is considered dead
        while True:
            for consumer_node in list(self.consumer_nodes):
                if time.time() - self.heart_beat[consumer_node] > threshold:
                    self.notify_consumer_member_removed(consumer_node)
            time.sleep(3)

    def ping_heart_beat(self, consumer_id):
        """
        Update heart beat of a consumer
        """
        self.heart_beat[consumer_id] = time.time()

    def get_offset(self, topic_name, partition_id):
        """
        Get offset for a topic<>partition ID
        """
        return self.offsetClient.get_offset(self.name, topic_name, partition_id)

    def commit_offset(self, topic_name, partition_id, value):
        """
        Commit offset for a topic<>partition ID
        """
        return self.offsetClient.commit_offset(self.name, topic_name, partition_id, value)
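
The heart beat check boils down to comparing each consumer's last ping against a threshold. The sketch below isolates that comparison in a pure function so it can be reasoned about without threads; `find_dead_consumers` and the explicit `now` argument are assumptions made for this example.

```python
def find_dead_consumers(heart_beats, now, threshold=10):
    """Return ids of consumers whose last heart beat is older than `threshold` seconds."""
    return sorted(cid for cid, last_seen in heart_beats.items() if now - last_seen > threshold)


# Last heart beat timestamps in seconds.
heart_beats = {"C1": 100.0, "C2": 108.0, "C3": 95.0}
dead = find_dead_consumers(heart_beats, now=110.5)
```

With a 10 second threshold and a 3 second check interval, a dead consumer is detected roughly 10 to 13 seconds after its last ping, and its partitions are then picked up by the unassigned-partition check.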

GroupCoordinator Client

It exposes a simple interface so that the consumer and group coordinator can interact with each other.

class GroupCoordinatorClient:
    """
    This class acts as a communication intermediary between the consumer and the GroupCoordinator
    """

    group_coordinator = {}  # group name -> GroupCoordinator
    consumers = {}  # consumer id -> consumer object

    @classmethod
    def join_group_request(cls, group_name, consumer_id, consumer):
        """
        Register the consumer locally and ask the group coordinator to add it to the group
        """
        cls.consumers[consumer_id] = consumer
        cls.group_coordinator[group_name].register_consumer_member(consumer_id)

    @classmethod
    def get_assigned_partitions(cls, group_name, consumer_id):
        return cls.group_coordinator[group_name].get_assigned_partitions(consumer_id)

    @classmethod
    def register_group_coordinator(cls, name, group_coordinator):
        cls.group_coordinator[name] = group_coordinator

    @classmethod
    def notify_remove_partition(cls, consumer_id, topic_partition_ids):
        return cls.consumers[consumer_id].notify_partition_removed(topic_partition_ids=topic_partition_ids)

    @classmethod
    def revoke_partition(cls, group_name, partition_ids, consumer_id):
        cls.group_coordinator[group_name].mark_partitions_unassigned(consumer_id, partition_ids)

    @classmethod
    def send_heart_beat(cls, group_name, consumer_id):
        cls.group_coordinator[group_name].ping_heart_beat(consumer_id=consumer_id)

    @classmethod
    def stop_consumer(cls, consumer_id):
        cls.consumers[consumer_id].stop_consumer_gracefully()

    @classmethod
    def topic_partitions_changed(cls, group_name, topic_name):
        cls.group_coordinator[group_name].notify_partitions_changed(topic_name)

    @classmethod
    def get_consumer_group_offset(cls, group_name, topic_name, partition):
        return cls.group_coordinator[group_name].get_offset(topic_name, partition)

    @classmethod
    def commit_consumer_group_offset(cls, group_name, topic_name, partition, value):
        return cls.group_coordinator[group_name].commit_offset(topic_name, partition, value)

Consumer member and Data Handler

The consumer member joins a group and is assigned partitions to consume data from. The assignment is a list of PartitionTopics (<topic_id>_<partition_id>) received from the group coordinator. A consumer can belong to only one group at a time, and it sends periodic heartbeats to the group coordinator to signal that it is still alive.

For each assigned Topic<>Partition_id, the consumer member performs the following actions:

  • Gets the current offset from the Group Coordinator.
  • Retrieves the next data from the broker for that topic<>partition.
  • Processes the data and commits the new offset.
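
The three steps above can be sketched with plain in-memory stand-ins. The names partition_log, committed_offsets and consume_once are made up for this illustration and are not part of the design; in the real code the offset comes from the group coordinator and the data from the broker.

```python
# Stand-ins for the broker and the offset store (assumptions for this sketch only).
partition_log = ["m0", "m1", "m2", "m3", "m4"]  # records of one topic partition
committed_offsets = {}  # (group, topic, partition) -> next offset to read


def consume_once(group, topic, partition, msg_to_consume, handle):
    key = (group, topic, partition)
    offset = committed_offsets.get(key, 0)                 # 1. get the current offset
    batch = partition_log[offset:offset + msg_to_consume]  # 2. fetch the next records
    if not batch:
        return 0                                           # nothing new to consume
    handle(batch)                                          # 3. process the data...
    committed_offsets[key] = offset + len(batch)           # ...then commit the new offset
    return len(batch)


seen = []
counts = [consume_once("g", "t", 0, 2, seen.extend) for _ in range(4)]
print(counts, seen, committed_offsets)
```

Because the offset is committed only after handle returns, a failure during processing leaves the offset untouched and the same batch is fetched again on the next cycle.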

There are a few interesting fields for the consumer member, such as:

  • max_polling_concurrency: The maximum number of assigned partitions polled in parallel. Each topic_partition_id is processed by a worker thread taken from a pool of at most this size.
  • msg_to_consume: The maximum number of messages the consumer fetches from a partition in one poll, i.e. it polls from current_offset to current_offset+msg_to_consume.
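
As a rough sketch of how the two fields interact, the snippet below computes the polling window for each assigned partition on a bounded thread pool. poll_window and poll_all are hypothetical helpers, and treating the window as a half-open range is an assumption of this example.

```python
from concurrent.futures import ThreadPoolExecutor


def poll_window(current_offset, msg_to_consume):
    # Window of offsets requested in one poll, treated here as half-open:
    # [current_offset, current_offset + msg_to_consume)
    return current_offset, current_offset + msg_to_consume


def poll_all(assigned_offsets, msg_to_consume, max_polling_concurrency):
    # assigned_offsets maps "topic::partition" to that partition's current offset.
    # At most max_polling_concurrency partitions are handled at the same time.
    with ThreadPoolExecutor(max_workers=max_polling_concurrency) as executor:
        futures = {tp: executor.submit(poll_window, offset, msg_to_consume)
                   for tp, offset in assigned_offsets.items()}
        # Wait for every partition before the polling cycle is considered done.
        return {tp: future.result() for tp, future in futures.items()}


windows = poll_all(
    {"new_topic::0": 0, "new_topic::1": 20, "new_topic::2": 7},
    msg_to_consume=10,
    max_polling_concurrency=2,
)
print(windows)
```

With three partitions and a concurrency of 2, the third partition simply waits for a free worker thread; the result is the same, only the degree of parallelism changes.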

Data Handler: The data handler processes the data consumed by the consumer member. We can define custom data handlers by extending the DataHandler class.

import concurrent.futures
import threading
import time
import uuid
from threading import Lock

from broker.broker import Broker
from consumer.group_coordinator_client import GroupCoordinatorClient
# PartitionTopic must also be imported from the module that defines it


class DataHandler:

    def __init__(self):
        pass

    def handle(self, data):
        raise NotImplementedError("Subclasses must implement handle()")


class DefaultDataHandler(DataHandler):

    def handle(self, data):
        for d in data:
            print(f"processing data {d}")


class ConsumerMember:

    def __init__(self, consumer_group_name, msg_to_consume=10, max_polling_concurrency=5, handler_class=None):
        self.active = True
        self.consumer_group = consumer_group_name
        self.handler = handler_class or DefaultDataHandler()  # handler instance used to process data
        self.id = str(uuid.uuid4())
        self.commit_lock = Lock()  # this lock will be used in polling cycle
        self.group_coordinator_client = GroupCoordinatorClient  # client to communicate with group coordinator
        self.topic_client = Broker  # topic client to communicate with broker
        self.msg_to_consume = msg_to_consume  # max messages to consume from each partition per poll
        self.max_polling_concurrency = max_polling_concurrency  # max partitions to poll in parallel
        self.join_group()

    def send_heart_beat(self):
        """
        Send heartbeats to the coordinator for as long as the consumer is active
        :return:
        """
        while True:
            if self.active:
                self.group_coordinator_client.send_heart_beat(self.consumer_group, self.id)
                time.sleep(1)  # pause between heartbeats instead of busy-looping
            else:
                return False

    def join_group(self):
        """
        Join Consumer group
        :return:
        """
        self.active = True

        # start heart beat
        threading.Thread(target=self.send_heart_beat).start()

        time.sleep(1)
        self.group_coordinator_client.join_group_request(self.consumer_group, self.id, self)

        # start polling
        threading.Thread(target=self.poll).start()

    def get_assigned_partitions(self):
        """
        Get assigned partitions from group coordinator
        :return:
        """
        return list(self.group_coordinator_client.get_assigned_partitions(self.consumer_group, self.id))

    def notify_partition_removed(self, topic_partition_ids, *args, **kwargs):
        """
        The group coordinator cannot simply remove a partition from a consumer, as that may leave uncommitted offsets.
        This function receives the removal request from the group coordinator and notifies it back once polling
        has stopped, so that the group coordinator can remove the partitions from this consumer.
        :param topic_partition_ids: partitions to remove
        :param args:
        :param kwargs:
        :return:
        """
        print(f"removing topic partition ids {topic_partition_ids}")
        retry_count = 0
        with self.commit_lock:
            # Hold the commit lock so that polling stops. It is released once the partitions
            # are removed, after which the consumer fetches its updated partitions from the
            # group coordinator. This ensures no uncommitted offset remains.
            while retry_count < 3:
                try:
                    # send request to group coordinator to remove partition
                    self.group_coordinator_client.revoke_partition(self.consumer_group, topic_partition_ids,
                                                                   consumer_id=self.id)

                    return True
                except Exception as e:
                    print(f"Unable to remove partition {e}")
                    retry_count += 1

            # if retries exceeded, terminate consumer
            print("max retries exceeded, terminating consumer")
            self.active = False
            return False

    def stop_consumer_gracefully(self, *args, **kwargs):
        """
        Mark consumer inactive
        :param args:
        :param kwargs:
        :return:
        """
        print("Stopping consumer")
        with self.commit_lock:
            # take commit lock to avoid uncommitted offsets
            self.active = False
        print("stopped_consumer")
        return True

    def poll(self):
        """
        This method polls the data from assigned partitions in parallel, processes it and commits the updated offsets
        :return:
        """

        if not self.active:
            return False

        # If no partition is assigned, there is nothing to do :(
        while not self.get_assigned_partitions():
            if not self.active:
                return False
            print("no_partition_assigned_to_poll")
            time.sleep(1)  # wait before checking again instead of busy-looping

        while True:

            # hold the commit lock for the whole polling cycle; it is released even if the cycle fails
            with self.commit_lock:

                if not self.active:
                    break

                with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_polling_concurrency) as executor:
                    # Process data from each assigned partition in parallel
                    polling_result = [executor.submit(self.poll_data_from_partition, topic_partition_id) for
                                      topic_partition_id in self.get_assigned_partitions()]

                    # Wait for polling cycle to complete before releasing commit lock
                    results = [task.result() for task in polling_result]

            time.sleep(1)  # some delay to avoid lock starvation

    def poll_data_from_partition(self, topic_partition):
        """
        Call offset client to get the current offset of topic<>partition until which data is consumed
        Call broker client to get data from topic partition from offset to offset+msg to consume
        Process the data
        Commit updated offset to offset client
        Note: If committing the new offset fails after data processing, or data processing itself fails,
        the offset is not changed, so the consumer polls and processes the same data again.
        (Additional measures can be added to avoid this, like a msg deduplication key)
        :param topic_partition:
        :return:
        """
        try:
            if isinstance(topic_partition, str):
                topic_partition = PartitionTopic(topic_name=topic_partition.split("::")[0],
                                                 partition=int(topic_partition.split("::")[1]))
            _topic_name = topic_partition.topic_name
            _partition_id = topic_partition.partition
            _curr_offset_of_partition = self.group_coordinator_client.get_consumer_group_offset(self.consumer_group,
                                                                                                _topic_name,
                                                                                                _partition_id)
            if _curr_offset_of_partition is None:
                raise Exception("unable to get latest offset")
            _data = self.topic_client.poll_data(_topic_name, _partition_id, _curr_offset_of_partition,
                                                _curr_offset_of_partition + self.msg_to_consume)
            if not _data:
                return True
            print(_data)
            # handle data
            self.handler.handle(_data)

            # commit offset
            result = self.group_coordinator_client.commit_consumer_group_offset(self.consumer_group, _topic_name,
                                                                                _partition_id,
                                                                                _curr_offset_of_partition + len(_data))

            if not result:
                raise Exception("unable to commit offset")
            print(f"committed offset for topic_partition_id {_topic_name}_{_partition_id}")
            return True
        except Exception as e:
            print(f"Error while processing data {e}")
            return False
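
As noted in the docstring of poll_data_from_partition, a failed commit means the same batch is delivered again. One possible way to cope with that is a custom handler that remembers which message keys it has already processed. The sketch below is only an illustration: DedupingHandler and key_fn are hypothetical, and the DataHandler base class is repeated in minimal form so the snippet runs on its own.

```python
class DataHandler:
    """Minimal stand-in for the DataHandler base class shown above."""

    def handle(self, data):
        raise NotImplementedError("Subclasses must implement handle()")


class DedupingHandler(DataHandler):
    """Hypothetical handler that skips records whose key was already processed."""

    def __init__(self, key_fn=lambda record: record):
        self.key_fn = key_fn    # extracts the deduplication key from a record
        self.seen_keys = set()  # grows without bound; fine for a sketch only
        self.processed = []

    def handle(self, data):
        for record in data:
            key = self.key_fn(record)
            if key in self.seen_keys:
                continue  # redelivered record, already handled
            self.processed.append(record)
            self.seen_keys.add(key)


handler = DedupingHandler()
handler.handle(["a", "b", "c"])
# The same batch is polled again after a failed commit, plus one new record.
handler.handle(["a", "b", "c", "d"])
print(handler.processed)
```

Since ConsumerMember calls self.handler.handle directly, such a handler would be passed as an instance, for example ConsumerMember(consumer_group_name=group_name, handler_class=DedupingHandler()).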

Testing

Our implementation is complete, so we can now write a simple test to verify the functionality.

def test():
    from group_coordinator.group_coordinator import GroupCoordinator
    from consumer.group_coordinator_client import GroupCoordinatorClient
    from broker.broker import Broker
    from consumer.consumer_member import ConsumerMember
    from broker.partition_strategy.RR_partition_strategy import RRPartitionStrategy
    from broker.topic import Data
    # NOTE: ArrayStorage (used below) must also be imported from the module that defines it
    topic_name = "new_topic"
    group_name = "new_group"

    # create topic with 6 partitions
    Broker.create_topic(topic_name, 6, RRPartitionStrategy(), storage_type=ArrayStorage)

    # create group coordinator
    consumer_group_coordinator = GroupCoordinator(group_name, is_leader=True)

    # create consumer members
    c1 = ConsumerMember(consumer_group_name=group_name)
    c2 = ConsumerMember(consumer_group_name=group_name)

    # attach group coordinator to topic (we can attach to multiple topics as well)
    consumer_group_coordinator.attach_to_topic(topic_name)

    while True:
        x = input("choose action: ")
        if x == "add_data_to_topic":
            # add 100 data items to topic
            topic_to_add_data = str(input("Topic Name ? "))
            for i in range(100):
                Broker.add_data_to_topic(topic_to_add_data, Data(msg=str(i), ttl=None))
        elif x == "print_offsets":
            # print all offsets
            from offset_manager.offset_manager import OffsetClient
            print(OffsetClient.offsets)
        elif x == "add_consumer":
            # add consumer to the group
            c4 = ConsumerMember(group_name)
        elif x == "kill_consumer":
            # kill consumer
            node_id = str(input("provide node id: "))
            GroupCoordinatorClient.stop_consumer(node_id)
        elif x == "add_partition_to_topic":
            # add partition to topic
            topic_to_add_partition = str(input("Topic Name ? "))
            Broker.add_partition(topic_to_add_partition)

Initially, the topic partitions are balanced between the 2 consumer nodes:

  • After connecting to the topic: the initial rebalance runs once we attach the topic.
[Figure: Initial rebalance with 2 consumers]
  • add_data_to_topic: Adds data to the provided topic. For testing, we will insert 100 records. Here you can check the data partition strategy in use.
[Figure: Insert data in topic]
  • add_consumer: Adds a new consumer, and rebalancing assigns it the required partitions.
[Figure: Rebalance after a new consumer added]
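
To get a feel for what these rebalances do to the 6 partitions of the test topic, here is a simplified round-robin split. assign_round_robin is a hypothetical helper written for this illustration; the actual assignment is decided by the group coordinator's rebalance_partitions and may distribute partitions differently.

```python
def assign_round_robin(partitions, consumers):
    # Illustrative only: deal partitions out to consumers one by one.
    if not consumers:
        return {}
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        assignment[consumers[index % len(consumers)]].append(partition)
    return assignment


partitions = [f"new_topic::{i}" for i in range(6)]

# Two consumers share the six partitions, three each.
two = assign_round_robin(partitions, ["c1", "c2"])

# After add_consumer, three consumers hold two partitions each.
three = assign_round_robin(partitions, ["c1", "c2", "c3"])

print(two)
print(three)
```

Each partition still belongs to exactly one consumer after the split, which is what preserves ordering within a partition.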

Conclusion

So that’s it: a complete implementation of an in-memory streaming service. Thanks for staying with me this long, and I hope it has been a good learning experience.

There might be some edge cases that I missed or was too lazy to fix. Feel free to point them out or fix them.

You can find the complete code here.

Feel free to drop any questions, comments, or improvements and if you enjoyed the article please support and follow for more.

Bonus

Well, this is too much work for just an in-memory solution. So let’s integrate remote storage into our design so we can run consumers in different processes or machines.

We will cover this in the follow-up article: Design In-memory Streams: Part 2.
