Apache Kafka Guide #15 Java API Consumer Group

Paul Ravvich
Apache Kafka At the Gates of Mastery
Jan 18, 2024

Hi, this is Paul, and welcome to part #15 of my Apache Kafka guide. Today we will discuss consumer groups with the Java API.

In this part, we will examine how partitions are rebalanced among the consumers of a group. The next part will cover rebalancing in more depth. For now, let's focus on consumers reading from a topic with three partitions, and observe what happens with one, two, and then three consumers in the group.

To see how this works in practice, we have to run multiple instances of this code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        String groupId = "group1";
        String topic = "demo_java";

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Keys and values are Strings, so the deserializers must match KafkaConsumer<String, String>
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());

        // Every instance started with the same group.id joins the same consumer group
        properties.setProperty("group.id", groupId);
        // Read from the beginning of the topic when the group has no committed offsets yet
        properties.setProperty("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Register the shutdown hook once, before the poll loop (not on every iteration)
            Thread mainThread = Thread.currentThread();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("Detected shutdown, calling consumer.wakeup()");
                // wakeup() makes the blocked (or next) poll() throw a WakeupException
                consumer.wakeup();
                try {
                    // Wait until the main thread has left the loop and closed the consumer
                    mainThread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));

            consumer.subscribe(Collections.singletonList(topic));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Key: " + record.key() + ", Value: " + record.value());
                    System.out.println("Partition: " + record.partition() + ", Offset: " + record.offset());
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown; try-with-resources has already closed the consumer,
            // so it has left the group and the remaining members rebalance
            System.out.println("Consumer is shutting down");
        } catch (Exception e) {
            System.out.println("Unexpected exception: " + e);
        }
    }
}

This is how you can run multiple instances in IntelliJ IDEA:

  1. Open Run > Edit Configurations.
  2. Click Modify options.
  3. Select Allow multiple instances.

Consumer Auto-Rebalancing When Another Consumer Shuts Down Cleanly

Run three instances and send data with the Producer from the previous guide.

Now shut down one of the instances cleanly, for example by choosing Exit in IntelliJ IDEA. Because the shutdown is clean, the remaining consumers rebalance: one of them is now assigned partition 2, and the other is assigned partitions 0 and 1. If you shut down another instance the same way, it also goes through a clean shutdown, and the last remaining consumer is assigned all three partitions: 0, 1, and 2.
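The split above (one consumer with partitions 0 and 1, the other with partition 2) can be reproduced with a simplified model of range-style assignment. This is a sketch, not Kafka's implementation: it assumes a single topic and the behaviour of RangeAssignor, the Java consumer's default assignor, where consumers are sorted by member id and the first numPartitions % numConsumers consumers receive one extra partition. The class name and the consumer ids are hypothetical; on a real cluster the member ids are generated by the group coordinator, so which instance receives the extra partition can differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignmentSketch {

    // Simplified range-style assignment for one topic (hypothetical helper, not Kafka code):
    // consumers are sorted by id, each gets numPartitions / numConsumers consecutive
    // partitions, and the first (numPartitions % numConsumers) consumers get one extra.
    public static Map<String, List<Integer>> assign(List<String> consumerIds, int numPartitions) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        if (consumerIds.isEmpty()) {
            return assignment; // no consumers, nothing to assign
        }
        List<String> sorted = new ArrayList<>(consumerIds);
        Collections.sort(sorted);

        int perConsumer = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int nextPartition = 0;

        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(nextPartition++);
            }
            assignment.put(sorted.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three partitions while the group shrinks from three consumers to one
        System.out.println(assign(List.of("consumer-a", "consumer-b", "consumer-c"), 3));
        // prints {consumer-a=[0], consumer-b=[1], consumer-c=[2]}
        System.out.println(assign(List.of("consumer-a", "consumer-b"), 3));
        // prints {consumer-a=[0, 1], consumer-b=[2]}
        System.out.println(assign(List.of("consumer-a"), 3));
        // prints {consumer-a=[0, 1, 2]}
    }
}
```

Modelling the assignment as a pure function makes the point of a rebalance visible: whenever group membership changes, the whole mapping is recomputed from the new member list.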

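This is why it is very important to shut down your consumer cleanly and gracefully, using consumer.wakeup() and then closing the consumer. A consumer that is closed leaves the group immediately, so its partitions are reassigned right away; a consumer that simply dies is only removed once the group coordinator stops receiving its heartbeats (after session.timeout.ms), and its partitions are not consumed until then.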
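To see the control flow of the wakeup pattern without a running broker, here is a minimal, self-contained sketch. FakeConsumer, FakeWakeupException and runUntilWakeup are hypothetical stand-ins, not Kafka classes. One simplification to keep in mind: the fake poll() only checks for a pending wakeup before it "blocks", whereas the real KafkaConsumer.wakeup() also aborts a poll() that is already blocked. The shape is the same as in the Consumer class: another thread calls wakeup(), the poll loop exits with an exception, and the consumer is closed in every case.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupPatternSketch {

    // Hypothetical stand-in for KafkaConsumer that only models wakeup(), poll() and close()
    static class FakeConsumer {
        private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
        volatile boolean closed = false;
        int polls = 0;

        // Like wakeup(): safe to call from another thread, e.g. a shutdown hook
        void wakeup() {
            wakeupRequested.set(true);
        }

        // Like poll(): throws once a wakeup has been requested, otherwise "waits" briefly
        void poll() {
            if (wakeupRequested.getAndSet(false)) {
                throw new FakeWakeupException();
            }
            polls++;
            try {
                Thread.sleep(10); // pretend to block while waiting for records
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // A real close() would also make the consumer leave its group
        void close() {
            closed = true;
        }
    }

    // Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException
    static class FakeWakeupException extends RuntimeException {
    }

    // Runs the poll loop until another thread calls wakeup(), then closes the consumer
    static FakeConsumer runUntilWakeup(long wakeupAfterMillis) {
        FakeConsumer consumer = new FakeConsumer();
        // Plays the role of the shutdown hook thread
        Thread stopper = new Thread(() -> {
            try {
                Thread.sleep(wakeupAfterMillis);
            } catch (InterruptedException ignored) {
                // fall through and wake the consumer anyway
            }
            consumer.wakeup();
        });
        stopper.start();
        try {
            while (true) {
                consumer.poll();
            }
        } catch (FakeWakeupException e) {
            System.out.println("Wakeup received, shutting down");
        } finally {
            consumer.close(); // always runs, mirroring try-with-resources in the real consumer
        }
        try {
            stopper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer;
    }

    public static void main(String[] args) {
        FakeConsumer consumer = runUntilWakeup(50);
        System.out.println("closed = " + consumer.closed);
    }
}
```

The key design choice is that wakeup() is the only call that is safe from another thread, so the hook thread merely signals, and the thread that owns the consumer does the actual cleanup.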

Thank you for reading until the end.
