Spring Boot with Kafka Integration — Part 2: Kafka Consumer
Hello everyone. In my last story, we discussed creating a Kafka producer with Spring Boot.
Now we will discuss the Kafka consumer. For this, I am creating a new project and copying the model object “Person” from the producer into the consumer.
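The exact fields of Person come from Part 1; as a minimal sketch (the field names here are illustrative assumptions, not necessarily the author's), the model might look like:

```java
// Minimal sketch of the "Person" model copied over from the producer project.
// The actual fields come from Part 1; name/age are illustrative assumptions.
public class Person {
    private String name;
    private int age;

    public Person() { }  // Jackson needs a no-args constructor for deserialization

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}
```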
Now let's look at the application.yml file, where we define the bootstrap server, topic name, and consumer group id. The group-id is what differentiates the consumers.
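A minimal application.yml along these lines could work; the `spring.kafka.*` keys are standard Spring Boot properties, while the custom `kafka.topic` key and all values are assumptions for illustration:

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: person-consumer-group

# custom property holding the topic(s); comma-separated so a listener can split it
kafka:
  topic: person-topic
```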
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.
If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
Now we will create a controller class, into which the service bean is injected, and add one sample REST API as below:
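A minimal controller along these lines (the class name and endpoint path are my assumptions, not necessarily the author's) might look like:

```java
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Sketch of a controller that exposes a simple sample endpoint; in the real
// project the consumer service bean would be injected here by Spring
// (e.g. via constructor injection). Names are illustrative assumptions.
@RestController
public class ConsumerController {

    @GetMapping("/status")
    public String status() {
        return "Kafka consumer is running";
    }
}
```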
Then comes the main part: the service. The service class uses @KafkaListener to listen on the topic from which the messages are read.
Here, we read the topics from the YAML file using a SpEL expression. We use split so that a single listener method can, if needed, listen to multiple topics, while the group id remains a single string.
Now, the input is a ConsumerRecord object, which carries the important metadata of the record: which partition it came from, its offset within that partition, and the topic name. record.value() is the actual message that we need to parse. The message arrives as a byte array, and we use a `fasterxml.jackson` ObjectMapper to parse it and convert it into the model object.
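Putting the pieces together, a listener service along these lines would match the description above. The property keys (`kafka.topic`, `spring.kafka.consumer.group-id`) and class names are assumptions, and Person is the model copied from the producer:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Sketch of the listener service described above; not the author's exact code.
@Service
public class PersonConsumerService {

    private final ObjectMapper mapper = new ObjectMapper();

    // SpEL: read the comma-separated topic list from application.yml and split
    // it, so one listener can subscribe to several topics while the group id
    // stays a single string.
    @KafkaListener(
            topics = "#{'${kafka.topic}'.split(',')}",
            groupId = "${spring.kafka.consumer.group-id}")
    public void consume(ConsumerRecord<String, byte[]> record) throws Exception {
        // Metadata carried by the record
        System.out.printf("topic=%s partition=%d offset=%d%n",
                record.topic(), record.partition(), record.offset());

        // record.value() is the raw byte array; Jackson maps it to the model
        Person person = mapper.readValue(record.value(), Person.class);
        System.out.println("Received person: " + person.getName());
    }
}
```

The split-based SpEL expression is what lets the same listener handle either one topic or a comma-separated list without code changes.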
The latest consumer code is available here.
Hopefully this will be helpful.
