Kafka Producer and Consumer

Emre Akın
8 min read · Nov 26, 2023

I talked about Kafka architecture in my previous article. In this article, I will cover producers and consumers, with Spring Boot examples.

First of all, to use Kafka in your Spring Boot project, you need to add the Spring Kafka dependency (org.springframework.kafka:spring-kafka). This library gives you everything you need to produce messages to Kafka and consume messages from it.

Now you also need a Kafka cluster. You have multiple options for this.
1- You can download and install the package suitable for your operating system from Apache Kafka’s website.
2- You can run Kafka locally in a Docker container.
3- You can create a cluster via Confluent Cloud.

You have created your cluster and added the dependency to your project, so now we can move on to the producer and consumer details. Since our main goal here is to understand the logic of producers and consumers, I will not go into the details of the installation phase; perhaps we can cover installation in detail in another article.

Producer

Producers are client applications that write event messages to topics in the Kafka cluster. Messages are stored as key-value pairs in the partitions of the topics. As I mentioned in the previous article, messages produced with the same key are written to the same partition. Sending a key is not mandatory.

To produce messages from your application, you first need to write your producer configs. Some configs that may be needed for the producer are listed below; a configuration sketch follows the list.

  • acks: The minimum number of acknowledgments that must come from the broker for the producer to consider a message successfully sent to the Kafka cluster. It can take the values “all”, “1”, and “0”. all -> the producer waits until the leader partition confirms that all in-sync followers have written the message. 1 -> it is enough for the leader partition to write the message to its own log. 0 -> no acknowledgment is expected.
  • max.in.flight.requests.per.connection: The maximum number of unacknowledged requests the client will send on a single connection before blocking. The default value is 5.
  • linger.ms: The delay the producer waits before sending a batch of records. All records that arrive between request transmissions are grouped by the producer into a single request, and linger.ms sets the upper bound on this batching delay. The default value is 0, which means there is no delay and batches are sent immediately (even if a batch contains only one message). In some cases, the client may increase linger.ms to reduce the number of requests and improve throughput, even under moderate load; the trade-off is that more records are buffered in memory.
  • batch.size: When multiple records are sent to the same partition, the producer tries to batch them together, which can improve performance on both the client and the server. batch.size is the maximum size (in bytes) of a single batch. A batch size that is too small makes batching ineffective and reduces throughput; a batch size that is too large can waste memory, because a buffer of that size is usually allocated in anticipation of additional records.
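Putting a few of these settings together, here is a minimal producer configuration sketch for a Spring Boot application. It assumes spring-kafka is on the classpath; the bootstrap address, bean names, and chosen values are illustrative rather than recommendations.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Illustrative broker address; point this at your own cluster.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Wait for all in-sync replicas to acknowledge each record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Batch up to 32 KB per partition and wait up to 5 ms to fill a batch.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```

With this in place, a KafkaTemplate can be injected anywhere you want to send messages.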

You can find all configs and their details in the Confluent documentation.

The producer needs serializers when producing messages to Kafka. There are various serializers, but String, JSON, and Avro serializers are the most commonly used. I will talk about the schema registry and Avro concepts in detail in the next article. You can see example usage in the sample Spring Boot producer application on my GitHub account.

https://github.com/emreakin/SpringBoot-Kafka-Producer
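As a small usage sketch with the String serializer configured above (the class and topic names here are illustrative and not taken from the linked repository):

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class UserEventProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public UserEventProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(String userId, String payload) {
        // Records sent with the same key (userId) always land on the same partition.
        kafkaTemplate.send("user", userId, payload);
    }
}
```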

Consumer

Consumers are client applications that read event messages from topics. You have created your topic and written your producer application, and data is flowing into Kafka; now it is time to write the consumer application. You need to add configs for the consumer just as you did for the producer. Some configs are shared with producers, while others are consumer-specific. Some configs that may be needed for the consumer are listed below; a configuration sketch follows the list.

  • auto.offset.reset: Determines where the consumer starts reading when it comes up without a previously committed offset. If set to “earliest”, the topic is read from the beginning. If set to “latest”, reading starts from the newest messages. If set to “none”, the starting offset must be determined manually.
  • enable.auto.commit: We talked about offsets before. After a consumer reads a message, the offset must be committed so that it advances and the same consumer (or another one) can continue from the next message. If this parameter is set to true, offsets are committed automatically; as long as the consumer does not fail after reading a message, the commit happens behind the scenes. If set to false, a manual commit is required, for example with commitSync() (or an acknowledgment in Spring Kafka). Let me give an example of when a manual commit is needed. Imagine you perform two operations after reading a record from Kafka: first you write a row to the database, then you send a push notification to the user. The critical part is that the database write must happen and must not happen twice; losing an occasional push notification is acceptable. To achieve this, commit manually right after writing the row to the database. Then, if any later step fails, no duplicate row is created, because the offset has already been advanced. However, if you rely on a commit that only happens after the whole message has been processed, an error while sending the push notification means the offset does not advance and the same message, including the database write, is processed again (see the listener sketch below the consumer repository link).
  • fetch.min.bytes: The minimum amount of data (in bytes) the broker should return for a fetch request. The default value is 1.
  • fetch.max.bytes: The maximum amount of data (in bytes) the broker should return for a fetch request. The default value is 52428800 (50 MB).
  • max.poll.records: The maximum number of records returned in a single poll. The default value is 500.
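Here is a matching consumer configuration sketch, again assuming spring-kafka is on the classpath; the bootstrap address, group id, and chosen values are illustrative. Auto commit is disabled and the listener container uses manual acknowledgment, to match the scenario described in the enable.auto.commit bullet.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Illustrative broker address and group id.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "userConsumerGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Read the topic from the beginning when no committed offset exists.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Disable auto commit; offsets will be acknowledged manually from the listener.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Return at most 100 records per poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // MANUAL_IMMEDIATE commits the offset as soon as the listener calls acknowledge().
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```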

You can find all configs and their details in the Confluent documentation.

https://github.com/emreakin/SpringBoot-Kafka-Consumer
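To make the manual-commit example from the enable.auto.commit bullet concrete, here is a hedged listener sketch. UserRepository and PushNotificationService are hypothetical collaborators declared only to keep the sketch self-contained, and the MANUAL_IMMEDIATE ack mode from the configuration sketch above is assumed.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class UserEventConsumer {

    // Hypothetical collaborators, declared as simple interfaces for the sake of the sketch.
    public interface UserRepository { void save(String record); }
    public interface PushNotificationService { void send(String record); }

    private final UserRepository userRepository;
    private final PushNotificationService pushNotificationService;

    public UserEventConsumer(UserRepository userRepository,
                             PushNotificationService pushNotificationService) {
        this.userRepository = userRepository;
        this.pushNotificationService = pushNotificationService;
    }

    @KafkaListener(topics = "user", groupId = "userConsumerGroup")
    public void onMessage(String message, Acknowledgment ack) {
        // Critical step: write the record to the database first.
        userRepository.save(message);

        // Commit the offset right after the database write, so this message
        // will not be redelivered and the row will not be duplicated.
        ack.acknowledge();

        try {
            pushNotificationService.send(message);
        } catch (Exception e) {
            // Losing an occasional push notification is acceptable in this scenario,
            // so the error is swallowed instead of triggering redelivery.
        }
    }
}
```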

Consumer Group

One of the most important concepts related to consumers is the consumer group. I want to cover it in detail, because the consumer group logic must be well understood in order to write an efficient consumer application.

A topic generally consists of more than one partition, and these partitions are the unit of parallelism for Kafka consumers. Consumers join a consumer group and are assigned partitions of the topic to consume. More than one consumer group can consume the same topic. Each consumer group has a unique id, which is assigned by the user.

Let’s assume you have a topic named “user” with 5 partitions. You wrote an application named Consumer1 that consumes the user topic and gave it the consumer group id “userConsumerGroup”. When you deploy the application as 3 instances, Kafka’s GroupCoordinator and ConsumerCoordinator distribute the 5 partitions of the user topic across the 3 instances of Consumer1. Then you wrote a new application named Consumer2 and gave it the same group id, “userConsumerGroup”. When you deploy this application as 3 instances, the coordinators rebalance the consumer group and assign 5 of the 6 instances in total to the 5 partitions of the user topic; 1 instance remains idle and does not consume anything. If one of the active instances goes down, a rebalance occurs again, and the idle instance comes into play and is assigned a partition. I will talk about what a rebalance is, how it occurs, and how it is configured in another article. I should also say that this is not a very realistic example: normally, two different applications are not placed in the same consumer group. If messages from the same topic are to be processed for a different purpose, a different application with a different consumer group is used.

Let’s continue with a single application. The “user” topic has 3 partitions and the Consumer1 application has 3 instances. Each instance consumes one partition and goes on its way. But your load has increased: instead of 1,000 records per month, the topic now receives 1 million records per day. Messages have started to accumulate in your partitions and your application is struggling to keep up. The first thing to do is to increase the number of partitions in the topic so that the incoming messages are spread out. You increase the partition count to 12 and messages start to be distributed across the new partitions. However, since the application has 3 instances, the consumer group has 3 active members and can consume at most 3 partitions at any given moment, so messages continue to accumulate in the remaining 9 partitions. There are two ways to deal with this. The first is to increase the number of instances to 12; each instance is then paired with a partition, so instead of 3 partitions being consumed at any given moment, 12 are, and throughput theoretically quadruples. The other is to increase concurrency. The default concurrency is 1. If each instance has enough resources, you can increase the concurrency so that a single instance consumes more than one partition; you can think of concurrency as the number of consumer threads. In this scenario, setting the concurrency of the 3 instances to 4 lets you consume all 12 partitions in parallel. If you want all partitions to be consumed at the same time, you can size things so that at most partitions = instances * concurrency. Additionally, I would like to point out that Kafka can stream millions of records per second, but the ability to consume and process them depends on your application and its resources. In other words, Kafka says: I can deliver large amounts of data to you, but do you have the capacity to process it?
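As a small sketch of the concurrency option (assuming Spring Kafka 2.2 or newer, where @KafkaListener exposes a concurrency attribute; the topic and group id are the ones from the example above):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class UserTopicListener {

    // With 3 application instances each running 4 listener threads,
    // all 12 partitions of the topic can be consumed in parallel.
    @KafkaListener(topics = "user", groupId = "userConsumerGroup", concurrency = "4")
    public void onMessage(String message) {
        // process the record
    }
}
```

The same effect can be achieved by setting the concurrency on the listener container factory instead of on each listener.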

I would like to talk about one last case regarding consumer groups. People often ask questions like “I started the application locally, the offsets of the messages keep advancing, but my local instance never consumes anything.” Here’s why: your application in the test environment runs as 2 instances and the topic it consumes has 2 partitions. If you start the application locally with the same consumer group id, your local instance cannot consume because there is no free partition; the offsets advance because the instances in the test environment are consuming. As a solution, if you change the consumer group id while running the application locally, the partition assignment is made within a separate group, independent of the test environment, and your local instance will consume as well. But there is a point to pay attention to here: if your consumer performs side effects after consuming (saving to the database, sending e-mails, updating balances, and so on), those operations will be duplicated, because both groups process the same messages. I recommend adding a check for local runs in such cases.
