Kafka: Story of a message broker

Disclaimer: This blog is the first to be written in a series of blogs to come: Beginner’s Go at Building Data Pipelines. It’s not going to be necessarily the first blog in the series.

Zaira Zafar
8 min readMay 8, 2020

NOTE: In this blog, I’ll only talk about Kafka Producer and Consumer API and Kafka in general. Streamer API and Connecter API will not be discussed. ALSO This doesn’t cover all of Kafka and will be constantly updated as I learn more and more.

Introduction

Message brokers are one of the many communication methods in a micro service architecture. This architectural pattern abstracts the information applications need to know about one other in order to exchange information and allows them to simply focus on delivering/receiving the message to/from the broker. This creates an efficient and decoupled communication mechanism by taking away all the concerns of routing, security, reliability from the applications and making their jobs simpler.

Kafka

Is a distributed framework for storing, reading and analysis. It’s notorious for being open source. Originally developed by LinkedIn and used to analysing connections, it was made open source and passed on to Apache Foundation which is responsible for development of open source software.

Why use Zookeeper?

If you tried downloading and installing Kafka you must have noticed blogs point to download Zookeeper too.

Kafka depends on Zookeeper to work, this dependency which might sound unnecessary, is really put in place to ensure Kafka is efficient. Zookeeper “is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.” So Kafka uses it for configuration management, coordination between different nodes in a cluster, leader detection and node detection. It also commits offsets regularly in case if a node fails it’ll recover from the last saved offset.

Apache does plan on removing Zookeeper from Kafka and has put a proposal in the community KIP-500, the details haven’t been configured out yet.

An Overview

Courtesy of Cloudera

A topic is a log where data is written by the producer, whilst creating a topic we define number of partitions it can have. Producer sends messages to topic(s) in Kafka and can also include a key with the message. The key is used to define to which topic partition the message should be written to and its guaranteed that all messages with that key will be written to that partition.

Consumers read messages from a topic, and can commit their position inside the topic partition if they belong to a consumer group, this is called an offset. This helps them to continue from the same point where they left off. If a consumer isn’t part of a group, it’ll read from the beginning always.

Offsets can be committed explicitly or automatically.

Producer

Sending Data

Kafka provides an asynchronous mechanism to send data to the broker in order to reduce I/O operations to the server, it places data in memory until a certain defined threshold is met and only then it sends out the data in large chunks. Keep in mind that though there are fewer I/O operations made to the server but they’re large in quantity.

The threshold is defined by either fixed number of message or by a fixed latency bound, which is configurable.

Consumer

Data Ingestion

In this regard, Kafka has followed traditional brokers using push mechanism to send data from producers to broker and pull mechanism to consuming data by consumers. The reason for using pull for consumer is to enable consumers to consume at a maximum possible rate. Such a goal would have been difficult to achieve in a push based mechanism where data transfer rate is controlled by the broker and consumer is overwhelmed when its rate of consumptions becomes less than rate of production.

In pull based, consumer can catch up later if it falls behind in consuming data. It also uses aggressive batching when its comes to reading, from current position of the consumer it pulls all (or the configured max size) of messages. Where as in a push based system, it decides whether to sent the data or gather more and sent it out later without any knowledge if the consumer would be able to process it.

The drawback of a pull based system is that in case where broker has no data, consumer ends up in a polling loop endlessly waiting for the data to arrive. This is important to understand that the consumer while waiting isn’t idle, it’s constantly checking by pinging the broker if data has arrived. To overcome this in Kafka when consumer requests for data, Kafka applies poll blocking on the request until data arrives.

Managing Consumer Reads

Brokers need to keep a track of how many messages consumer(s) have consumed so they can ensure delivery without duplicated consumption. The tracking is also required for purging messages once they have been consumed to avoid memory mismanagement. One method is that the broker keeps a track of the messages it sent out to a consumer for removal. But the issue here is that broker doesn’t concern itself with safe delivery. So while the broker assumes it sent the message, that message can be lost and never received by the consumer resulting in missing chunks in data.

