Solution for Kafka CommitFailedException

Mouli Shanmuhavelu
3 min read · Aug 13, 2020

Introduction

In this article, I’ll explain how we resolved the CommitFailedException that was frequently occurring in our Kafka Consumer applications.

We observed many occurrences of this error in our logs, and the same messages were processed again and again, which caused duplicate messages in the target system.

Description

We run the open-source Apache Kafka broker in our on-premise environment, and we have consumer applications running both on-premise and in the public cloud.

We implemented the Kafka consumer applications using Apache Camel and Spring Boot. We also commit offsets manually, because we want to avoid committing an offset if the target system goes down in the middle of processing a batch.
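
As a rough illustration (a hedged sketch, not our exact implementation; the topic, group, broker placeholder, and consumersCount value are assumptions), camel-kafka 3.x lets you disable auto-commit and commit the offset only after the record has been handed to the target system:

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.KafkaManualCommit;

public class AccountsRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("kafka:accounts?brokers={{kafka.brokers}}"
                + "&groupId=accounts"
                + "&consumersCount=3"         // one consumer thread per partition
                + "&autoCommitEnable=false"   // no automatic offset commits
                + "&allowManualCommit=true")  // expose KafkaManualCommit on the exchange
            .process(exchange -> {
                // ... deliver the message to the target system here ...
                KafkaManualCommit manual = exchange.getIn()
                        .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                manual.commitSync(); // commit the offset only after successful processing
            });
    }
}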

For some of the Kafka topics we have more than one partition, with an equivalent number of consumer threads. You can find our Kafka consumer implementation details in:

Exception stack trace

All our consumer applications showed the error trace below at different times.

KafkaConsumer[acme.accounts] [clients.consumer.internals.ConsumerCoordinator(onJoinPrepare:482)] [Consumer clientId=consumer-4, groupId=accounts] User provided listener org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords failed on partition revocation
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:820) ~[kafka-clients-2.3.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:692) ~[kafka-clients-2.3.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1454) ~[kafka-clients-2.3.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1412) ~[kafka-clients-2.3.1.jar!/:?]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.commitOffset(KafkaConsumer.java:436) ~[camel-kafka-3.0.0.jar!/:3.0.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.onPartitionsRevoked(KafkaConsumer.java:459) ~[camel-kafka-3.0.0.jar!/:3.0.0]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:478) [kafka-clients-2.3.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410) [kafka-clients-2.3.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) [kafka-clients-2.3.1.jar!/:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353) [kafka-clients-2.3.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) [kafka-clients-2.3.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) [kafka-clients-2.3.1.jar!/:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) [kafka-clients-2.3.1.jar!/:?]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:307) [camel-kafka-3.0.0.jar!/:3.0.0]
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:218) [camel-kafka-3.0.0.jar!/:3.0.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:1.8.0_171]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_171]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_171]

The implication of this error was that the consumer tried to commit the offset and failed. As a result, it fetched the same messages again and sent duplicates to our downstream applications.

Possibilities of Commit Failure

  • If there is a network failure, the consumer cannot reach the broker and will throw this exception. But there were no network failures when these exceptions occurred.
  • As mentioned in the error trace, if too much time is spent processing messages between calls to poll(), the group coordinator considers the consumer dead and the commit fails. We analyzed this possibility and found that the configurations below have an impact on polling: with the defaults, a single poll can return up to 500 records, so if processing averages more than 300000 / 500 = 600 ms per record, the consumer exceeds max.poll.interval.ms and is removed from the group.

The values given here are the default Kafka consumer configuration values.

request.timeout.ms=40000
heartbeat.interval.ms=3000
max.poll.interval.ms=300000
max.poll.records=500
session.timeout.ms=10000
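
To make the timing concrete, here is a hedged sketch of a plain Kafka consumer poll loop under these defaults (the broker, topic, and group names are placeholders, and the println stands in for delivery to the target system). Everything fetched by one poll() must be processed, and poll() called again, before max.poll.interval.ms elapses:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "accounts");             // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);        // default
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // default

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("accounts")); // placeholder topic
            while (true) {
                // poll() must be called again within max.poll.interval.ms;
                // otherwise the coordinator considers this consumer failed,
                // rebalances the group, and the next commit fails
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // with max.poll.records=500 and max.poll.interval.ms=300000,
                    // processing must average under 300000 / 500 = 600 ms per record
                    System.out.println(record.value()); // stand-in for target-system delivery
                }
                consumer.commitSync(); // the call that throws CommitFailedException after a rebalance
            }
        }
    }
}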

Solution

We first reduced max.poll.records to 100, but the exception still occurred at times. So we changed the configurations as below:

request.timeout.ms=300000
heartbeat.interval.ms=1000
max.poll.interval.ms=900000
max.poll.records=100
session.timeout.ms=600000
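
With camel-kafka, the same settings can also be applied as endpoint options. This is a hedged sketch of a route snippet (inside a RouteBuilder like the one above), not our exact route; the topic, group, and log endpoint are placeholders:

from("kafka:accounts?brokers={{kafka.brokers}}"
        + "&groupId=accounts"
        + "&requestTimeoutMs=300000"
        + "&heartbeatIntervalMs=1000"
        + "&maxPollIntervalMs=900000"
        + "&maxPollRecords=100"
        + "&sessionTimeoutMs=600000")
    .to("log:accounts"); // placeholder for the real target-system route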

We reduced the heartbeat interval so that the broker is informed more frequently that the consumer is still active, and we also increased the session timeout. Note that session.timeout.ms must fall within the range allowed by the broker's group.min.session.timeout.ms and group.max.session.timeout.ms settings.
After deploying our consumers with these configurations, we no longer see the error.

Summary

You do not necessarily need to use these exact values in your own consumer applications.

If reducing max.poll.records alone does not resolve the error, try tuning the other configurations as well.
