Theory Behind Kafka
Today, we are going to discuss the theories behind Kafka. It is important to know how Kafka works as much as using it. Kafka is often used in real-time streaming data architectures to provide real-time analytics. Kafka is a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system. So let’s find out how Kafka Works !
Topics, Partitions and Offsets
Topics : a particular stream of data,Similar to a table in database (without the constraints). You can have many topics as you want and the topic is identified by its name.
Topics are split in partitions.
Each partition is ordered and each message within a partition gets an incremental id, called offset. Offsets only have a meaning for a specific partition. For Example, offset 3 in partition 0 doesn't represent the same as offset 3 in partition 1.
Order of offset is guaranteed only within a partition, not across partitions. Each message within a partition gets an incremental id called offset.
You need to specify how many partitions there should be, when creating a topic (you can change this later). Data in Kafka is kept only for a limited amount of time. Data gets deleted over time, By default it is one week.
Once the data is written to a partition, it cannot be changed. (immutability)
Say you have a fleet of trucks, each truck reports its GPS position to Kafka. You can have a topic TRUCK_GPRS that contains the position of all trucks. Each truck will send a message to Kafka every 20 seconds, each message will contain the truck ID and the truck position (latitude and longitude).
We choose to create that topic with 10 partitions (this is an arbitrary number). We have each truck reporting/sending messages of their own position to Kafka, through the same TRUCK_GPRS topic. (you don’t have one topic per truck, trucks are going to send data to the same topic). Then we have consumers for our data. you can have Location dashboard for your
Employees so they can look at the truck data, or maybe you want to have a Notification service , for example when a truck is running out of fuel, or whatever you want.
Brokers
A Kafka cluster is composed of multiple brokers(cluster means that it’s composed of multiple brokers and each broker basically is a server). Each broker is identified with its id (id is going to be a number : integer). Each broker will only contain certain topic partitions. After connecting to any broker, (called bootstrap broker), you will be connected to the entire cluster.
Let’s say we have three brokers broker 101, 102, 103. and we are going to create a topic named Topic_A, and it has three partitions, and Topic_B with two partitions. (there is no relationship between partition number and broker number). Once a topic is created, Kafka will automatically assign the topic and distribute it across all of the brokers. In Topic_B, there is less partitions than the number of brokers so broker 103 will not hold any data from Topic_B
Topic Replication factor
Kafka is a distributed system. We have 3 brokers or 100 brokers, so this is distributed. When there is a distributed system in the big data world, we need to have replication, as if a machine goes down, then the things still work, and replication does that for us. When you create a topic you need to decide on the replication factor. Topics should have a replication factor greater than 1 (usually between 2 and 3). This way if a broker is down, another broker can serve the data. Let’s take an example of Topic-A with 2 partitions and replication factor of 2. Because of replication factor, we need to see two replicas of these partitions somewhere (partition 1 of Topic-A is going to be replicated on broker 102 and partition 1 of Topic -A is also going to be replicated on Broker 103 )
Let’s say that we lose broker 2. Broker 101 and 103 can still serve the data. The replication basically allowed us to ensure that data would not be lost.
Concept of leader for a partition.
At any time only ONE broker can be a leader for a given partition. Only that leader can receive and serve data for a partition. The other brokers will synchronize the data. Therefore each partition has one leader and multiple ISR (in-sync replica)
Let’s take our previous example.
For partition 0 broker 101 is going to be the leader while broker 102 is going to be a replica or ISR. Similarly, for partition 102, broker 102 will be leader and broker 103 will be ISR. Zookeeper (will be discussed bellow) will decide leaders and ISRs. If broker 101 on the left is lost, then the partition 0 on broker 102 will become the leader because it was in-sync replica. When broker 101 comes back, it will try to become leader again after replicating the data. This is happening in the background, handled by Kafka.
Producers
Producer writes data to a topic (which is made of partitions)
Producers automatically know which broker and partition to write to.
In case of broker failure producers will automatically recover.
Let’s say we have a producer, and it is sending data to partition one and two of Topic-A . basically by sending data to Kafka to the topic, producers will load balance , automatically send a little bit to broker one, little bit to broker 2, little bit to broker 3 and then switch on again. This is how load balancing is done in Kafka.
Producers can choose to receive acknowledgement of data writes. There are three acknowledgement modes
Acks=0 : producer just sends the data and will wait for acknowledgement (possible data loss because if a producer sends data to a broker and broker is down, we don’t know about it)
Acks=1 : default mode. Producer will wait for the leader to acknowledge. (limited data loss)
Acks=all : leader and all the replicas acknowledgement (no data loss)
Producers : Message Keys
Producers can choose to send a key with the message (string, number, etc.).
When you send data without a key, the key will be null and data will be send round robin, means first message will be sent to broker 101, second message to 102 and third message to 103, etc etc.
If a key is sent, then all messages for that key will always go to the same partition. A key is basically sent if you need message ordering for a specific field. Let’s take our previous truck example.
If we want each data for each truck to be in order, not across the trucks but for each one truck, I will choose my truck_id to be my key. So as shown in image, truck_id_123 data will always be in partition 0 and so on. We can’t specify that this key goes to this partition, but you know that this key will always go to the same partition.
Consumers
Consumers read data from a topic and the topic is going to be identified by it’s name. Consumers automatically know which broker to read from. In case of broker failures, consumers know how to recover (just like producers). Data in overall will be read in order within each partition.
Say we have broker 101 and topic A partition 0 and we have a consumer reading from it. Its going to read data in order. It will read message zero, or offset zero then offset one , offset two … etc.
Consumers can also read from multiple partitions. Say we have another consumer and its reading the data from partition one of broker 102. It is reading data in order for partition one, but also reading data in order for partition two. As you can see, there is no guarantee across the order between partition on and partition two. It reads them in parallel. (in reality, it will read a little bit from partition one, then little bit from partition two, and from partition one again, etc. )
Consumer Groups
Basically we are gonna have a lot of consumers and a Consumer is like an application and it will read data in groups. Each consumer within a group will read directly from exclusive partitions.
Basically a consumer group represents an application. Let’s say application_1 has two consumers and consumer one is going to read data from 2 partitions and consumer 2 is going to read from third partition. application_2 has three consumers and each consumer will read from one partition only. Say we have a third application with one consumer only. And that one consumer will read from the three partitions at the same time. Consumers will automatically use GroupCoordinator and a ConsumerCoordinator to assign a consumer to a partition
If you have more consumers than partitions, some consumers will be inactive. Say we have 4 consumers and only three can read from partitions. So the 4th consumer is here to be inactive. Sometimes you may want that because for example let’s say that you are gonna lose consumer 3 like the application just stops or the machine shuts down. Then consumer 4 can take over right away. This is the only case where you would want this. But usually you don’t have inactive consumers. That’s why it is important when you choose a number of partitions for your topic.
Consumer offsets
Kafka stores the offsets at which a consumer group has been reading. Think of it like check-pointing or bookmarking. The offsets are committed live (as your consumer groups read data) in the Kafka topic named __consumer_offsets . When a consumer in a group has processed data received from Kafka, it should be committing the offsets. If a consumer dies, it will be able to read back from where it left off thanks to the consumer offsets.
Say there is a topic and has many offsets like 4258, 4259,….4269. after processing a bit, its going to commit offsets as 4262. and then its going to read from this offsets, so even if it went down, Kafka will say that the 4262 is your committed offset, this is where you should start from, and the consumer will ask f0r 4263,4264,…etc
Delivery semantics for consumers
Consumers choose when to commit offsets. There are 3 delivery semantics.
1. At most once : offsets are committed as soon as the message is received. If the processing goes wrong, the message will be lost.(it won’t be reading again) so At most once is usually not preferred
2. At Least once : offsets are committed after the message is processed. If the processing goes wrong, the message will be read again. This can result in duplicate processing of messages. Make sure your processing is idempotent (i.e. processing again the message won’t impact your system) this is usually the preferred method.
3. Exactly one : can be achieved for Kafka => Kafka workflows using Kafka streams API. You have to look into their documentation specifically.
Say you have Kafka => database, then you need Exactly one because you don’t want to have duplicates into your database. But for this, you have to use an idempotent consumer, so anything that Kafka to external system as exactly once most likely means that its using an idempotent consumer which makes sure that there is no duplicates in the final database.
Kafka Broker Discovery
We have established that producers or consumers can automatically figure out which broker they can send or receive data to or from. Well, every Kafka broker is called a bootstrap server. That means that you only need to connect to one broker only and you will be connected to the entire cluster. Each broker knows about all brokers, all topics and partitions and its called Metadata.
Say we have a Kafka cluster with five brokers. All of them are bootstrap servers. Your Kafka client could be a producer or a consumer, let’s say that it will connect to broker 101. But it could connect to any of them. When the connection is established, your client automatically behind the scenes will do something called a metadata request, and the broker 101 will come back and say , “Hay, Here’s your metadata, including the list of all the brokers and their IPs etc etc.” and the client will get them. When a client starts producing or consuming, it knows which broker it needs to connect automatically. That’s how broker discovery works. When you have a Kafka cluster, even if it has 100 brokers, you only need to connect to one broker, to get connected to the entire thing, and clients are smart enough to figure out afterwards to which broke they should connect to.
Zookeeper
Zookeeper is what holds the brokers together. Zookeeper manages the brokers, it keeps a list of them and zookeeper will also help in performing leader elections for partitions. When a broker goes down, there is a new partition that becomes a leader and zookeeper helps with that. Zookeeper also will send notifications to Kafka in case there is any changes. Eg: New topic, broker dies, broker comes up, delete topics, etc. Kafka can not work without Zookeeper. When you stark Kafka, you first have to start Zookeeper. Zookeeper by design operates with an odd number of servers (3,5,7). Zookeeper has a leader and the leader handles the writes from the brokers. Rest of the servers are followers and they handle reads. (The producers and consumers don’t write to the zookeeper. They write to Kafka and Kafka manages all metadata in zookeeper.)
Kafka Guarantees
Kafka Guarantees that,
- Messages are appended to a topic-partition in the order they are sent
- Consumers read these messages in the order stored in a topic partition.
- With a replication factor of N, producers and consumers can tolerate up to N-1 brokers being down
- That’s why a replication factor of 3 is a good idea. Because one broker can be taken down for maintenance and another broker can be taken down unexpectedly and we still have a working topic
so, that’s it for today. I hope you have enjoyed this and see you again with a new topic (not a Kafka topic 😛). Have a nice Day 😇