Distributed Message Queues

Vedansh Gurunathan
15 min readNov 27, 2022

--

In this article, I am posting all my learnings about the design of distributed message queues. Please feel free to give suggestions in the comment section.

We all know distributed message queues play an important role in building large systems. It helps us decouple tightly coupled components and improves scalability, performance, and availability.

We are going to deep dive into a generic distributed message queue and understand how it works, and I am going to post all my understandings over here. Take it with a grain of salt as I am also learning about it.

Assumptions and Requirements

Let's discuss the assumptions and the design scope we are considering for now.

  1. The format of the messages is text only, with no multimedia as of now. There will be a size limit to text messages sent in the queue.
  2. We will have to maintain the order of the messages sent in the queue but not going to guarantee the order of processing in the whole system which involves more components like producer and consumer.
  3. We will have to support multiple target throughputs and latency for different use cases, e.g. for log aggregation, we will need high throughput and for payment-based scenarios we will need low latency. This will be configurable based on different use cases.
  4. For simplicity, we are going to assume there won't be any updates and deletions of messages.
  5. All other general requirements like persisting, durability, etc.
  6. Should support different delivery semantics like At-most once and At-least Once.

Simple First Draft Design

The below diagram shows a very simple yet overall design of the distributed queue.

Simple Design with all key components

A brief and quick overview of how it works:

  1. The producer sends a message to the message queue.
  2. The consumer subscribes to the message queue. Once subscribed the consumer will receive messages from the queue.
  3. The message queue between the producer and consumer decouples the producer and consumer Service. Now each of the services can be independently operated and scaled.

Concepts and Terminologies

There are some concepts that one must be aware of before going further.

Topics: Topics are the categories to organize the messages. Each topic has a name and is unique across the entire message queue service. The producer sends a message to a specific topic and consumers subscribe to a particular topic to receive messages.

If there are multiple consumers, once a message is read by a consumer it becomes unavailable in the queue, if we want multiple consumers to read the same message we introduce a concept called Consumer Groups.

As discussed in the assumptions section we want the messages to be persisted and the system to be durable. What happens if the data volume for a specific topic is too large and a single server will not be able to handle so much volume?

The approach to solve this problem is partitioning(sharding) every topic in the message queue. We will have to divide the topics into partitions and send messages evenly across the partitions. These partitions are evenly distributed across the servers. These servers that hold partitions are called brokers. The distribution of partitions across brokers is the key element for scalability. The more the number of partitions the more capacity we can have for each topic(but this comes with a cost which we will read about it in the coming sections). Each partition within the queue follows the FIFO mechanism which means we can maintain the order of the messages within the partition of the queue.

When a message is sent by a producer to the consumer actually underneath it is sent to a particular partition. How to determine which particular partition the message will be sent to? The default configuration is to send the messages in a round-robin fashion to each of these partitions. If not we can configure a custom hashing algorithm based on a message key. Some business use cases will require defining a custom key for e.g. in an e-commerce website where we would like to send order updates in sequential order, we can use the message key to be the order-id. All the updates having the same order-id will be sent to the same partition.

When a consumer subscribes to a topic it pulls data from one or more partitions in the topic, if there are a bunch of consumers each one of them pulls from a subset of partitions from the topic. These bunch of consumers form a consumer group.

Consumer groups

Let's answer this question by taking an example. Let's assume there are two consumer groups Consumer Group-A and Consumer Group-B and both of these consumer groups have two consumers each. Refer to the below diagram.

Consumer Groups
  1. Consumer Group A subscribes to Topic A while Consumer Group B subscribes to both topic A and B.
  2. If you notice Topic A is subscribed to by both Consumer Groups A and B which means the same message is read by multiple consumers.
  3. But, If you notice in the diagram within Group A Partition 1 is read by Consumer 1, and Partition 2 is read by Consumer 2. Why do we have to divide the partitons between the Consumers of the same group? Why not let both the Consumers in Group A read from both partitions? Then we will not be able to guarantee the reading order of the messages. One of the Consumers might consume messages faster than the other one.
  4. Hence, we will have to have a constraint where a single partition can only be consumed by one consumer in the same group.
  5. With this constraint, it is important to note that we will always have to create enough partitions beforehand to handle scale and later on when the demand for the service is high we just have to add more consumers.