Another method is where Broker maintain states: sent and consumed, for each message and while sending a message, the message is locked in a sent state so it can’t be consumed again by the same consumer and after receiving acknowledgement from the consumer the message is put in consumed state so it can be removed as per need of the broker. This acknowledgement feature adds complexity to Broker.

But here’s what Kafka does differently: It divides a topic into “ordered partitions”, and each partition can only be consumed by one consumer within a consumer group. Let’s understand this. We can have multiple producers push messages to a topic and similarly we can have multiple consumers read from the topic. If we have only one consumer in a group then all the partitions would be read by that one consumer. But if we have n consumers, then k partitions would be read as k/n per consumer.

Single consumer reading from a topic. Courtesy of O’Reily Media
Multiple consumers reading from same topic. Courtesy of O’Reily Media

What does this mean? The broker only maintains the integer position of a consumer in the partition its reading. This can be converted into a check point so we know where the consumer was last positioned. This clean mechanism makes the state management cheap with a side-benefit. Consumers can rewind offsets and re-read data which it already read before but this is a helpful in many cases such as Consumer crashes.

Group Memberships

Kafka takes a different approach to group management. Usually a rebalance protocol is followed which depends on group coordinator for providing temporary/short-lived entity ids to group members and a rebalance happens everything a consumer rejoins or restart. Why is this a problem? “For large state applications, shuffled tasks need a long time to recover their local states before processing and cause applications to be partially or entirely unavailable.” But Kafka uses a different group management protocol which provides permanent entity ids to group members. Since group membership doesn’t change based on those ids, no rebalance is required. A rebalance happens when:

  • A consumer cleanly shuts down
  • New consumer joins
  • A consumer has died. A consumer is considered dead when it doesn’t send a heartbeat to the group coordinator.

Kafka Security

Kafka provides a set of security features which can be found in its documentation. Among one is SSL authentication, by default it is disabled but can be enabled as needed. An important thing to note is “SSL is supported only for the new Kafka Producer and Consumer, the older API is not supported.”

Kafka describes steps to enable SSL in great details under security section in its documentation.

Kafka Policies

Topic Retention

Deleting a topic is one thing, what if I only want to delete the content inside a topic? That’s where topic retention policy comes in. It means how long the contents of a topic will be retained before being deleted automatically by Kafka. There are two types of retention policies.

Time Based: The default value for this is 168 hours which is 7 days but we can change it as per our need

> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --config retention.ms=1000

This ‘retention.ms’ property is global which means all topics share it, changing it for a certain topic would change it for all. So be sure to specify your topic name in the command as its shown above.

Playing with Kafka

First you start Zookeeper, go to the directory where you unzipped Kafka and run:

> bin/zookeeper-server-start.sh config/zookeeper.properties

By default, Zookeeper runs on localhost:2181

Then you start Kafka:

> bin/kafka-server-start.sh config/server.properties

By default Kafka runs on localhost:9092

Create a topic

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-topic

List topics

> bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Deleting topic

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic

Sending a message as a producer to Kafka

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

You would have to ctrl+c to stop sending messages

Receiving messages as consumer

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

Receiving messages as consumer with a consumer group

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --consumer-property group.id=my-group

Offset controls the behaviour of consumer reads in a partition. By default it’s ‘latest’ which means consumer will read from new message. You can reset it to ‘earliest’ which means to read from the start of partition. To reset, stop the consumer and run:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --reset-offsets --to-earliest --execute --topic my-topic

Listing groups

> bin/kafka-consumer-groups.sh  --list --bootstrap-server localhost:9092

Checking consumer position

> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-group

Deleting group(s)

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

Conclusion

I understand Kafka better now and I hope you do too. It’s no longer a framework we only implement quickly because we don’t have the time to understand how it does what it does. Kafka isn’t born of need, but from observation of where many message brokers show flaws, it really goes out of its way to avoid the pitfalls of other brokers. But that doesn’t mean Kafka is perfect.

P.S: If you think I wrote something that is incorrect, do let me know in the comments. :)

--

--