Introduction to Kafka

Ali Tahir
13 min read · Jun 5, 2023


Before understanding what Kafka is and how it helps us solve certain problems, let's look briefly at some common problems in modern applications and why using Kafka makes sense.

Problem

To meet the demands of an integrated and seamless world, modern applications have to communicate with many different applications. Because of this need for communication between different systems, multiple point-to-point integrations have to be developed, resulting in a spaghetti architecture.

spaghetti integrations between different applications

As depicted in the image, these integrations are complex and difficult to maintain. The complexity increases even further when you have to consider distributed transactions and the challenges of a microservices architecture.

This is where Kafka comes in. It helps us decouple our applications and services so they are more manageable and the architecture is cleaner and more understandable.

clean and manageable integrations using Kafka

Real World Use Cases

Kafka finds application across various industries, playing a crucial role in addressing technical challenges.

For instance, in the logistics sector, Motive (formerly KeepTruckin) utilizes Kafka for effective fleet management. They can track their fleet and display relevant metrics on user-friendly dashboards.

Similarly, in the telecom industry, Etisalat leverages Kafka to facilitate package subscription orders, and numerous ecommerce applications employ Kafka for comparable objectives. To explore how diverse industries harness Kafka to resolve their specific issues, you can refer to real-world examples and use cases.

Introduction

Kafka is, at its core, an event stream processing platform.

event stream

As depicted by the image above, Kafka processes a stream of events one by one. These events are published by producers and consumed by subscribers, similar to a queue.

To Kafka, these events are nothing but a set of bytes: it only accepts bytes from producers and provides bytes as output to consumers.

Kafka stores these events durably and provides near real-time stream processing. Because of its persistent storage, real-time processing speed, and ability to scale, Kafka has become a standard for designing event-driven systems.

Theoretical Concepts

To learn Kafka, you will have to understand some key terminologies:

Topics

A topic is a named feed to which messages are published on a Kafka broker (instance). You can create as many topics as needed, and each topic is identified by its name. Topics are immutable: once data is written, it cannot be changed. Topics also cannot be renamed.

Kafka Broker with multiple Topics

Note: In case you absolutely need to rename a topic, you will have to create a new topic, migrate the data, and then delete the old topic.

Partition and Offsets

Each topic is split into partitions, and a topic can have as many partitions as you need. All messages are stored in these partitions, and messages within a partition are ordered. Every message in a partition has an incremental id, called its offset.

As depicted in the image, each partition has its own offsets: offset 4 in partition 0 holds different data than offset 4 in partition 1. The order of messages is guaranteed only within a partition.

To route messages to the same partition, a key has to be provided. Providing the same key will result in the messages always going to the same partition. If no key is provided, data is assigned in a round-robin fashion.

Note: In order to assign data to a particular partition yourself, you need to write a custom Partitioner. A partitioner can read the key and value and decide which partition the message should go to.

Data is only kept for a limited time (configurable; the default is 1 week). When old messages are deleted, their offsets are not reused.

Note: If key is provided, the producer partitions data by using:

partition = hash(key) % number_of_partitions

If the number of partitions is changed in production, data with the same key can end up in a different partition. By default, Kafka will not attempt to automatically redistribute existing data according to the new calculation.
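To make this concrete, here is a small Python sketch of key-based partitioning. Kafka's Java client actually uses a murmur2 hash for this; zlib.crc32 is used below only as a stand-in deterministic hash, and partition_for is a hypothetical helper, not a Kafka API.

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Stand-in for Kafka's hash(key) % num_partitions rule.
    # (Kafka's default partitioner really uses murmur2, not crc32.)
    return zlib.crc32(key) % num_partitions

# The same key always lands on the same partition...
assert partition_for(b"user-42", 3) == partition_for(b"user-42", 3)

# ...but resizing the topic changes the modulus, so existing keys
# can map to different partitions after the change.
before = {k: partition_for(k, 3) for k in (b"a", b"b", b"c", b"d")}
after = {k: partition_for(k, 4) for k in (b"a", b"b", b"c", b"d")}
```

This is exactly why repartitioning a keyed topic in production silently breaks the "same key, same partition" guarantee for data written before the change.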

If you wish to change the number of partitions (say from 3 to 4) for a topic (let's call it topic.v1), you can create a new topic (e.g. topic.v2) with the new number of partitions and migrate messages from topic.v1 to topic.v2.

Kafka does not currently support reducing the number of partitions for a topic. If it’s absolutely necessary for your use case, you can follow the above migration approach.

Producers

Producers write data to Kafka topics. A producer initially connects to a bootstrap server, which provides the metadata necessary to establish a connection with the cluster. Once the producer is connected to the cluster, it can send messages to specific topics and their partitions.

In case a Kafka broker fails, producers can recover automatically.

To get message ordering, producers must provide a key. If a key is provided, all messages for that key will be written to the same partition. If no key is provided, data can be written to any partition (round robin).

A Kafka producer sends each message as a record containing a key, a value, optional headers, a timestamp, and the destination topic and partition:

Message Serialization — As indicated by the format, both the key and value must be in binary. For this purpose, message serialization can be done using serializers like IntegerSerializer, StringSerializer, or Avro serializers.
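As a sketch of what such serializers do (Python is used here purely for illustration; the helper names are mine, not Kafka's), IntegerSerializer writes a 4-byte big-endian integer and StringSerializer writes UTF-8 bytes:

```python
import struct

def serialize_int(value: int) -> bytes:
    # Like Kafka's IntegerSerializer: 4-byte big-endian representation.
    return struct.pack(">i", value)

def deserialize_int(data: bytes) -> int:
    # The matching consumer-side deserializer.
    return struct.unpack(">i", data)[0]

def serialize_str(value: str) -> bytes:
    # Like Kafka's StringSerializer: UTF-8 encoded bytes.
    return value.encode("utf-8")

assert serialize_int(42) == b"\x00\x00\x00*"   # 42 == 0x2A == "*"
assert deserialize_int(serialize_int(-7)) == -7
assert serialize_str("kafka") == b"kafka"
```

Whatever scheme is used, producer and consumer must agree on it: Kafka itself never interprets the bytes.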

Compression — By default, messages are not compressed; if you need message compression, you must enable it explicitly. Compression is done on the producer side, before the message is sent over the network. It reduces message size, which can reduce latency, storage requirements, bandwidth usage, and cost, along with various other benefits.
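Kafka supports gzip, snappy, lz4, and zstd through the producer's compression.type setting. The sketch below uses Python's gzip module only to illustrate why compressing repetitive payloads before sending them pays off:

```python
import gzip

# A repetitive payload, typical of batched log or event data.
payload = ("2023-06-05 INFO order accepted\n" * 500).encode("utf-8")

compressed = gzip.compress(payload)
assert len(compressed) < len(payload)   # far fewer bytes on the wire

# The consumer side decompresses and recovers the exact original bytes.
assert gzip.decompress(compressed) == payload
```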

Note: By default, the maximum message size allowed by Kafka is 1 MB, as large messages are considered inefficient. This configuration can be changed, but it is not recommended. You can read more in How to send Large Messages to Apache Kafka.

Also, the default batch size is 16 KB; any message larger than that will not be batched and will be sent individually. The batch size configuration can be changed as well, but keep in mind that smaller batches are sent more frequently. More at: Kafka Producer Batching.

Headers — Headers can include any contextual information and metadata that helps consumers, such as content type, correlation id, authorization data, or any custom headers.

Timestamps — By default, timestamps are not shown when reading messages; you need to configure this explicitly.
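Putting the pieces together, a produced record can be modelled roughly like this (a hypothetical Python dataclass for illustration only; the field names are mine, though they mirror the Java client's ProducerRecord):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Record:
    # Rough model of what a producer sends; not a real Kafka class.
    topic: str
    value: bytes
    key: Optional[bytes] = None                  # optional: drives partitioning
    headers: list = field(default_factory=list)  # (name, bytes) pairs
    timestamp_ms: Optional[int] = None           # set explicitly if needed

record = Record(
    topic="orders",
    key=b"customer-7",
    value=b'{"item": "book"}',
    headers=[("content-type", b"application/json"),
             ("correlation-id", b"abc-123")],
)
assert record.timestamp_ms is None  # not set unless you configure it
```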

Consumers

Consumers read data from topics. Similar to producers, a consumer connects to a bootstrap server, which shares the metadata necessary to consume data.

Consumers can also automatically recover from Kafka broker failures.

Consumers read data from partitions, depending upon the configuration: a consumer can read from the beginning of a partition (--from-beginning) or only the latest messages.

As Kafka's output is in bytes, messages have to be deserialized. This can be done using IntegerDeserializer, StringDeserializer, or Avro deserializers. If compression was used, decompression is needed as well.

Consumer Groups

Consumers can be part of a group. A consumer group is a set of consumers that all want to perform the same operation on the data. By using consumer groups together with multiple partitions, messages can be processed in parallel.

Consumers within the same group cannot read data from the same partition. Within a group, each consumer reads from partitions that no other consumer in the group is listening to.

If there are more consumers than partitions, the extra consumers will be idle.

You can create as many consumer groups as needed. A new consumer group will be able to read data from the start.
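The partition-to-consumer mapping can be sketched with a simplified round-robin assignment (a toy model for illustration, not Kafka's actual assignor logic):

```python
def assign(partitions, consumers):
    # Each partition goes to exactly one consumer in the group;
    # consumers beyond the partition count receive nothing.
    result = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        result[consumers[i % len(consumers)]].append(p)
    return result

# 3 partitions shared by 2 consumers: c1 handles two of them.
assert assign([0, 1, 2], ["c1", "c2"]) == {"c1": [0, 2], "c2": [1]}

# 2 partitions, 3 consumers: c3 sits idle.
assert assign([0, 1], ["c1", "c2", "c3"])["c3"] == []
```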

Note: In case the number of consumers within a group changes, a rebalance can happen. That means if a consumer goes down, some other consumer within the group will be assigned its partition, and when it comes back the partition assignments can change again.

For more details: Kafka Consumer Group Rebalance

Consumer Offsets

Kafka stores the offset at which a consumer or a consumer group has been reading. These offsets live in a Kafka topic named __consumer_offsets. This allows a consumer to resume reading from where it left off in case it goes down.

Internally, Kafka calls commitAsync to commit the offsets of a particular partition. Offsets are committed when you poll the messages (poll() is called) and the interval configured by “auto.commit.interval.ms” has elapsed. So make sure your messages have been successfully processed before calling poll() again.

You can commit the offsets manually as well (not recommended); for that you need to set “enable.auto.commit=false”. For manual commits you can use the commitSync and commitAsync methods.
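The reason for processing before committing can be shown with a toy at-least-once loop (pure Python, no Kafka involved; the function and its behaviour are illustrative only):

```python
def consume(messages, process, committed=0):
    # Commit the offset only after the message is handled, so a
    # restart replays unprocessed messages instead of skipping them.
    for offset in range(committed, len(messages)):
        process(messages[offset])
        committed = offset + 1   # "commit" after successful processing
    return committed

seen = []
# Offsets 0 and 1 were committed earlier, so a restart resumes at 2.
final = consume(["a", "b", "c", "d"], seen.append, committed=2)
assert seen == ["c", "d"]
assert final == 4
```

If the commit happened before processing instead, a crash mid-processing would lose that message on restart.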

Note: Sometimes your consumer may fail to process a message. In that case you can retry 2–3 times and, if the issue persists, move the message to a dead letter queue so you can diagnose the problem according to your requirements.

Kafka Brokers

A Kafka cluster is composed of multiple Kafka brokers (servers). Each broker has a unique identifier and operates as a bootstrap server. This means you only need to establish a connection with a single broker: since each broker holds metadata for the entire cluster, the client can then connect to the whole cluster.

Each broker can host one or many partitions of a particular topic, and a broker can hold multiple topics. Usually a topic's partitions are spread among different brokers, which helps with scaling, as data is distributed across the cluster.

You can create as many brokers as necessary, according to your requirements.

Topic Replication

Data can be distributed within the cluster to provide protection against broker failures. For this purpose, topics should be replicated across different brokers, so that when a broker goes down, consumers are able to read the data from a different broker.

A replication factor of 2 would mean the system can handle the loss of 1 broker.

As depicted above, in the event of losing a broker, the consumer can still access and read the data. Likewise, the producer will write to the available broker.

As a rule, with a replication factor of N, you can withstand losing up to N-1 brokers permanently and still recover your data.

For mission critical data a replication factor of 3 is recommended.

Partition Leader

As there can be multiple replicas of the same partition, the producer cannot write to all of them; that is why a partition leader is needed. The producer writes data only to the partition leader, and Kafka maintains the in-sync replicas (ISR). Similarly, the consumer reads data only from the partition leader.

In newer versions, Kafka allows consumers to read data from the closest replica.

The process of leader election can influence performance and scalability, but its intricacies are beyond the scope of this article.

For more details : Using Kafka-Leader-Election to improve Scalability and Performance

Producer Acknowledgments

Producers can choose to receive acknowledgments from the Kafka brokers. This helps prevent data loss in case there was a write failure.

Kafka provides three different acknowledgment configurations:

acks=0 — no acknowledgment (can result in data loss)

acks=1 — acknowledgment from the partition leader only (replication may still fail, so limited data loss is possible)

acks=all — acknowledgment from the leader plus the minimum number of replicas specified by the configuration min.insync.replicas (default is 1). If this minimum cannot be met, the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).

Choose the acknowledgment strategy based on your requirements, as it can have an impact on performance.
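As a compact reference, the three modes map onto the producer's acks property like this (shown as plain Python dicts; these are just the property values a real producer client would be configured with, not a working client):

```python
# The three acknowledgment modes, keyed by the real producer property "acks".
fire_and_forget = {"acks": "0"}    # no ack: fastest, data loss possible
leader_ack      = {"acks": "1"}    # leader only: loss if replication fails
full_durability = {"acks": "all"}  # leader + min.insync.replicas confirm

assert full_durability["acks"] == "all"
```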

Zookeeper

Zookeeper is essentially a key-value store that provides configuration, synchronisation, and naming registry services for distributed systems.

Kafka uses Zookeeper to keep track of all the brokers within the cluster and the cluster topology. It helps perform broker leader elections and notifies Kafka of any changes, such as when a broker enters or leaves the cluster. It also keeps track of topics that are created or deleted and maintains the topic list.

Starting Kafka

Installing and starting Kafka depends on the OS being used, so we are just going to look at how this can be done using Docker (requires a Docker installation).

Create a YAML file named ‘start_kafka.yml’ and copy the following configuration into the file.

version: '3.7'
services:
  zookeeper1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVERS: zookeeper1:2888:3888

  kafka-broker-1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka-broker-1
    container_name: broker1
    ports:
      - "19092:19092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-1:9092,LISTENER_LOCAL://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper1

  kafka-broker-2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka-broker-2
    container_name: broker2
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-2:9092,LISTENER_LOCAL://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper1

  kafka-broker-3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka-broker-3
    container_name: broker3
    ports:
      - "39092:39092"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-3:9092,LISTENER_LOCAL://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper1

Now, in the terminal, run ‘docker compose -f start_kafka.yml up -d’.

It will pull the images from Docker Hub and start the necessary containers. After completion, the terminal should look like the following image.

You can also confirm this from Docker Desktop.

As Kafka depends on Zookeeper, we are using one Zookeeper instance, and three brokers have been initialised so we can experiment with Kafka.

Kafka CLI

Topics

You can use the following commands to create, list, describe and delete the topics.

#create topic
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --topic first_topic --create

#create topic
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --topic second_topic --create --partitions 3

#create topic
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --topic third_topic --create --partitions 3 --replication-factor 1

#list topics
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --list

#describe first_topic
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --topic first_topic --describe

#delete first_topic
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --topic first_topic --delete

These commands manipulate Kafka topics on the first broker. To run them against other brokers, replace broker1 with the other container names mentioned in the YAML configuration above.

Producer

You can use the following commands to produce data to particular topics.

docker exec --interactive --tty broker1 kafka-console-producer --bootstrap-server broker1:9092 --topic second_topic

The above command uses the Kafka console producer to start writing data to second_topic. As we have not used any key, data will be written to partitions using the round-robin algorithm.

After executing this command, an interactive session will start in your terminal, and anything you write will be produced to second_topic.

#write to a topic that does not exist
docker exec --interactive --tty broker1 kafka-console-producer --bootstrap-server broker1:9092 --topic fourth_topic

#producer will expect an ack from min ISRs according to configuration
docker exec --interactive --tty broker1 kafka-console-producer --bootstrap-server broker1:9092 --topic second_topic --producer-property acks=all

#producing with keys, : is key and value separator
docker exec --interactive --tty broker1 kafka-console-producer --bootstrap-server broker1:9092 --topic second_topic --property parse.key=true --property key.separator=:

The first command will create fourth_topic with default configurations and then write data to it. After running the third command, Kafka will expect a key in each message and parse the message by the key separator.

Note: If you don’t pass a key, Kafka will throw an exception: org.apache.kafka.common.KafkaException: No key separator found

Consumer

You can use the following commands to consume data from particular topics.

docker exec --interactive --tty broker1 kafka-console-consumer --bootstrap-server broker1:9092  --topic second_topic --from-beginning --group second_topic_group

The above command starts a consumer as part of second_topic_group and reads data from the beginning of second_topic.

As second_topic has 3 partitions and we only have one consumer in the group, it reads data from all partitions. You can also observe that this data is not globally ordered. Per-partition order can be observed when you have three consumers, as each will read from its own partition of second_topic.

You can read the message with details like timestamp, partition and key using the following command:

docker exec --interactive --tty broker1 kafka-console-consumer --bootstrap-server broker1:9092 --topic second_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning
Kafka message with details

You can read more about consumer groups from Kafka Documentation (Managing Consumer Groups).

Note: Kafka can be easily used with Java/Kotlin and some other languages as well. For more details: Available Kafka Client APIs

Conclusion

In conclusion, modern architectures need a sophisticated message broker to simplify integrations and support distributed transactions. With some basic Kafka knowledge, you can get started quickly and see how Kafka can help solve your specific problems.


References

Udemy: Apache Kafka Series by Stephane Maarek

Confluent: Kafka Tutorials

Kafka Documentation

Kafka The Definitive Guide

Images: For some images, I’m not aware of original creators. Please share if you know.
