Before understanding what Kafka is and how it helps us solve certain problems, let's dive into some of the problems modern applications face and why using Kafka makes sense.
Problem
To meet the demand of an integrated and seamless world, modern applications have to communicate with many other applications. This need for communication between different systems means multiple point-to-point integrations have to be developed, resulting in a spaghetti architecture.
As depicted in the image, such a setup is very complex and difficult to maintain. The complexity increases even further when you have to consider distributed transactions and the challenges of a microservices architecture.
This is where Kafka comes in. It helps us decouple our applications and services so they are more manageable and the architecture is cleaner and easier to understand.
Real World Use Cases
Kafka finds application across various industries, playing a crucial role in addressing technical challenges.
For instance, in the logistics sector, Motive (formerly KeepTruckin) utilizes Kafka for effective fleet management. They can track their fleet and display relevant metrics on user-friendly dashboards.
Similarly, in the telecom industry, Etisalat leverages Kafka to facilitate package subscription orders, and numerous e-commerce applications employ Kafka for comparable objectives. To explore how diverse industries harness Kafka to resolve their specific issues, you can refer to real-world examples and use cases.
Introduction
Kafka is simply an event stream processing platform.
As depicted in the image above, Kafka processes a stream of events one by one. These events are published by producers and consumed by subscribers, similar to a queue.
To Kafka, these events are nothing but a sequence of bytes: Kafka accepts bytes from producers and hands bytes back to consumers.
Kafka stores these events durably and provides near-real-time stream processing. Because of its persistent storage, real-time processing speed and ability to scale, Kafka has become a standard for designing event-driven systems.
Theoretical Concepts
To learn Kafka, you will have to understand some key terminologies:
Topics
A topic is a named feed to which messages are published on a Kafka broker (instance). You can create as many topics as needed, and a topic is identified by its name. Topics are immutable: any data once written cannot be changed. Topics also cannot be renamed.
Note: In case you absolutely need to rename a topic, you will have to create a new topic, migrate the data and then delete the old topic.
Partition and Offsets
Each topic is split into partitions, and a topic can have as many partitions as you need. All messages are stored in these partitions, and messages within a partition are ordered. Every message in a partition has an incremental id, called its offset.
As depicted in the image, each partition has its own offsets. That means offset 4 in partition 0 holds different data than offset 4 in partition 1, and the order of messages is only guaranteed within a partition.
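The relationship between partitions and offsets can be sketched with a toy model (illustrative Python, not Kafka's internals):

```python
# Toy model of a topic: each partition is an append-only log,
# and a message's offset is simply its index within that partition.
topic = {0: [], 1: [], 2: []}  # partition id -> list of messages

def append(partition, message):
    """Append a message to a partition and return its offset."""
    topic[partition].append(message)
    return len(topic[partition]) - 1

off_a = append(0, "a")  # offset 0 in partition 0
off_b = append(1, "b")  # offset 0 in partition 1 (independent counter)
off_c = append(0, "c")  # offset 1 in partition 0
```

Note how each partition keeps its own offset counter, which is why ordering only holds within a single partition.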
To send data to the same partition, a key has to be provided: messages with the same key always go to the same partition. If no key is provided, data is distributed across partitions (round robin).
Note: In order to assign data to a particular partition yourself, you need to write a custom Partitioner. A partitioner can read the key and value and decide which partition the message should go to.
Data is only kept for a limited time (configurable; the default retention is 1 week). Offsets are not reused, even after older messages are deleted.
Note: If a key is provided, the producer chooses the partition using:
hash(key) % number of partitions
If the number of partitions is changed in production, data with the same key can end up in a different partition. By default, Kafka will not attempt to automatically redistribute existing data according to the new calculation.
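A minimal sketch of this partitioning rule (using crc32 as a stand-in hash for illustration; the real Java client uses murmur2):

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Keyed partitioning principle: hash(key) % number of partitions.
    # (crc32 is a stand-in; the real Java client uses murmur2.)
    return zlib.crc32(key) % num_partitions

key = b"order-42"
with_3 = partition_for(key, 3)  # partition while the topic has 3 partitions
with_4 = partition_for(key, 4)  # partition after growing to 4 partitions
# The two results may differ, so data written before the change is not
# co-located with data written after it for the same key.
```

The function is deterministic for a fixed partition count, which is exactly why changing the count silently breaks key-to-partition mapping.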
If you wish to change the number of partitions (say from 3 to 4) for a topic (let's call it topic.v1), you can create a new topic (e.g. topic.v2) with the new number of partitions and migrate messages from topic.v1 to topic.v2. Kafka does not currently support reducing the number of partitions for a topic; if that is absolutely necessary for your use case, you can follow the same migration approach.
Producers
Producers write data to Kafka topics. Producers initially connect with a bootstrap server, which provides the metadata necessary to establish a connection with the cluster. Once the producer is connected to the cluster, it can send messages to specific topics and their partitions.
In case a Kafka broker fails, producers can recover automatically.
In order to get message ordering, producers must provide a key. If a key is provided, all messages for that key will be written to the same partition. If no key is provided, data can be written to any partition (round robin).
A Kafka producer sends the data in the following format:
Message Serialization — As indicated by the format, both the key and value must be binary. For this purpose, serialization can be done using serializers such as IntegerSerializer, StringSerializer, or an Avro serializer.
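What these serializers do can be sketched in a few lines (illustrative, not the actual client classes):

```python
import struct

# Sketch of what serializers do: turn keys and values into bytes
# before the producer sends them over the wire.
def string_serializer(s: str) -> bytes:
    return s.encode("utf-8")         # like StringSerializer

def integer_serializer(n: int) -> bytes:
    return struct.pack(">i", n)      # 4-byte big-endian int, like IntegerSerializer

key_bytes = integer_serializer(42)
value_bytes = string_serializer("user signed up")
```

Whatever serializer the producer uses, the consumer must use the matching deserializer to recover the original values.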
Compression — By default, messages are not compressed; if you need compression, you must enable it explicitly. Compression is done on the producer side, before messages are sent over the network. It reduces message size, which can reduce latency, storage requirements, bandwidth usage and cost, among other benefits.
Note: By default, the max message size allowed by Kafka is 1MB, as large messages are considered inefficient. This configuration can be changed, but it's not recommended. You can read more in How to send Large Messages to Apache Kafka.
Also, the default batch size is 16KB; any message larger than that will not be batched and will be sent individually. The batch size configuration can be changed as well, but keep in mind that smaller batches are sent more frequently. More at: Kafka Producer Batching.
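The batching rule described above can be sketched as a toy function (assuming the 16 KB default; this is not the real producer's accumulator):

```python
# Toy producer batcher: messages accumulate until the batch size limit,
# and oversized messages are sent on their own (assumed limit: 16 KB).
BATCH_SIZE = 16 * 1024

def batch_messages(messages):
    batches, current, current_size = [], [], 0
    for msg in messages:
        if len(msg) >= BATCH_SIZE:            # too big to batch: send alone
            if current:
                batches.append(current)
                current, current_size = [], 0
            batches.append([msg])
        elif current_size + len(msg) > BATCH_SIZE:
            batches.append(current)           # batch full: start a new one
            current, current_size = [msg], len(msg)
        else:
            current.append(msg)
            current_size += len(msg)
    if current:
        batches.append(current)
    return batches
```

The real producer also flushes batches on a timer (linger.ms), which this sketch ignores.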
Headers — Headers can include any contextual information and metadata that can help consumers like content type, correlation id, authorizations or any custom headers.
Every message also carries a timestamp; by default this is the producer's create time, and brokers can be configured to use the log append time instead.
Consumers
Consumers read data from the topics. Similar to producers, consumers connect with a bootstrap server, which shares the metadata necessary to consume data.
Consumers can also automatically recover from Kafka broker failures.
Consumers read data from partitions, depending on the configuration: a consumer can read from the beginning of a partition (--from-beginning) or only the latest messages.
As Kafka outputs bytes, messages have to be deserialized, for example using IntegerDeserializer, StringDeserializer or an Avro deserializer. If compression was used, decompression is needed as well.
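The consumer-side mirror of serialization can be sketched the same way (illustrative only, not the actual client classes):

```python
import struct

# Deserializers reverse what the producer's serializers did: bytes back into values.
def integer_deserializer(b: bytes) -> int:
    return struct.unpack(">i", b)[0]   # like IntegerDeserializer

def string_deserializer(b: bytes) -> str:
    return b.decode("utf-8")           # like StringDeserializer

key = integer_deserializer(b"\x00\x00\x00*")     # the bytes for the int 42
value = string_deserializer(b"user signed up")
```

Using a deserializer that does not match the producer's serializer will produce garbage or errors, so the two sides must agree on the format.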
Consumer Groups
Consumers can be part of a group. A consumer group is a set of consumers that all want to perform the same operation on the data. By using consumer groups with multiple partitions, messages can be processed in parallel.
Consumers within the same group cannot read data from the same partition: within a group, each consumer reads from partitions that no other consumer in the group is listening to.
In case there are more consumers than partitions, the extra consumers will be idle.
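A toy assignment function makes the idle-consumer case concrete (illustrative; real assignors such as range or round-robin are more involved):

```python
# Toy partition assignment within one consumer group: each partition goes
# to exactly one consumer, so extra consumers end up with nothing to do.
def assign(partitions, consumers):
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 3 partitions shared among 4 consumers: the fourth consumer stays idle.
result = assign([0, 1, 2], ["c1", "c2", "c3", "c4"])
```

This is why adding consumers beyond the partition count does not increase parallelism.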
You can create as many consumer groups as needed. A new consumer group can read data from the start (depending on its offset reset configuration).
Note: In case the number of consumers within a group changes, a rebalance can happen. That means if a consumer goes down, another consumer (within the group) will be assigned to listen to that particular partition, and when it comes back, the partition assignments can change again.
For more details: Kafka Consumer Group Rebalance
Consumer Offsets
Kafka stores the offset at which a consumer or a consumer group has been reading. These offsets are kept in an internal Kafka topic named __consumer_offsets. This allows a consumer to resume reading from where it left off, in case it goes down.
Internally, the consumer calls commitAsync to commit the offsets of a particular partition. With auto-commit, offsets are committed when poll() is called and the "auto.commit.interval.ms" interval has elapsed, so make sure your messages have been successfully processed before calling poll() again.
You can commit the offsets manually as well; for that, you need to set "enable.auto.commit=false" and use the commitSync or commitAsync methods.
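The resume-from-committed-offset behaviour can be sketched with a toy consumer (illustrative only, not the real client API):

```python
# Toy illustration of consumer offsets: the committed offset records where
# a consumer (or its replacement after a crash) should resume reading.
class ToyConsumer:
    def __init__(self, log, committed=0):
        self.log = log
        self.committed = committed   # next offset to read

    def poll(self, max_records=2):
        # Return the next slice of uncommitted records.
        return self.log[self.committed:self.committed + max_records]

    def commit(self, count):
        # Like commitSync/commitAsync after a batch has been processed.
        self.committed += count

log = ["m0", "m1", "m2", "m3"]
consumer = ToyConsumer(log)
batch = consumer.poll()              # reads m0 and m1
consumer.commit(len(batch))          # commit only after processing succeeds

# A restarted consumer resumes from the committed offset, not from zero:
restarted = ToyConsumer(log, committed=consumer.committed)
```

Committing before processing finishes risks losing messages on a crash; committing after risks re-processing them, which is the usual at-least-once trade-off.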
Note: Sometimes your consumer may fail to process a message. In that case you can retry 2–3 times, and if the issue persists, move the message to a dead letter queue so you can diagnose the issue according to your requirements.
Kafka Brokers
A Kafka cluster is composed of many Kafka brokers (servers). Each broker has a unique identifier and operates as a bootstrap server. This implies that you only need to establish a connection with a single broker. Since each broker possesses metadata of the entire cluster, the client can effectively connect with the entire cluster.
Each broker can hold one or many partitions of a particular topic, and a broker can host multiple topics. Usually a topic's partitions are spread among different brokers; this helps with scaling, as data is distributed across the cluster.
You can create as many brokers as necessary, according to your requirements.
Topic Replication
Data can be distributed within the cluster to provide protection against broker failures. For this purpose, topics should be replicated across different brokers, so that when a broker goes down, consumers are able to read the data from a different broker.
A replication factor of 2 would mean the system can handle the loss of 1 broker.
As depicted above, in the event of losing a broker, the consumer can still access and read the data. Likewise, the producer will write to the available broker.
As a rule, with a replication factor of N you can withstand losing up to N-1 brokers permanently and still recover your data.
For mission-critical data, a replication factor of 3 is recommended.
Partition Leader
As there can be multiple replicas of the same partition, the producer cannot write to all of them; that is why a partition leader is needed. The producer writes data only to the partition leader, and Kafka maintains the in-sync replicas (ISR). Similarly, a consumer reads data only from the partition leader.
In newer versions, Kafka allows consumers to read data from the closest replica.
The process of leader election can influence performance and scalability, but discussing its intricacies is beyond the scope of this article due to its complexity.
For more details : Using Kafka-Leader-Election to improve Scalability and Performance
Producer Acknowledgments
Producers can choose to receive acknowledgment from the Kafka brokers. This helps prevent data loss in case there is a write failure.
Kafka provides three different acknowledgment configurations:
acks=0 — no acknowledgment (can result in data loss)
acks=1 — acknowledgment from the partition leader only (replication may still fail, so limited data loss is possible)
acks=all — acknowledgment from the leader plus the minimum number of replicas specified by the configuration min.insync.replicas (default is 1). If this minimum cannot be met, the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
You will need to choose the acknowledgment strategy based on your requirements as it can have an impact on performance.
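The acks=all rule can be sketched as a simple check (illustrative; NotEnoughReplicas is the real exception name, but the function itself is hypothetical):

```python
# Toy check of acks=all semantics: a write is acknowledged only when at
# least min.insync.replicas replicas (leader included) have the record.
def ack_all(in_sync_replicas: int, min_insync_replicas: int) -> bool:
    if in_sync_replicas < min_insync_replicas:
        # The real producer raises a NotEnoughReplicas error in this case.
        raise RuntimeError("NotEnoughReplicas")
    return True

# 3 in-sync replicas, minimum of 2 required: the write is acknowledged.
ok = ack_all(in_sync_replicas=3, min_insync_replicas=2)
```

With replication factor 3 and min.insync.replicas=2, one broker can be down and writes still succeed; lose a second broker and acks=all producers start failing.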
Zookeeper
ZooKeeper is essentially a key-value store used to provide configuration, synchronisation and naming registry services for distributed systems.
Kafka uses ZooKeeper to keep track of all the brokers within the cluster and the cluster topology. It helps perform broker leader elections and notifies Kafka of any changes, such as when a broker enters or leaves the cluster. It also keeps track of topics that are created or deleted and maintains the topic list.
Starting Kafka
Installing and starting Kafka differs across operating systems, so we are just going to look at how this can be done using Docker (requires a Docker installation).
Create a YAML file named ‘start_kafka.yml’ and copy the following configuration in the file.
version: '3.7'
services:
  zookeeper1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVERS: zookeeper1:2888:3888
  kafka-broker-1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka-broker-1
    container_name: broker1
    ports:
      - "19092:19092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-1:9092,LISTENER_LOCAL://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper1
  kafka-broker-2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka-broker-2
    container_name: broker2
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-2:9092,LISTENER_LOCAL://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper1
  kafka-broker-3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka-broker-3
    container_name: broker3
    ports:
      - "39092:39092"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-3:9092,LISTENER_LOCAL://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper1

Now in the terminal run 'docker compose -f start_kafka.yml up -d'
It will pull the images from Docker Hub and start the necessary containers. After completion, the terminal should look like the following image.
You can also confirm this from Docker Desktop.
As Kafka depends on ZooKeeper, we are using one ZooKeeper instance, and three brokers have been initialised so we can play with Kafka.
Kafka CLI
Topics
You can use the following commands to create, list, describe and delete the topics.
#create topic
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --topic first_topic --create
#create topic
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --topic second_topic --create --partitions 3
#create topic
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --topic third_topic --create --partitions 3 --replication-factor 1
#list topics
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --list
#describe first_topic
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --topic first_topic --describe
#delete first_topic
docker exec broker1 kafka-topics --bootstrap-server broker1:9092 --topic first_topic --delete

These commands manipulate Kafka topics on the first broker. To run them against other brokers, replace broker1 with the other container names mentioned in the YAML configuration above.
Producer
You can use the following commands to produce data to particular topics.
docker exec --interactive --tty broker1 kafka-console-producer --bootstrap-server broker1:9092 --topic second_topic

The above command uses the Kafka console producer to start writing data to second_topic. As we have not used any key, data will be written to partitions using the round-robin algorithm.
After execution of this command an interactive session will start on your terminal and anything you write will be written on second_topic.
#write to a topic that does not exist
docker exec --interactive --tty broker1 kafka-console-producer --bootstrap-server broker1:9092 --topic fourth_topic
#producer will expect an ack from min ISRs according to configuration
docker exec --interactive --tty broker1 kafka-console-producer --bootstrap-server broker1:9092 --topic second_topic --producer-property acks=all
#producing with keys, : is key and value separator
docker exec --interactive --tty broker1 kafka-console-producer --bootstrap-server broker1:9092 --topic second_topic --property parse.key=true --property key.separator=:

When you run the first command, it will create fourth_topic with default configurations and then write data to it. After running the third command, Kafka will expect a key in each message and will split the message on the key separator.
Note: If you don't pass a key, Kafka will throw an exception: org.apache.kafka.common.KafkaException: No key separator found
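The parsing behaviour can be sketched as follows (a hypothetical helper for illustration, not Kafka's actual code):

```python
# Sketch of how the console producer treats a line when parse.key=true
# and key.separator=":" — everything before the first separator is the
# key, the rest is the value. (Hypothetical helper, not Kafka's source.)
def parse_line(line: str, separator: str = ":"):
    if separator not in line:
        # Mirrors: org.apache.kafka.common.KafkaException: No key separator found
        raise ValueError("No key separator found")
    key, _, value = line.partition(separator)
    return key, value

record = parse_line("user1:hello world")
```

Since the key determines the partition, every line starting with "user1:" would land on the same partition.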
Consumer
You can use the following commands to consume data from particular topics.
docker exec --interactive --tty broker1 kafka-console-consumer --bootstrap-server broker1:9092 --topic second_topic --from-beginning --group second_topic_group

The above command starts a consumer as part of second_topic_group and reads data from the beginning of second_topic.
As second_topic has 3 partitions and we have only one consumer in the group, it reads data from all partitions. You can also observe that this data is not globally ordered. Per-partition order can be observed when you have three consumers, as each will be reading from its own partition of second_topic.
You can read the message with details like timestamp, partition and key using the following command:
docker exec --interactive --tty broker1 kafka-console-consumer --bootstrap-server broker1:9092 --topic second_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning

You can read more about consumer groups in the Kafka documentation (Managing Consumer Groups).
Note: Kafka can easily be used with Java/Kotlin and various other languages. For more details: Available Kafka Client APIs
Conclusion
In conclusion, modern architectures need a capable message broker to decouple services and keep the design manageable. With some basic Kafka knowledge, you can get started quickly and see how Kafka can help solve your specific problems.
Good to Read
References
Udemy: Apache Kafka Series by Stephane Maarek
Images: For some images, I’m not aware of original creators. Please share if you know.