High-Level Architecture

With all the concepts seen earlier, we can get into high-level architecture.

High-Level Architecture

Clients:

  1. The producer pushes messages to topics and the consumer subscribes to certain topics and reads the messages.

Core Service and Storage:

  1. Brokers: holds multiple partitions. A Partition holds a subset of messages for a topic
  2. Data Storage: Messages are persisted in data storage in partitions
  3. State Storage: Consumer States(how many messages a consumer has read) are managed by state storage.
  4. Metadata Storage: Configuration and Properties of Topics are stored in metadata storage.

Coordinator Service:

  1. Service Discovery: Which Brokers are alive?
  2. Leader Election of brokers as active controllers which is responsible to decide which partition will be mapped to which consumer within the consumer group.
  3. Apache Zookeeper is the most commonly used service to elect an active controller.

Design Deep Dive

We will be discussing all the above components of HLD in detail and there are three most important design choices that we made.

Design Choice 1: Its better to choose an on-disk data structure which is great for sequential access(since queues has to be accessed sequentially) and, which makes segmenting data into blocks simple so that we can use disk caching strategies more effectively.

Let’s go into more detail, about data storage. We would need to persist the messages and the access of those messages will be sequential. Also, the access pattern of messages is both write-heavy and read-heavy, with no update or delete options.

One option is to think of databases whenever we think of storage and for each topic, we will maintain a table. But designing a database that is both write-heavy and ready-heavy alongside very little latency will be difficult.

So the other option is to use Write Ahead Logs(WAL). WAL is a file system where new entries are just appended at the end of the file.

Whenever a new message comes we can simply append them at the end of the file in a new line(each message in a line) and we can easily identify a message by its line number(which is actually the offset of the message). However, a file cannot grow infinitely so we have to divide the file into segments, this segment size of the file can be a configurable value.

Once a segment reaches its size limit the segment file becomes inactive and a new segment file is created. WAL files are easy to persist as well as they give really good performance on disk memory. As seen in the link, it also busts the misconception that disk access is slow, whenever we want sequential disk access we can get comfortably a few MB/sec only random disk access is slow.

Design Choice 2: We will need a message data structure, which makes it easy to transfer messages from producer to queue and finally to consumer with no modifications. This minimizes the need to copy in a high volume and high traffic system.

Message Data Structure
  1. The key of the message is used to determine the partition in the topic of the queue.
  2. Value is actually the value of the message
  3. CRC is Cyclic Redundancy check which helps us to verify if the message has been altered or not.

This Data structure is key to the message queues performance if any of the producers, queue service, or consumers do not adhere to the above format it will involve altering the message format and copy-paste which will seriously hamper our system's performance.

Design Choice 3: We will have to batch wherever and whenever possible. Small I/O hampers the systems throughput, if we want high throughput we will need to keep producers sending messages in batches, queues persisting the message in batches and consumers, consuming the messages in batches too(whenever possible).

When we group messages to a single network request we make fewer network round trips.

The broker also writes the messages to WAL in large chunks, which leads to large chunks of data in sequential write and large segments of data being written in the disk cache all at once.

This is a good time to introduce that there is always a trade-off between throughput and latency. If we are building a system for low latency then we can configure the batch size to be small. Disk performance will suffer in this use case. But if we want to have a system that has large throughput, in those cases we can have a large batch size and also have more partitions in the queue. Refer to this for more details on optimization for throughput.

Let's now discuss more about the flows of producers and consumers.

Producer Flow

