Kafka: Why What and How?

Jeevan D C
DKatalis

--

This document explains Kafka from an API standpoint so that engineers can use it effectively in their applications, and answers some nitty-gritty questions that aren’t so obvious.

Contents:

1. Why Kafka?
2. What is Kafka?
3. How does it work?
4. Scenarios to think through
5. Why is it hard to achieve optimal performance with Kafka?
6. How to deal with some common problems?

Why Kafka?

We wanted a distributed event-driven (ED) architecture to get the benefits of asynchronous request processing, decoupled microservices, overall resiliency, eventual consistency, and pub-sub paradigms.

A message queue (MQ) is one of the tools that helps achieve this. Kafka is one such MQ, and it offers:

  • High Throughput: High capacity processing of data per node (benchmarks here)
  • Distributed setup: Broker nodes can span multiple regions and availability zones (AZs)
  • Persistence: Stores messages for a specified time (ex: 3 to 14 days usually)
  • Scalability: Can scale horizontally by addition of broker nodes
  • Fault-Tolerance: Offers various replication options to be fault-tolerant at times of node failures
  • Multiple Consumers API: Unlike traditional queues, multiple consumers can subscribe to the same message queue.

These differentiators led us to choose Kafka over other popular message queues like RabbitMQ, Pulsar, etc.

What is Kafka?

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications

Architecture:

Why is Zookeeper necessary for Apache Kafka?

Zookeeper is a strongly consistent distributed store. Kafka relies on it for consensus situations such as leader election (brokers, partitions, consumers), topic configuration, ACLs, and cluster membership.
(Recent Kafka versions are removing the Zookeeper dependency in favor of Kafka’s own Raft-based consensus mechanism, KRaft.)

Terminologies:

  • Message: A record or unit of data within Kafka. Each message has a value, and optionally key, headers.
  • Producer: Producers publish messages to Kafka topics. Producers decide which topic partition to publish to, either by spreading messages across partitions (round-robin) or by using a partitioning algorithm based on a message’s key.
  • Broker: Kafka runs in a distributed system or cluster. Each node in the cluster is called a broker.
  • Topic: A topic is a category to which data records — or messages — are published. Consumers subscribe to topics in order to read the data written to them. ex: transfer.completed
  • Topic partition: Topics are divided into partitions, and each message in a partition is given an offset. Each partition is typically replicated at least once or twice. Each partition has a leader and one or more replicas (copies of the data) that exist on followers, providing protection against a broker failure. A broker can be the leader for some partitions and a follower for others, but it holds at most one replica of a given topic partition. By default, the leader handles all reads and writes.
  • Offset: Each message within a partition is assigned an offset, a monotonically increasing integer that serves as a unique identifier for the message within the partition.
  • Consumer: Consumers read messages from Kafka topics by subscribing to topic partitions. The consuming application then processes the message to accomplish whatever work is desired.
  • Consumer group: Consumers can be organized into logical consumer groups. Topic partitions are assigned to balance the assignments among all consumers in the group. Within a consumer group, all consumers work in a load-balanced mode; in other words, each message will be seen by one consumer in the group. If a consumer goes away, its partitions are assigned to other consumers in the group. This is referred to as a rebalance. If there are more consumers in a group than partitions, some consumers will be idle. If there are fewer consumers in a group than partitions, some consumers will consume messages from more than one partition.
  • Lag: A consumer is lagging when it’s unable to read from a partition as fast as messages are produced to it. Lag is expressed as the number of offsets that the consumer is behind the head of the partition. The time required to recover from lag (to “catch up”) depends on how many messages per second the consumer is able to consume.
  • Replica: A copy of a partition’s data kept on another broker, so that the data isn’t lost when a broker fails.
  • In-Sync Replica: The in-sync replicas are replicas that are alive and are in sync with the leader.
  • min-in-Sync Replica: This is the minimum number of in-sync replicas (leader included) that must acknowledge that data was received successfully for a write to be successful. It is enforced when the producer asks for acknowledgment from all replicas.
  • Committing offset: Committing an offset for a partition is the action of saying that the offset has been processed, so that the Kafka cluster won’t send the committed records for the same partition again. The committed offset is important in case of a consumer recovery or rebalancing.
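The Lag definition above can be turned into a quick back-of-the-envelope estimate. The sketch below is only an illustration: the function names and the sample numbers are made up for this example, and it assumes steady produce and consume rates for a single partition.

```python
def consumer_lag(log_end_offset: int, committed_offset: int) -> int:
    """Number of messages the consumer still has to read in one partition."""
    return max(log_end_offset - committed_offset, 0)


def catch_up_seconds(lag: int, consume_rate: float, produce_rate: float) -> float:
    """Rough time to clear the lag; both rates are in messages per second."""
    if lag == 0:
        return 0.0
    net_rate = consume_rate - produce_rate
    if net_rate <= 0:
        return float("inf")  # the consumer never catches up
    return lag / net_rate


lag = consumer_lag(log_end_offset=10_000, committed_offset=7_000)
print(lag)                                  # 3000
print(catch_up_seconds(lag, 500.0, 200.0))  # 10.0
```

The point of the second function is that catching up depends on the gap between the consume rate and the produce rate, not on the consume rate alone.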

How does it work?

Producing messages:

When a producer publishes a record to a topic, it is published to the leader of the target partition. The leader appends the record to its commit log and increments its record offset.

A producer must know which partition to write to; this is not up to the broker. Some Kafka clients abstract this. We are using round-robin. It’s possible for the producer to attach a key to the record, which dictates the partition the record goes to: all records with the same key arrive at the same partition.

Before a producer can send any records, it has to request metadata about the cluster from a broker. The metadata says which broker is the leader for each partition, and a producer always writes to the partition leader. The producer then uses the key to pick the partition. The default implementation hashes the key to calculate the partition; you can also skip this step and specify the partition yourself.

A common mistake when publishing records is setting the same key for all records. All records then end up in the same partition, which leaves the topic unbalanced.
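The partition choice and leader lookup described above can be sketched in a few lines. This is not the code of any Kafka client: `zlib.crc32` stands in for the real key hash, and the broker names and the `PARTITION_LEADERS` table are hypothetical metadata invented for the example.

```python
import itertools
import zlib
from typing import Optional, Tuple

NUM_PARTITIONS = 4
# Hypothetical cluster metadata: partition -> broker that leads it.
PARTITION_LEADERS = {0: "broker-1", 1: "broker-2", 2: "broker-3", 3: "broker-1"}

_round_robin = itertools.cycle(range(NUM_PARTITIONS))


def choose_partition(key: Optional[bytes]) -> int:
    """Keyed records hash to a fixed partition; unkeyed records rotate."""
    if key is None:
        return next(_round_robin)
    # crc32 is a stand-in here; it is not the hash Kafka clients use.
    return zlib.crc32(key) % NUM_PARTITIONS


def route(key: Optional[bytes]) -> Tuple[int, str]:
    """Return the chosen partition and the broker the record is sent to."""
    partition = choose_partition(key)
    return partition, PARTITION_LEADERS[partition]


# Same key -> same partition (and leader) every time.
print(route(b"account-42") == route(b"account-42"))  # True

# Unkeyed records are spread over all partitions.
print([choose_partition(None) for _ in range(4)])  # [0, 1, 2, 3]

# One constant key for everything -> a single hot partition.
print(len({choose_partition(b"same-key") for _ in range(100)}))  # 1
```

The last line is the unbalanced-topic mistake in miniature: a hundred records, one partition.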

Producers can also send messages to the broker in batches, bounded by time or size. Until a batch is sent, the producer buffers the data in memory.
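A minimal sketch of "flush by size or by time, whichever comes first" is shown below. The `BatchBuffer` class and its parameters are hypothetical and exist only for this illustration; in the Java producer the comparable knobs are `batch.size` and `linger.ms`. Time is passed in explicitly so the example is deterministic.

```python
from typing import Callable, List


class BatchBuffer:
    """Buffers records in memory and flushes on size or age, whichever comes first."""

    def __init__(self, send: Callable[[List[bytes]], None],
                 max_bytes: int, max_wait_s: float) -> None:
        self._send = send
        self._max_bytes = max_bytes
        self._max_wait_s = max_wait_s
        self._records: List[bytes] = []
        self._size = 0
        self._oldest = 0.0

    def add(self, record: bytes, now: float) -> None:
        if not self._records:
            self._oldest = now  # age is measured from the first buffered record
        self._records.append(record)
        self._size += len(record)
        self.maybe_flush(now)

    def maybe_flush(self, now: float) -> None:
        """Call periodically; `now` is a timestamp in seconds."""
        if not self._records:
            return
        full = self._size >= self._max_bytes
        stale = now - self._oldest >= self._max_wait_s
        if full or stale:
            self._send(self._records)
            self._records, self._size = [], 0


sent: List[List[bytes]] = []
buf = BatchBuffer(sent.append, max_bytes=10, max_wait_s=0.5)
buf.add(b"aaaa", now=0.0)    # 4 bytes buffered
buf.add(b"bbbbbb", now=0.1)  # 10 bytes reached -> flushed by size
buf.add(b"cc", now=0.2)      # buffered
buf.maybe_flush(now=0.8)     # 0.6 s old -> flushed by time
print(sent)  # [[b'aaaa', b'bbbbbb'], [b'cc']]
```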

Consuming messages: Pull based not Push

Kafka follows the principle of a dumb broker and smart consumer.

That means the consumer does the heavy lifting of pulling messages and of coordinating (partition assignment) with the other consumers in its consumer group.

Consumers themselves poll Kafka for new messages and say what records they want to read. This allows them to increment/decrement the offset they’re at as they wish, thus being able to replay and reprocess events.
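The pull model can be illustrated with an in-memory stand-in for a single partition. `PartitionLog` and `PullConsumer` are hypothetical names for this sketch and do not correspond to a real client API; the point is only that the consumer owns its position and can move it.

```python
from typing import List


class PartitionLog:
    """In-memory stand-in for one partition: an append-only list of records."""

    def __init__(self) -> None:
        self._records: List[str] = []

    def append(self, record: str) -> int:
        self._records.append(record)
        return len(self._records) - 1  # offset of the new record

    def fetch(self, offset: int, max_records: int) -> List[str]:
        return self._records[offset:offset + max_records]


class PullConsumer:
    """Tracks its own position; the log never pushes anything."""

    def __init__(self, log: PartitionLog) -> None:
        self._log = log
        self.position = 0

    def poll(self, max_records: int = 2) -> List[str]:
        records = self._log.fetch(self.position, max_records)
        self.position += len(records)
        return records

    def seek(self, offset: int) -> None:
        self.position = offset


log = PartitionLog()
for event in ["e0", "e1", "e2"]:
    log.append(event)

consumer = PullConsumer(log)
print(consumer.poll())  # ['e0', 'e1']
print(consumer.poll())  # ['e2']
consumer.seek(0)        # rewind to replay from the start
print(consumer.poll())  # ['e0', 'e1']
```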

Kafka consumers are typically part of a consumer group. When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic.

Let’s take topic transfer.completed with four partitions. Now suppose we created a new consumer group ms-transfer for the transfer microservice (ms), and ms-transfer has one pod (instance in k8s) running. pod-1, which is the only consumer in group ms-transfer, subscribes to topic transfer.completed. Consumer pod-1 will get all messages from all four transfer.completed partitions.

One Consumer group with four partitions and one consumer in group

If we add another consumer, pod-2, to group ms-transfer, each consumer will only get messages from two partitions. Perhaps messages from partition 0 and 2 go to pod-1 and messages from partitions 1 and 3 go to the consumer pod-2.

Four partitions split to two consumers in a group

If ms-transfer has four consumers, then each will read messages from a single partition.

Four consumers in a group with one partition each

If we add more consumers to a single group with a single topic than we have partitions, some of the consumers will be idle and get no messages at all.

More consumers in a group than partitions means idle consumers

The main way we scale data consumption from a Kafka topic is by adding more consumers to a consumer group.

In addition to adding consumers in order to scale a single application, it is very common to have multiple applications that need to read data from the same topic. In fact, one of the main design goals in Kafka was to make the data produced to Kafka topics available for many use cases throughout the organization.

In the previous example, if we add a new consumer group ms-notification with a single consumer, this consumer will get all the messages in the topic transfer.completed independent of what ms-transfer is doing. ms-notification can have more than a single consumer, in which case they will each get a subset of partitions, just like we showed for ms-transfer, but ms-notification as a whole will still get all the messages regardless of other consumer groups.

Adding a new consumer group, both groups receive all messages

To summarize, you create a new consumer group for each application that needs all the messages from one or more topics. You add consumers to an existing consumer group to scale the reading and processing of messages from the topics, so each additional consumer in a group will only get a subset of the messages.
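The scenarios above can be reproduced with a small assignment function. This is a simplified round-robin sketch, not the assignor a Kafka client actually runs, and `notif-1` is a made-up consumer name; it assumes at least one consumer per group.

```python
from typing import Dict, List


def assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Deal partitions out to consumers one at a time, like cards."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


partitions = [0, 1, 2, 3]  # transfer.completed

print(assign(partitions, ["pod-1"]))
# {'pod-1': [0, 1, 2, 3]}
print(assign(partitions, ["pod-1", "pod-2"]))
# {'pod-1': [0, 2], 'pod-2': [1, 3]}
print(assign(partitions, ["pod-1", "pod-2", "pod-3", "pod-4", "pod-5"]))
# {'pod-1': [0], 'pod-2': [1], 'pod-3': [2], 'pod-4': [3], 'pod-5': []}

# Each group is assigned independently, so both groups cover every partition.
groups = {"ms-transfer": ["pod-1", "pod-2"], "ms-notification": ["notif-1"]}
for group, members in groups.items():
    print(group, assign(partitions, members))
```

With five consumers and four partitions, pod-5 ends up with an empty list, which is the idle consumer from the example.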

Scenarios to think through:

1. Do (producer or consumer ) have to connect to zookeeper?

Nope. We just have to connect to one of the brokers; as part of establishing the connection, the producer or consumer receives all the metadata it needs.

2. Do (producer or consumer ) have to connect to all brokers to send messages to different partitions laying in different brokers?

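Not up front. We only have to configure one (or a few) brokers to bootstrap from. The client uses the metadata from that broker to find the leader of each partition and then connects to those leaders directly as needed; brokers do not forward messages to the right partition leader on the client’s behalf.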

3. What happens when a consumer within a consumer-group joins or leaves?

Consumer Rebalancing!
When a new consumer joins a consumer group, the consumers attempt to “rebalance” the load by reassigning partitions among themselves. If the set of consumers changes while this assignment is taking place, the rebalance fails and is retried. A client setting controls the maximum number of attempts before giving up.

4. Does Rebalancing happen only on consumers leaving and joining?

Not only.
Rebalancing is triggered when:

  • a consumer JOINS the group
  • a consumer LEAVES the group
  • a consumer is considered DEAD by the group coordinator. This may happen after a crash, or when the consumer is busy with a long-running process and therefore sends no heartbeats to the group coordinator within the configured session timeout
  • new partitions are added

5. What is the difference between a Consumer Group Coordinator and a Consumer Group Leader?

  • The consumer group coordinator is one of the brokers while the group leader is one of the consumers in a consumer group.
  • The group coordinator is simply the broker that receives heartbeats (or polls for messages) from all consumers of a consumer group. Every consumer group has a group coordinator. If a consumer stops sending heartbeats, the coordinator will trigger a rebalance.
  • Group Leader generates assignment and sends it to Kafka. It also asks for its own assignment.
  • Kafka receives the assignment from the Group Leader and responds to the Followers with their particular set of partitions.

6. How do consumers read (pull) messages from the broker?

  • Message by message.
    - Pull one message -> process -> commit offset
    - More round trips to the brokers (RTT, round-trip time, is paid per message)
    - Lower throughput
    - Still at-least-once when committing after processing, but a crash replays at most one message
  • In a batch (defined by size in bytes) (our current config)
    - Pull n messages -> process all -> commit the last offset in batch
    - Higher throughput
    - At-least-once delivery semantics: a crash mid-batch replays the whole uncommitted batch

7. How to commit Offset?

  1. Auto commit.
    - Offsets are committed periodically in the background
    - No guarantee that a committed message was fully processed
  2. Manual commit. (our current config)
    - Commit manually after processing the message
    - Guarantees at-least-once delivery semantics
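Why committing after processing gives at-least-once delivery can be seen in a small simulation. Everything here is hypothetical (the `run_consumer` function, the list used as a log, the simulated crash); "committed" means the next offset to read, and the crash is modeled as happening just before a message is processed.

```python
from typing import List, Optional


def run_consumer(log: List[str], committed: int, batch_size: int,
                 processed: List[str], crash_after: Optional[int] = None) -> int:
    """Process from `committed` in batches; commit only after a whole batch.

    `crash_after` simulates a crash once that many messages were processed in
    this run. Returns the last committed offset (the next offset to read).
    """
    handled = 0
    position = committed
    while position < len(log):
        batch = log[position:position + batch_size]
        for message in batch:
            if crash_after is not None and handled == crash_after:
                return committed  # crashed: the current batch was never committed
            processed.append(message)
            handled += 1
        position += len(batch)
        committed = position  # manual commit after the batch is processed
    return committed


log = ["m0", "m1", "m2", "m3", "m4", "m5"]

processed: List[str] = []
committed = run_consumer(log, 0, batch_size=3, processed=processed, crash_after=5)
print(committed)  # 3: only the first batch was committed
committed = run_consumer(log, committed, batch_size=3, processed=processed)
print(processed)  # ['m0', 'm1', 'm2', 'm3', 'm4', 'm3', 'm4', 'm5']
```

After the restart, m3 and m4 are processed a second time: nothing is lost, but handlers need to tolerate duplicates. A smaller batch size shrinks the replayed window at the cost of more commits.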

8. How does the broker know if a consumer is alive or dead?

Consumers have to send periodic heartbeats (by time or by consumption of messages) to the group coordinator broker.

heartbeatInterval < sessionTimeout

If Heartbeat fails we will see:

[kafka] The coordinator is not aware of this member
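The liveness check can be pictured with a toy coordinator that remembers when it last heard from each member. The `GroupCoordinator` class below is a hypothetical illustration, not broker code, and the 30-second timeout and 10-second heartbeat interval are assumed values.

```python
from typing import Dict, List


class GroupCoordinator:
    """Toy coordinator: remembers the last heartbeat time of each member."""

    def __init__(self, session_timeout_s: float) -> None:
        self._session_timeout_s = session_timeout_s
        self._last_seen: Dict[str, float] = {}

    def heartbeat(self, member: str, now: float) -> None:
        self._last_seen[member] = now

    def expire_dead_members(self, now: float) -> List[str]:
        """Remove members whose session timed out; a non-empty result means rebalance."""
        dead = [m for m, seen in self._last_seen.items()
                if now - seen > self._session_timeout_s]
        for member in dead:
            del self._last_seen[member]
        return dead


coordinator = GroupCoordinator(session_timeout_s=30.0)
coordinator.heartbeat("pod-1", now=0.0)
coordinator.heartbeat("pod-2", now=0.0)

coordinator.heartbeat("pod-1", now=10.0)  # pod-1 keeps beating every 10 s
coordinator.heartbeat("pod-1", now=20.0)
coordinator.heartbeat("pod-1", now=30.0)  # pod-2 is stuck in a long task

print(coordinator.expire_dead_members(now=31.0))  # ['pod-2']
```

Keeping the heartbeat interval well below the session timeout leaves room for a few missed heartbeats before a member is declared dead.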

9. Can we pause

Good collection of QnAs here.

Why is it hard to achieve optimal performance with Kafka? (for selective use-cases)

“Too many knobs to turn”

High number of config options

What knobs to tune for optimal performance? (depends on scenarios)

Glossary:
- Throughput: refers to the number of records processed over time (produce, consume)
- Latency: refers to the time taken to process a record (produce, consume)

Before tuning, let’s understand how components like record (message) size, replication factor, partition count per topic, and the batch size for producing and consuming messages affect throughput and latency (for consumers, latency shows up as lag).

  • ↑ message size => throughput ↓
  • ↑ message size => latency ↑
  • ↑ batch size => throughput ↑
  • ↑ batch size => latency ↑
  • ↑ min-in-sync replicas => latency ↑
  • ↑ min-in-sync replicas => throughput ↓
  • ↑ consumers => throughput ↑
  • ↑ consumers => latency ↓
  • ↑ consumers => time spent rebalancing ↑
  • ↑ partitions => throughput ↑
  • ↑ partitions => latency ↓
  • ↑ partitions => unavailability ↑
    (on a broker failure, moving partition data and electing new leaders takes operational time and cost, and producers or consumers wait for that unavailable duration)
  • ↑ partitions => memory ↑ (on both the producer and the consumer side, for buffering data per partition)
  • ↑ replication factor => throughput ↓
  • ↑ replication factor => latency ↑ (for producer)

General Rule of thumb (not limited to)

  • number of partitions for a topic >= number of consumers in a group
  • min-in-sync replicas >= 2 (and lower than the replication factor, so that one broker can fail without blocking writes)
  • replication factor > n/2 of brokers (5 brokers => replication factor >= 3)
  • producer acknowledgment = 1 (producer.ack=1) for lower latency at the cost of reliability
    (This means the broker sends the ack to the producer as soon as the leader has written the message to its own log)
  • producer acknowledgment = all (producer.ack=all) for the most reliable message delivery
    (This means the broker sends the ack to the producer only after the leader and all in-sync replicas have written the message)
  • heartbeatInterval < sessionTimeout
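How the acknowledgment mode and min-in-sync replicas interact can be sketched as a single predicate. The function name is hypothetical, the replication factor of 3 and min-in-sync value of 2 are assumed example values, and the sketch assumes the partition leader itself is reachable.

```python
def write_acknowledged(acks: str, in_sync_replicas: int, min_insync_replicas: int) -> bool:
    """Would the producer get a success ack, given the current ISR size?

    The in-sync replica count includes the leader.
    """
    if acks in ("0", "1"):
        return True  # at most the leader is waited for; min ISR is not checked
    if acks == "all":
        return in_sync_replicas >= min_insync_replicas
    raise ValueError(f"unknown acks value: {acks!r}")


# Replication factor 3, min-in-sync replicas 2 (assumed values).
print(write_acknowledged("all", in_sync_replicas=3, min_insync_replicas=2))  # True
print(write_acknowledged("all", in_sync_replicas=2, min_insync_replicas=2))  # True
print(write_acknowledged("all", in_sync_replicas=1, min_insync_replicas=2))  # False
print(write_acknowledged("1", in_sync_replicas=1, min_insync_replicas=2))    # True
```

The third case is the trade-off in one line: with acknowledgment from all replicas, the topic refuses writes rather than accept data that only one broker holds.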

How to deal with some common problems?

1. Too many rebalances

  • Avoid drastic scale up or down of consumers
  • Make sure consumers aren’t considered DEAD: send heartbeats frequently enough

2. Consumer getting kicked out

  • Avoid long processing of messages
  • If manually committing and manually sending heartBeats: Avoid large batch size
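A rough way to size batches against the session timeout is sketched below. It assumes a hypothetical setup in which a heartbeat is only sent once per batch, and the `safety_factor` of 0.5 is an arbitrary margin chosen for illustration.

```python
import math


def max_safe_batch_size(session_timeout_s: float, per_message_s: float,
                        safety_factor: float = 0.5) -> int:
    """Largest batch that finishes well inside the session timeout.

    Assumes one heartbeat per batch; always allows at least one message.
    """
    budget = session_timeout_s * safety_factor
    return max(math.floor(budget / per_message_s), 1)


print(max_safe_batch_size(session_timeout_s=30.0, per_message_s=0.25))  # 60
print(max_safe_batch_size(session_timeout_s=30.0, per_message_s=20.0))  # 1
```

When even a single message takes longer than the budget, as in the second call, shrinking the batch no longer helps and the processing itself has to get faster or move off the polling path.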

3. Increased lag (lower consumer throughput)

  • Avoid long processing of messages
  • Avoid funneling messages into a single partition (for example, by giving every message the same key)
  • Consider priority topics for relevant use cases
  • Look out for frequent rebalancing

Like to tinker and learn through trials and errors? We might use your quirky mind in our team! Join us in making the next life-centric digital solutions!
