Handling Kafka Partition Rebalancing
Through Reactor-Kafka
Introduction:
This is a follow-up to the previous article: https://medium.com/thinkspecial/power-of-reactor-kafka-5df7cdef2bdd
The main intention of this article is to explain how to handle partition rebalancing with the reactor-kafka library when new consumers keep joining at runtime.
Let's take the below use case:
Imagine a topic created with 10 partitions. When Consumer A starts, it listens to events from all 10 partitions. A few minutes later, a new Consumer B subscribes to the same topic. Now, out of the 10 partitions, 5 are assigned to Consumer A and the remaining 5 to Consumer B. reactor-kafka specializes in polling events, pre-fetching them, and handing them over to consumers. By the time partitions are rebalanced across consumers, thousands of events may already have been pre-fetched and queued at the consumer for processing. Once processing finishes, any attempt to acknowledge or commit events from partitions that have been reassigned to the new consumer will fail.
Problem 1: In such cases, reactor-kafka emits an error on the Flux, which is a terminal signal for a reactive stream. This means the Flux is effectively dead and will not emit any more messages from these partitions.
Problem 2: The pre-fetched events belonging to partitions the consumer no longer owns must be discarded, since they are no longer relevant to this consumer.
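The bookkeeping behind Problem 2 can be sketched without a broker: keep a set of currently assigned partitions, update it on rebalance, and filter pre-fetched events against it. This is a minimal stand-alone sketch; the Event record and partition numbers here are hypothetical stand-ins for the real Kafka consumer record, not the client API.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Broker-free sketch: track which partitions this consumer currently owns
// and drop any pre-fetched event whose partition has been revoked.
public class RebalanceFilterSketch {

    // Hypothetical minimal event type standing in for a Kafka consumer record.
    record Event(int partition, String value) {}

    // Partitions currently assigned to this consumer; in a real application
    // this would be updated by the assign/revoke hooks reactor-kafka exposes.
    static final Set<Integer> assigned = ConcurrentHashMap.newKeySet();

    static boolean shouldProcess(Event e) {
        // Only process events from partitions we still own.
        return assigned.contains(e.partition());
    }

    public static void main(String[] args) {
        // Initially this consumer owns all 10 partitions.
        for (int p = 0; p < 10; p++) assigned.add(p);

        // Events pre-fetched before the rebalance.
        List<Event> prefetched = List.of(
                new Event(2, "a"), new Event(7, "b"), new Event(9, "c"));

        // Rebalance: partitions 5..9 are revoked and move to Consumer B.
        for (int p = 5; p < 10; p++) assigned.remove(p);

        long kept = prefetched.stream().filter(RebalanceFilterSketch::shouldProcess).count();
        System.out.println("kept=" + kept);
        System.out.println("dropped=" + (prefetched.size() - kept));
    }
}
```

Only the partition-2 event survives the filter; the two events from revoked partitions are dropped instead of being processed and failing at commit time.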
Solution:
🐎 Example Java Application:
Let's start by downloading Kafka:
Extract the downloaded archive to a folder and then open that folder in a terminal.
👉 First, start ZooKeeper:
./bin/zookeeper-server-start.sh config/zookeeper.properties
You will see the below message on a successful start:
[2019-05-27 11:21:15,218] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
👉 Now start Kafka:
./bin/kafka-server-start.sh config/server.properties
You will see a successful start as below:
[2019-05-27 11:23:47,576] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser)
[2019-05-27 11:23:47,577] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
👉 Now create a topic:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 10 --topic rebalanceTest
Note that the partition count is 10.
Verify the created topics:
kafka_2.12-2.2.0 $ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
rebalanceTest
test
Time to Produce:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic rebalanceTest
Use your keyboard skills to enter random text or numbers. Keep typing and press Enter after each message. Don’t break the keyboard even though you can! :-) Press CTRL+C when you want to exit.
Final Step: The Consumer: A lot to understand here. Let’s try a simple one.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rebalanceTest --from-beginning
You will see whatever you typed (and are still typing) from the producer.
A Sample Consumer:
Starting the consumers produces the log below. The example code on GitHub from the previous article is self-explanatory for starting the consumers that produce this log.
The log above shows that every minute a new consumer joins the group. Consumers A, B, C, and D are assigned different partitions. So all pre-fetched events for a partition should be discarded when that partition migrates away from the consumer. Since reactor-kafka provides hooks for partitions being assigned and revoked, you can cross-check the current assignment before you consume a message.
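Wired up with reactor-kafka, those hooks look roughly like this. This is a sketch, not a drop-in implementation: it needs a running broker on localhost:9092, the group id is a placeholder, and the drop-on-revoke check mirrors the bookkeeping described above. The `addAssignListener`/`addRevokeListener` and `commitInterval` calls are part of the reactor-kafka `ReceiverOptions` API.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class RebalanceAwareConsumer {

    public static void main(String[] args) {
        // Partitions this instance currently owns, maintained by the hooks below.
        Set<TopicPartition> assigned = ConcurrentHashMap.newKeySet();

        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "rebalance-demo", // placeholder group id
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                        "org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                        "org.apache.kafka.common.serialization.StringDeserializer");

        ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton("rebalanceTest"))
                .commitInterval(Duration.ofSeconds(5)) // periodic commit of acknowledged offsets
                .addAssignListener(parts ->
                        parts.forEach(p -> assigned.add(p.topicPartition())))
                .addRevokeListener(parts ->
                        parts.forEach(p -> assigned.remove(p.topicPartition())));

        KafkaReceiver.create(options)
                .receive()
                .doOnNext(record -> {
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    if (!assigned.contains(tp)) {
                        // Pre-fetched event for a partition we no longer own: drop it.
                        return;
                    }
                    System.out.println("processing " + record.value());
                    record.receiverOffset().acknowledge(); // manual ack, committed on the interval
                })
                .subscribe();
    }
}
```

The key design choice is that the assigned-partition set is consulted per record, so events pre-fetched before a revoke are silently skipped rather than triggering a failed commit.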
As stated, starting a new consumer every minute triggers a rebalance.
The consumer log above shows that the consumer drops the messages that are not relevant to the partitions it is currently assigned and proceeds further. Whichever consumer is assigned those partitions next resumes from where the older one started dropping.
The whole flow above runs on manual acknowledgment, with commits scheduled to run at a periodic interval.
With the consumer set up this way, rebalancing is handled automatically by reactor-kafka.
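The resume-from-committed-offset behavior can also be sketched without a broker. In this stand-alone simulation (all names hypothetical), acknowledgments only mark offsets in memory and a periodic task commits them, so when a partition moves, the next owner resumes from the last committed offset and reprocesses anything acknowledged but not yet committed, which is the usual at-least-once trade-off.

```java
import java.util.HashMap;
import java.util.Map;

// Broker-free sketch of manual acknowledgment plus periodic commits:
// acks only mark offsets in memory; a scheduled commit flushes them.
public class CommitResumeSketch {

    // Last committed next-offset-to-read per partition (what the broker would store).
    static final Map<Integer, Long> committed = new HashMap<>();
    // Highest acknowledged offset per partition, not yet committed.
    static final Map<Integer, Long> acked = new HashMap<>();

    static void acknowledge(int partition, long offset) {
        acked.merge(partition, offset, Math::max);
    }

    // Runs on a periodic interval (commitInterval, in reactor-kafka terms).
    static void commitScheduled() {
        acked.forEach((p, off) -> committed.put(p, off + 1)); // commit next offset to read
        acked.clear();
    }

    public static void main(String[] args) {
        // Consumer A processes and acks offsets 0..41 on partition 7.
        for (long off = 0; off <= 41; off++) acknowledge(7, off);
        commitScheduled(); // the periodic commit fires

        // A acks 42..44, but partition 7 is revoked before the next commit runs.
        for (long off = 42; off <= 44; off++) acknowledge(7, off);
        acked.remove(7); // revoked: uncommitted acks for partition 7 are discarded

        // Consumer B is assigned partition 7 and resumes from the committed offset.
        System.out.println("resume-from=" + committed.get(7));
    }
}
```

Consumer B resumes at offset 42: the three acknowledged-but-uncommitted events are seen again by the new owner rather than being lost.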
Find the publication here: https://medium.com/thinkspecial