Whenever a producer wants to send a message to a topic(or a partition to be precise) which broker should it connect to? We will need a routing layer to determine the partition servers for a message.

  1. The producer will send the message to the routing layer, the routing layer according to the replica plan of the partitions(we will read about replicas a bit further) will send to the leader replica of the partition.
  2. Once the leader replica receives the message, follower replicas will pull the data.
  3. After “enough” follower replica and leader replica have persisted the message in WAL, the broker sends an ACK to the producer, if an ACK is not sent and a request gets timed out the producer will retry.

These above-mentioned steps have a problem, we have a separate layer called as routing layer and continuously communicating with the routing layer will increase the network overhead. Instead, we will have a routing layer wrapped inside the producer itself as shown below. This also makes batching of requests to the producer in memory which will increase the throughput.

Producer wrapped with Routing

Here Broker 1 has Partition1 which is the leader partition and Broker 2’s Partiton1 is the follower partition which is the replica.

Consumer Flow

In Consumer Flow, we will first discuss how a consumer subscribes to a partition of a particular topic.

  1. A new consumer wants to join group 1 and let's say wants to subscribe to topic A. It will first hash the group name and find the corresponding broker for the consumer group. Every consumer group will have a broker coordinator. Once the Coordinator Broker is identified a subscription request is made by the consumer.
  2. The coordinator after receiving the request will assign a particular partition to the consumer.
  3. The consumer now starts receiving messages from the last consumed offset which is maintained by the consumed state in state storage in the broker. If a consumer wants to receive messages from the beginning a special request can be made and a separate state for the consumer will be maintained.
  4. Consumer once processed the message will commit the offset to the broker

What happens if a consumer crashes, leaves, or joins?

Consumer Rebalancing

As we have seen earlier there could be many partitions that could be mapped to a single consumer but never a single partition mapped to many consumers. Whenever a consumer rebalancing occurs a consumer is mapped to one or more partitions.

Every Consumer Group will have a corresponding broker, which will elect a coordinator. The coordinator will receive the heartbeat of consumers and manage their offset.

  1. Each consumer will belong to a group, it finds a dedicated coordinator by hashing the group name.
  2. The coordinator will maintain the list of consumers in a consumer group, when the list changes it will elect a new leader within the consumers to come up with a partition dispatch plan as the partition mapping with the consumers will change.
  3. Now the coordinator will broadcast this consumer-partition mapping to the other consumers.
Consumer Rebalance: New consumer joins

Very similar flows can be made when a consumer leaves(when a leave request is received) or a consumer crashes(if a heartbeat is missed).

Now that we have discussed both the consumer and producer flows, let’s discuss the further components of our Design.

State Storage

  1. The state storage stores the mapping between consumers and partitions
  2. The last consumed offset by each consumer of a partition. This is required if a consumer wants a custom read from a particular offset.
  3. The last consumed offset by each group of a partition. This is required if a consumer crashes and a new consumer joins it can use the last committed offset by the consumer group to reconsume the messages.

There can be many solutions to maintain the state storage as KV store, but considering high read and write operations with low latency, Zookeeper is a great choice that is used by a lot of Distributed Queue systems.

Metadata Storage

This has all the replica distribution plans, partition plans for each topic, and retention period of messages. This does not change frequently and the data volume is small. Also, it has to be highly consistent. In such cases again Zookeeper is a great choice.

Now let's discuss, How Replication works in Distributed Message Queue Service?

Replication

For a fault-tolerant system and to ensure the system is highly available replication of the data is very important. What if one of the disks is damaged? We will need to replication so that the data is persisted.

For each partition, we will have two replicas(configurable value referred as config.replication.count) in across different brokers. There will be a leader replica and follower replicas. A producer will always send messages to the leader replica as discussed in the producer flow, but how does the leader replica ensure that the follower replica is in sync? See the diagram for a clear understanding.

Replica Distribution Plan

Whenever a message request is received by the leader replica, it will wait for its followers to pull the message. Once the message is pulled by the follower it commits to the leader a successful pull and once the leader receives “enough” followers in sync(also called as ISRs) it will acknowledge the producer that it has successfully received the message.

