Understanding the ‘enable.auto.commit’ Kafka Consumer property

Kafka Consumers read messages from a Kafka topic. It’s not a hard concept to get your head around, but behind the scenes there’s a lot more going on than meets the eye.

Say we’re consuming messages from a Topic and our Consumer crashes. Once we realise that the world isn’t ending, we recover from the crash and start consuming again. We pick up exactly where we left off, which is kinda neat.

There are two reasons why this happens. One is something called the “Offset” and the other is a couple of default Consumer values.

So what’s an Offset?

The Offset is a piece of metadata, an integer value that continually increases for each message that is received in a partition. Each message will have a unique Offset value in a partition.


So each Offset represents the position of a message in that particular partition. Offsets are only unique within a partition, not across the whole Topic, so every partition has its own Offset 0, 1, 2 and so on.
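To make the per-partition numbering concrete, here’s a tiny toy model of a partition as an append-only list where a message’s Offset is just its index. This is only a sketch: real Kafka Offsets are 64-bit integers assigned by the broker and can have gaps (for example on compacted Topics), and `Partition` is a hypothetical class, not part of any client library.

```python
from dataclasses import dataclass, field


@dataclass
class Partition:
    """Toy partition: an append-only log where each message's Offset is its index."""

    messages: list = field(default_factory=list)

    def append(self, value):
        # The new message's Offset is the current length of the log, so in this
        # toy model Offsets start at 0 and go up by one per message.
        offset = len(self.messages)
        self.messages.append(value)
        return offset

    def read(self, offset):
        return self.messages[offset]


# Two partitions of the same Topic each keep their own Offset sequence,
# so Offset 0 exists in both.
p0, p1 = Partition(), Partition()
print(p0.append("a"), p0.append("b"), p1.append("c"))  # 0 1 0
```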

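When a Consumer reads messages from a Partition, it lets Kafka know how far it has got. This is called committing the Offset. Committed Offsets are stored in an internal Topic named __consumer_offsets, keyed by consumer group, Topic and Partition, and because of this a Consumer can stop and restart without forgetting which messages it has consumed. (Strictly speaking, the value committed is the Offset of the next message to read, i.e. the Offset of the last consumed message plus one.)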
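Here’s a rough sketch of that idea, assuming a hypothetical `OffsetStore` class that plays the part of __consumer_offsets. The group name `my-group` and Topic name `orders` are made up for the example, and the “next Offset to read” convention mirrors the Java client. The point is that the store lives outside the Consumer, so a restart can look up where it left off.

```python
class OffsetStore:
    """Toy stand-in for the __consumer_offsets topic (hypothetical, not a client API).

    Committed Offsets are kept per (consumer group, topic, partition) and live
    outside the consumer, so they survive a consumer crash.
    """

    def __init__(self):
        self._committed = {}

    def commit(self, group, topic, partition, next_offset):
        # Following the Java client's convention, the committed value is the
        # Offset of the *next* message to read (last consumed Offset + 1).
        self._committed[(group, topic, partition)] = next_offset

    def committed(self, group, topic, partition):
        # In this toy model a group that has never committed starts from Offset 0.
        return self._committed.get((group, topic, partition), 0)


log = ["m0", "m1", "m2", "m3", "m4"]  # one partition of the hypothetical "orders" topic
store = OffsetStore()

# First run: consume m0..m2, commit, then "crash".
for offset in range(store.committed("my-group", "orders", 0), 3):
    print("consumed", log[offset])
store.commit("my-group", "orders", 0, 3)

# Restart: resume from the committed Offset rather than from the beginning.
resume_from = store.committed("my-group", "orders", 0)
print("resuming at", resume_from, log[resume_from:])  # resuming at 3 ['m3', 'm4']
```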

When we create our Consumers, they have a set of default properties which we can override or we can just leave the default values in effect.

There are two properties that are driving this behaviour.

enable.auto.commit
auto.commit.interval.ms

The first property, enable.auto.commit, has a default value of true and the second property, auto.commit.interval.ms, has a default value of 5000. These are the defaults in Blizzard’s node-rdkafka client and the Java KafkaConsumer client, but other libraries may differ.
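As a quick illustration of “defaults unless you override them”, here’s a toy sketch that layers our own settings on top of the two defaults above. The `DEFAULTS` dict and `effective_config` helper are hypothetical; real clients take their configuration in their own way (a Properties object in Java, a plain object in node-rdkafka).

```python
# Defaults as described above for the Java KafkaConsumer and node-rdkafka;
# other client libraries may use different values.
DEFAULTS = {
    "enable.auto.commit": True,
    "auto.commit.interval.ms": 5000,
}


def effective_config(overrides=None):
    """Return the settings in effect: the defaults with any overrides applied on top."""
    config = dict(DEFAULTS)  # copy so the defaults themselves are never modified
    config.update(overrides or {})
    return config


print(effective_config())
# {'enable.auto.commit': True, 'auto.commit.interval.ms': 5000}
print(effective_config({"auto.commit.interval.ms": 1000}))
# {'enable.auto.commit': True, 'auto.commit.interval.ms': 1000}
```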

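So by default, roughly every 5 seconds, a Consumer commits to Kafka the latest Offsets it has handed to our application. Exactly when that happens depends on the client: librdkafka-based clients such as node-rdkafka commit on a background timer, while the Java client commits from inside poll() once the interval has elapsed.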
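A toy model of that periodic behaviour might look like the sketch below. It assumes a simple timer that fires every `interval_ms` and commits the position after the newest message delivered so far; `auto_commit_schedule` and the delivery times are made up for illustration, and real clients differ in the details (the Java client, for instance, only commits when poll() is called).

```python
def auto_commit_schedule(deliveries, interval_ms=5000, until_ms=15000):
    """Toy model of periodic auto commit.

    deliveries: list of (time_ms, offset) pairs, when each message reached the app.
    Returns (commit_time_ms, committed_offset) pairs. At every tick the committed
    value is the Offset after the latest delivered message, whether or not the
    application has finished processing it.
    """
    commits = []
    for tick in range(interval_ms, until_ms + 1, interval_ms):
        delivered = [offset for t, offset in deliveries if t <= tick]
        if delivered:
            commits.append((tick, max(delivered) + 1))
    return commits


deliveries = [(100, 0), (1200, 1), (4800, 2), (7000, 3), (12000, 4)]
print(auto_commit_schedule(deliveries))
# [(5000, 3), (10000, 4), (15000, 5)]
```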

In some scenarios this is the ideal behaviour, but in others it’s not.

Say our Consumer is processing the message at Offset 100, and whilst it’s doing that the Consumer fetches more data, say up to Offset 149. The auto commit kicks in and commits that later position, and then the Consumer crashes. When it comes back up it starts consuming from the most recently committed Offset, so how can we be sure we haven’t lost messages? In this case we can’t: the committed position is already past 149, so message 100, and everything after it that we hadn’t finished, is skipped.
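Here’s the same crash scenario as a small toy calculation. It assumes the Consumer had already received Offsets 100 to 149 and that an auto commit fired after that fetch; for contrast it also shows what happens if we only ever commit after finishing a message. `unprocessed_after_restart` is a hypothetical helper, not a client API.

```python
def unprocessed_after_restart(processing, fetched_up_to, auto_commit):
    """Toy model of the crash scenario: Offsets that will never be processed.

    processing:    Offset the Consumer was working on when it crashed.
    fetched_up_to: highest Offset already fetched and handed to the application.
    """
    if auto_commit:
        # The periodic commit fired after the fetch, so it recorded
        # "next = fetched_up_to + 1" although nothing from `processing` on was finished.
        committed = fetched_up_to + 1
    else:
        # Committing after each message: the last commit followed Offset
        # `processing - 1`, so it recorded "next = processing".
        committed = processing
    # On restart the Consumer resumes at `committed`, skipping everything before it.
    return list(range(processing, committed))


print(len(unprocessed_after_restart(100, 149, auto_commit=True)))  # 50
print(unprocessed_after_restart(100, 149, auto_commit=False))      # []
```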

What we can do instead is commit Offsets manually, after we’ve processed the messages. This gives us full control over when a message counts as dealt with and when we let Kafka know.

Firstly we have to change the value of the enable.auto.commit property.

enable.auto.commit: false

Once auto commit is disabled, the auto.commit.interval.ms value isn’t taken into consideration.
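A toy helper makes the relationship between the two properties explicit. `commit_interval_in_effect` is hypothetical and just encodes the rule above, falling back to the defaults mentioned earlier (true and 5000) when a key is missing.

```python
def commit_interval_in_effect(config):
    """Toy helper: the auto commit interval (ms) that actually applies, or None.

    When enable.auto.commit is False the interval is simply never consulted.
    Missing keys fall back to the defaults (true / 5000).
    """
    if not config.get("enable.auto.commit", True):
        return None
    return config.get("auto.commit.interval.ms", 5000)


print(commit_interval_in_effect({}))  # 5000
print(commit_interval_in_effect({"enable.auto.commit": False,
                                 "auto.commit.interval.ms": 1000}))  # None
```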

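So now we can commit our Offset manually once processing has finished. If the Consumer crashes whilst processing a message, it will start consuming again from the last Offset we committed, which is that same message, so no messages are lost. The trade-off is that a message we were partway through may be processed a second time.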
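Here’s a minimal sketch of a process-then-commit loop, using a hypothetical in-memory `FakeConsumer` whose `poll()` and `commit()` only loosely resemble real client methods. The first run crashes after processing `m2` but before committing it, and the second run picks up from the last commit.

```python
class FakeConsumer:
    """Hypothetical in-memory consumer with auto commit disabled (not a real client API)."""

    def __init__(self, log, committed):
        self.log = log
        self.committed = committed              # survives "crashes": shared between runs
        self.position = committed.get("next", 0)

    def poll(self):
        # Hand back the next (offset, value) pair, or None when the log is exhausted.
        if self.position >= len(self.log):
            return None
        offset = self.position
        self.position += 1
        return offset, self.log[offset]

    def commit(self, offset):
        # Record the Offset of the next message to read.
        self.committed["next"] = offset + 1


def run(log, committed, crash_at=None):
    """Process-then-commit loop; optionally crash after processing crash_at but before committing it."""
    consumer = FakeConsumer(log, committed)
    processed = []
    while (record := consumer.poll()) is not None:
        offset, value = record
        processed.append(value)     # "process" the message first...
        if offset == crash_at:
            return processed        # ...crash before its commit happens
        consumer.commit(offset)     # ...and only then commit it
    return processed


log = ["m0", "m1", "m2", "m3"]
committed = {}
print(run(log, committed, crash_at=2))  # ['m0', 'm1', 'm2']
print(run(log, committed))              # ['m2', 'm3']
```

Notice that m2 shows up in both runs: nothing is lost, but it’s processed twice, which is why committing after processing is usually described as at-least-once delivery.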

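Both clients mentioned earlier in this article expose methods for committing Offsets manually, for example commitSync() and commitAsync() on the Java KafkaConsumer, and commit() and commitMessage() in node-rdkafka.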

For further reading on the clients, check out the links below.

If anyone wants more information on Kafka or Consumers, get in touch on Twitter.

Cheers,

Danny

https://twitter.com/danieljameskay