Kafka Programming: Different ways to commit offsets
Kafka has one beautiful feature (besides many others, of course): it keeps track of each consumer's position, and it does so via offsets. The intent of this article is to go through the following ways of committing offsets in Kafka:
- Auto commit
- Manual synchronous commit
- Manual asynchronous commit
- Manual commit for specific offsets
What is a Kafka offset?
A Kafka offset is a unique identifier for each message within a Kafka partition. It lets a consumer keep track of its progress: how many messages it has already consumed from a partition and where to start next. Note that an offset is unique only within a partition, not across partitions.
As a pictorial example, consider a Kafka topic with two partitions, where the numbers 0 to 5 are the offsets of the messages within each partition. The consumer's last committed offset for partition 0 is 4, while for partition 1 it is 3. If the consumer crashes or is replaced by a new one, the last committed offset lets it resume from exactly where it left off.
A consumer can either choose to commit offsets automatically at a regular interval, or commit them manually for special use cases.
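As a quick aside, the consumer API lets us ask the broker where a group would resume. Below is a minimal sketch using the standard committed() method; the partition number and the string deserializers are placeholders for illustration.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public static void printLastCommitted() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    try (KafkaConsumer<String, String> consumer =
            new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
        TopicPartition partition = new TopicPartition("myavrotopic", 0);
        // Ask the broker for this group's last committed offset on the partition;
        // the value is null if the group has never committed for it.
        Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Collections.singleton(partition));
        System.out.println("Group would resume partition 0 from: " + committed.get(partition));
    }
}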
1. Auto commit:
This is the simplest way to commit offsets: just set the enable.auto.commit property to true. The Kafka consumer client will then automatically commit the largest offset returned by the poll() method every 5 seconds (the commit itself happens inside a later poll() call once the interval has elapsed). We can set the auto.commit.interval.ms property to change this default 5 second interval. Below is a sample code snippet with auto commit enabled and a 1 second interval.
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    // Auto commit enabled, with the interval lowered from 5s to 1s.
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    KafkaConsumer<Integer, AvroMessage> consumer =
            new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
    consumer.subscribe(Arrays.asList("myavrotopic"));
    while (true) {
        // Offsets from the previous poll are auto committed inside poll()
        // once the commit interval has elapsed.
        ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<Integer, AvroMessage> record : records) {
            System.out.println("Received message: (" + record.key() + ", "
                    + record.value().toString() + ") at offset " + record.offset());
        }
    }
}
Caution with auto commit: with auto commit enabled, the Kafka consumer client will commit the last offset returned by poll() even if those messages have not been processed yet. For example, suppose poll() returned messages with offsets 0 to 1000, the consumer had processed only up to 500 of them, and the offsets were auto committed before it crashed. When it resumes, it will see the last committed offset as 1000 and start from 1001, so messages 501 to 1000 are lost. Hence with auto commit it is critical to finish processing all offsets returned by the last poll() before calling poll() again, because the commit itself happens during a later poll() call. Auto commit can also lead to duplicate processing when a consumer crashes before the next auto commit interval. For these reasons, the Kafka consumer provides APIs for developers to take control of when to commit, by setting enable.auto.commit to false, which we will discuss next. First, the sketch below illustrates the risky pattern described above.
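For illustration only, this fragment reuses the consumer setup from the snippet above and hands records off to a thread pool for background processing while the loop immediately polls again; process() is a hypothetical handler, and java.util.concurrent imports are assumed.

ExecutorService pool = Executors.newFixedThreadPool(4);
while (true) {
    ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<Integer, AvroMessage> record : records) {
        // Processing continues in the background while the loop moves on.
        pool.submit(() -> process(record)); // process() is a hypothetical handler
    }
    // Risky: the next poll() may auto commit the offsets returned above,
    // even though the pool may still be working on them or may fail.
}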
2. Manual synchronous commit:
In the code snippet below, auto commit is set to false and we explicitly call consumer.commitSync(), which commits the last offset returned by poll(). Hence we need to make sure we call it only after we are done processing all the message offsets returned by the last poll().
public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    // Disable auto commit; we will commit explicitly.
    props.setProperty("enable.auto.commit", "false");
    KafkaConsumer<Integer, AvroMessage> consumer =
            new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
    consumer.subscribe(Arrays.asList("myavrotopic"));
    while (true) {
        ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<Integer, AvroMessage> record : records) {
            System.out.println("Received message: (" + record.key() + ", "
                    + record.value().toString() + ") at offset " + record.offset());
        }
        try {
            // Blocks until the broker acknowledges the commit;
            // retries automatically on retriable errors.
            consumer.commitSync();
        } catch (CommitFailedException e) {
            System.out.println("Commit failed due to : " + e);
            e.printStackTrace();
        }
    }
}
One downside of this synchronous commit is that it can hurt application throughput, since the application blocks until the broker responds to the commit request. Hence we have another option: the asynchronous API described next.
3. Manual asynchronous commit:
With the asynchronous commit API, we just send the commit request and carry on. The application does not block, because the call returns immediately.
public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    KafkaConsumer<Integer, AvroMessage> consumer =
            new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
    consumer.subscribe(Arrays.asList("myavrotopic"));
    while (true) {
        ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<Integer, AvroMessage> record : records) {
            System.out.println("Received message: (" + record.key() + ", "
                    + record.value().toString() + ") at offset " + record.offset());
        }
        // Fire and forget: returns immediately and does not retry on failure.
        consumer.commitAsync();
    }
}
Asynchronous commit also differs from synchronous commit in its retry behavior: commitSync() keeps retrying as long as the error is not fatal, while commitAsync() does not retry even if it fails, because a retried commit could land after a later, larger offset has already been committed and thereby cause duplicate processing. On failure, asynchronous commit simply relies on a subsequent commit to cover for it.
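commitAsync() also accepts an OffsetCommitCallback, so even without retrying we can at least log failed commits. A minimal sketch that could replace the bare consumer.commitAsync() call in the loop above (imports for OffsetCommitCallback, TopicPartition, OffsetAndMetadata, and Map assumed):

consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            // Do not retry here; just log and let a later commit cover it.
            System.out.println("Commit failed for offsets " + offsets + " due to : " + exception);
        }
    }
});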
Combining both synchronous and asynchronous commit:
It is usually good programming practice to leverage both synchronous and asynchronous commits; a sample code snippet is below. We use commitAsync() for routine commits inside the while loop, and we use commitSync() just before closing the consumer to make sure the last offset is always committed.
public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    KafkaConsumer<Integer, AvroMessage> consumer =
            new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
    consumer.subscribe(Arrays.asList("myavrotopic"));
    try {
        while (true) {
            ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Integer, AvroMessage> record : records) {
                System.out.println("Received message: (" + record.key() + ", "
                        + record.value().toString() + ") at offset " + record.offset());
            }
            // Fast, non-blocking commit for the normal path.
            consumer.commitAsync();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            // On shutdown, block once to make sure the final offsets are committed.
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }
}
4. Manual commit for specific offsets:
All the methods we have discussed so far commit the latest offset returned by the last poll(). What if a developer wants finer control and wants to commit more frequently, or in smaller batches than what poll() returned? For such use cases we can pass a map of partitions to offset metadata to both the commitSync() and commitAsync() APIs. The sample code snippet below commits after processing every message. Note that the committed offset should be one greater than the offset of the last processed message, because it represents the position of the next message to read.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    KafkaConsumer<Integer, AvroMessage> consumer =
            new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
    consumer.subscribe(Arrays.asList("myavrotopic"));
    Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
    while (true) {
        ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<Integer, AvroMessage> record : records) {
            System.out.println("Received message: (" + record.key() + ", "
                    + record.value().toString() + ") at offset " + record.offset());
            // Commit record.offset() + 1: the committed offset must be the
            // position of the NEXT message the consumer should read.
            offsetAndMetadataMap.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
            consumer.commitSync(offsetAndMetadataMap);
        }
    }
}
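The same map-based API also works for committing in chunks rather than per message, which avoids paying the commit cost on every record. A variant sketch of the loop above (the batch size of 100 is an arbitrary choice for illustration) commits asynchronously after every 100 records:

int count = 0;
while (true) {
    ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<Integer, AvroMessage> record : records) {
        // Track the next position to read for each partition.
        offsetAndMetadataMap.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        if (++count % 100 == 0) {
            // Non-blocking commit of everything processed so far (null = no callback).
            consumer.commitAsync(offsetAndMetadataMap, null);
        }
    }
}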
Also check out: https://medium.com/@rramiz.rraza
I appreciate you and the time you took out of your day to read this! Please watch out (follow and subscribe) for more blogs on big data and other technologies. Cheers!