If this replication does not happen in time then the producer keeps on retrying.

These follower and leader replicas will be in different brokers so that if a broker goes down we can still retain all the messages. Who decides the leader and the followers and Who decides the replica distribution plan? This is done by the coordination service of the brokers mentioned in the high-level design.

For further deep-dive reading on replication, a good read is Chapter 5 Replication from the book Design Data-Intensive Applications” is suggested.

In Sync Replicas(ISRs)

Here we will discuss how the configuration of the number of follower replicas can help different business use cases.

The ISR count is a good trade-off between performance and durability, if a producer doesn't want to lose messages we can wait for all the replicas to be in sync, but this hits the performance as we are waiting for all the replicas, and throughput gets reduced. If we want high performance and it is okay to lose messages(like in case of log monitoring) we can even keep the ISR count as 0 and not wait for any replica to be in sync with the leader.

Scalability

Let's see the components which need to be scaled:

  1. Producers: Can be achieved easily by adding and removing producers as per load.
  2. Consumers: Similar to Producers these are independent components from the queue, whenever a new consumer is added/removed Consumer rebalancing algorithm takes place and we have already discussed how it works
  3. Brokers
  4. Partitions

Similar to Consumers the scalability of brokers has the same steps taken as the recovery of brokers after a failure.

What happens when a broker fails?

Let's say we have a Partition Distribution Plan P1 and have 3 Brokers B1, B2, and B3. Now if B3 goes down we will have a broker controller which will detect that B3 has crashed and will create a new Partition Distribution Plan P2.

Similarly, if a new broker is created for scaling up we just need to ask the broker coordinator to create a partition distribution plan accordingly.

The only care that has to be taken whenever generating a Partition Distribution Plan is that all the replicas of the same partition do not end up on the same broker node. Then our system is no longer fault tolerant.

If we keep partitions to different data centers then network latency might be too high, so in that case better to use techniques like data mirroring. As of now, we are not going into data mirroring techniques, might write an article on it later.

Scaling up Partitions

Whenever we want to increase the number of partitions, two people have to be notified the producers and consumers. After the producer sends a message the producer will be notified that the partition count has changed, accordingly whenever a consumer sends a heartbeat the consumer rebalancing algorithm is initiated.

Now let's see the data that is persisted by the partitons. When a new partition is created, the previous data is not moved at all but the messages received post-creation is persisted in all the partitions.

But what happens if we reduce the number of partitions?

  1. Any new messages received further will be directed to other partitions.
  2. Decommissioned Partitions cannot be deleted simply because consumers might still be reading from these partitions so they still have to be live. Only after a configured retention period time expires the persisted data can be freed up.
  3. Once the retention period expires consumer rebalancing will be needed.

Data Delivery Semantics

  1. At most once: At most once means the message will be delivered not more than once, and also the message can be not delivered at all. These kinds of systems ensure high latency as durability is traded off. As discussed in the Partition Replication Section by setting the configurable value config.replication.count to 0 followers this can be achieved in our Distributed Queue System. If the leader is not waiting for any followers to persist the message we can achieve low latency but since the system is not durable we will be able to guarantee only At most once data delivery semantics.
    On the consumer side, the consumer commits the message before processing it. Let’s say the consumer fails after committing the message the message cannot be re-read.
  2. At Least once: No message should be lost but messages can be delivered more than once. This data delivery semantic can be achieved by setting config.replication.count=1 to config.replication.count=ALL.
    On the consumer side, the consumer processes the message and then commits it. If the consumer crashes after processing but before committing it can be consumed again.
  3. Exactly Once: This is difficult to achieve, read more about it here why?

Thanks for reading until here, please upvote if you liked my design discussion. If you have suggestions you can reach out to me on Linkedin, or Gmail, or just comment below.

References

  1. System Design Interview by Alex Xu and Sahn Lam Vol 2
  2. Optimize and Tune Confluent Clients

--

--