How to Make Kafka Producer/Consumer Production-Ready

Shivanshu Goyal
Published in The Startup
6 min read · Sep 19, 2020

Apache Kafka is an open-source stream-processing platform. It was developed to provide high throughput and low latency for handling real-time data.

Before we look at how to make our Kafka producer and consumer production-ready, let’s first understand the basic terminology of Kafka.

Kafka topic: A topic is a named category that stores a specific kind of message or a particular stream of data. A topic contains messages of a similar kind.

Kafka partition: A topic can be split into one or more partitions, which segregate the messages of one topic into multiple buckets. The partitions of a topic may reside on the same or on different Kafka brokers.

Kafka producer: It is a component of the Kafka ecosystem which is used to publish messages onto a Kafka topic.

Kafka consumer: It is another component of the Kafka ecosystem which consumes messages produced by a Kafka producer on a topic.

A producer publishes messages onto topic A. Topic A has 3 partitions, and each partition is read by a consumer.

Kafka producer configurations

Knowing these producer configurations is critical if we want to get optimal performance and make the best possible use of Kafka in our streaming application. There are multiple configurations. Let's look at each of them in detail below:

  1. Retries: This configuration sets the maximum number of retry attempts the producer makes when publishing a message fails. If our application cannot afford to miss any published data, we increase this count to improve the chance that every message is eventually delivered.
  2. Batch size: It is set to get high throughput in our producer application by combining multiple network calls into one. batch.size is measured in bytes: it is the number of bytes collected for a partition before the batch is sent to the Kafka broker. It makes sense to increase its value when the producer is publishing data all the time, since larger batches give the best throughput. When traffic is light, a batch may not fill up, and it is sent once the linger time (see below) has passed. The default value is 16384 bytes.
  3. Linger time: This configuration works together with batch size to get high throughput. linger.ms sets the maximum time to buffer data in asynchronous mode. Suppose we set batch.size to 300 KB and linger.ms to 10 ms; then the producer sends a batch as soon as either limit is reached, whichever comes first. With linger.ms = 0 (the default at the time of writing), the producer does not wait; it sends the buffer any time data is available. Setting linger.ms to n reduces the number of requests sent, but adds up to n milliseconds of latency to the records sent.
  4. Send() method: There are 3 ways to publish messages onto a Kafka topic.
    A. Fire and forget: It is the fastest way to publish messages, but messages could be lost because the result of the send is never checked.
    producer.send(record);
    B. Synchronous: It is the slowest method; we use it when we cannot afford to lose any message. Calling get() blocks the thread until an acknowledgment is received for the published message, so no new message is produced in the meantime.
    RecordMetadata rm = producer.send(record).get();
    C. Asynchronous: It gives better throughput than the synchronous approach since we don’t block waiting for acknowledgments; the callback is invoked when the acknowledgment or an error arrives.
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata rm, Exception ex) {...}
    });
  5. Acks: This setting controls how many replica acknowledgments the partition leader waits for before it confirms a write to the producer application. There are 3 possible values for this configuration: 0, 1 and -1 (all). 0 means no acknowledgment is required; it is set when low latency is required. 1 means the leader acknowledges as soon as it has written the record to its own log, without waiting for the followers. -1 (or all) means the leader waits for all in-sync replicas of the given partition to acknowledge; it is set when data consistency is most crucial.
    There is one more important property, min.insync.replicas, that works together with acks=all. It is set on the broker or the topic, not on the producer. For any publish request with acks=all to succeed, at least this many in-sync replicas must be online; otherwise the producer gets an exception (NotEnoughReplicasException).
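The producer settings above can be collected in one place. What follows is a minimal sketch, assuming plain java.util.Properties with the standard Kafka property names as string keys. The class name, the broker address localhost:9092, the retry count and the helper readyToSend are assumptions made for illustration; the batch size and linger time are the example values from the text. In a real application the Properties object would be handed to the KafkaProducer constructor together with key and value serializer settings.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Collects the producer settings discussed above.
    // All values are illustrative assumptions, not recommendations.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("retries", "5");          // retry a failed publish up to 5 times
        props.put("batch.size", "307200");  // 300 KB, the example value from the text
        props.put("linger.ms", "10");       // wait at most 10 ms for a batch to fill
        props.put("acks", "all");           // wait for all in-sync replicas
        // min.insync.replicas is a broker/topic setting, so it is not set here.
        return props;
    }

    // Simplified model of the batch.size / linger.ms rule:
    // a batch is sent as soon as either limit is reached.
    public static boolean readyToSend(int bufferedBytes, long waitedMs,
                                      int batchSizeBytes, long lingerMs) {
        return bufferedBytes >= batchSizeBytes || waitedMs >= lingerMs;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("acks = " + props.getProperty("acks"));
        System.out.println("batch.size = " + props.getProperty("batch.size"));
        // Only 1000 bytes buffered, but the 10 ms linger time has passed.
        System.out.println("send now? " + readyToSend(1000, 10, 307200, 10));
    }
}
```

The readyToSend helper only models the decision rule; the real producer makes this decision internally for each partition.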

Kafka consumer configuration

Let’s understand how to tune our Kafka consumer to consume messages from Kafka efficiently. Like a Kafka producer, a consumer also has a number of configurations.

  1. Consumer group: It plays a significant role for the consumer, as we can run multiple instances of the consumer application within the same consumer group to consume messages from a Kafka topic in parallel. The number of consumers should be less than or equal to the number of partitions in the Kafka topic, because a partition can be assigned to only one consumer in a consumer group; any extra consumers stay idle.
    Maximum parallelism is achieved when #consumers = #partitions
  2. Auto vs Manual commit: A consumer has to commit the offset after consuming a message from a topic. This commit can be made automatically or manually.
    enable.auto.commit is the property which needs to be true to enable auto-commit. It is enabled by default.
    auto.commit.interval.ms is the dependent property which is taken into account when auto-commit is enabled. It sets how often, in ms, the offsets of the consumed messages are committed.
    There could be a situation where messages have been read and their offsets auto-committed, but the application crashes before it has processed them. When we restore the application, it resumes after the committed offset, so those messages are lost. Manual commit prevents this situation: the consumer commits an offset only once it has processed the corresponding messages completely.
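  3. Heartbeat interval: This property defines how often the consumer sends its heartbeat to the Kafka broker (the group coordinator). The heartbeat tells Kafka that the given consumer is still alive and consuming messages from it.
    heartbeat.interval.ms = 10 means the consumer sends its heartbeat to the Kafka broker every 10 milliseconds.
  4. Session timeout: It is the time after which the broker decides that the consumer is dead and no longer available to consume.
    session.timeout.ms = 50 means that if the consumer is down and not sending heartbeats, the broker waits until 50 ms have passed without a heartbeat before it decides the consumer is dead. The values 10 and 50 are only illustrative: real deployments use values in the range of seconds (the default heartbeat interval is 3000 ms), and the broker limits the allowed session timeout through group.min.session.timeout.ms and group.max.session.timeout.ms.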
  5. Auto offset reset strategy: It defines the behavior of the consumer when there is no committed offset. This is the case when the consumer is started for the first time or when the committed offset is out of range. There are 3 possible values for the property auto.offset.reset: earliest, latest and none. With earliest the consumer starts from the oldest available message, with latest it reads only messages that arrive after it starts, and with none it throws an exception. The default value is latest.
    Note: This configuration is applicable only when no offset commit is found.
How the auto offset reset strategy works when no offset commit is found for a consumer.
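The consumer settings above can be sketched in the same way. This is a minimal sketch, assuming plain java.util.Properties with the standard Kafka property names as string keys. The class name, the broker address, the group name orders-consumer-group and the helper heartbeatFitsSession are hypothetical, and the values are illustrative only. The helper checks the guideline from the Kafka documentation that heartbeat.interval.ms should typically be no higher than one third of session.timeout.ms.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Collects the consumer settings discussed above.
    // All values are illustrative assumptions, not recommendations.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "orders-consumer-group");   // hypothetical group name
        props.put("enable.auto.commit", "false");         // commit manually after processing
        props.put("heartbeat.interval.ms", "3000");       // heartbeat every 3 seconds
        props.put("session.timeout.ms", "10000");         // declared dead after 10 seconds
        props.put("auto.offset.reset", "earliest");       // used only when no commit exists
        return props;
    }

    // Checks that the heartbeat interval is at most one third of the session timeout.
    public static boolean heartbeatFitsSession(Properties props) {
        long heartbeatMs = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        long sessionMs = Long.parseLong(props.getProperty("session.timeout.ms"));
        return heartbeatMs * 3 <= sessionMs;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
        System.out.println("heartbeat fits session timeout? " + heartbeatFitsSession(props));
    }
}
```

In a real application the Properties object would be passed to the KafkaConsumer constructor together with key and value deserializer settings.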

One more important thing that affects which offsets the earliest and latest settings correspond to is the log retention policy. Imagine we have a topic with retention configured to 1 hour. We produce 10 messages, and then an hour later we publish 5 more messages. The latest offset is not affected by retention, but the earliest one can no longer be 0, because Kafka will have already removed the first 10 messages; thus the earliest available offset will be 10.
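The retention example can be replayed with a small simulation. This is a simplified sketch and not how Kafka is implemented: the class RetentionSketch and its methods are hypothetical, records expire one by one here (a real broker removes whole log segments), and the second batch is published 61 minutes after the first so that the first 10 messages are clearly older than the 1 hour retention.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class RetentionSketch {

    private final Deque<Long> timestamps = new ArrayDeque<>(); // append times of retained records
    private long earliestOffset = 0; // offset of the oldest retained record
    private long latestOffset = 0;   // offset the next appended record will get

    // Appends one record at the given time.
    public void append(long nowMs) {
        timestamps.addLast(nowMs);
        latestOffset++;
    }

    // Removes every record older than the retention period.
    public void applyRetention(long nowMs, long retentionMs) {
        while (!timestamps.isEmpty() && nowMs - timestamps.peekFirst() > retentionMs) {
            timestamps.pollFirst();
            earliestOffset++;
        }
    }

    public long earliestOffset() { return earliestOffset; }

    public long latestOffset() { return latestOffset; }

    public static void main(String[] args) {
        long minute = 60_000L;
        RetentionSketch log = new RetentionSketch();
        for (int i = 0; i < 10; i++) log.append(0L);           // 10 messages at t = 0
        log.applyRetention(61 * minute, 60 * minute);          // retention of 1 hour
        for (int i = 0; i < 5; i++) log.append(61 * minute);   // 5 more messages later
        System.out.println("earliest = " + log.earliestOffset()); // prints "earliest = 10"
        System.out.println("latest = " + log.latestOffset());     // prints "latest = 15"
    }
}
```

A consumer with auto.offset.reset = earliest and no committed offset would therefore start at offset 10 in this scenario, not at 0.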

There is an important exception to know about in Kafka consumers. This exception (its message is shown below, and it is typically raised as a CommitFailedException) kicks the impacted consumers out of the consumer group. It generally means that the consumer took too long to process the records returned by poll().

Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

This exception can be avoided by tuning the following Kafka consumer configurations:

  1. The maximum number of records to be returned in a single call to poll().
    max.poll.records = 500 is the default value. Decrease it to the number of records the consumer can process within the poll interval. This configuration works together with max.poll.interval.ms.
  2. The maximum poll interval, max.poll.interval.ms, is the maximum time Kafka allows between two calls to poll(), that is, the time the consumer has to process the records (at most max.poll.records) returned by the previous poll. The default value is 5 minutes. It can be increased to avoid the exception.
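To see how the two settings interact, we can estimate how many records fit into one poll interval. This is a rough sketch under stated assumptions: the class PollBudgetSketch, the method safeMaxPollRecords and the safety factor are hypothetical, and the per-record processing time is something we would have to measure in our own application.

```java
public class PollBudgetSketch {

    // Returns the largest max.poll.records for which a full batch is processed
    // within safetyFactor * max.poll.interval.ms, given the processing time per record.
    public static int safeMaxPollRecords(long maxPollIntervalMs, long perRecordMs,
                                         double safetyFactor) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
        long records = budgetMs / perRecordMs;
        // Always allow at least one record per poll.
        return (int) Math.max(1, Math.min(records, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Default interval of 5 minutes, 1 second per record, half the interval as budget.
        int records = safeMaxPollRecords(300_000L, 1_000L, 0.5);
        System.out.println("max.poll.records = " + records); // prints "max.poll.records = 150"
    }
}
```

With the default max.poll.records of 500 and one second of processing per record, a full batch would take 500 seconds, which exceeds the default interval of 5 minutes (300 seconds), so the consumer would be kicked out of the group. Lowering max.poll.records to 150 or raising max.poll.interval.ms resolves this.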

Thanks for reading!

Shivanshu Goyal
Software Engineer @Salesforce, USA | Ex-Walmart | Ex-Motorola | Ex-Comviva | Ex-Samsung | IIT Dhanbad