Spring Boot with Kafka Integration — Part 1: Kafka Producer

Ankit Thakur
Sep 3, 2018 · 4 min read


Hi all, in my last story, I shared how to set up Kafka on a Mac.

Kafka has two main modules:

  1. Producer (which sends messages to the Kafka server)
  2. Consumer (which receives messages from the Kafka server)

First a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

On a Kafka server, there may be multiple producers sending different types of messages, and a consumer may want to receive only a specific sort of message. To make this easy, Kafka provides a channel called a topic, over which the messages are transferred.

Topic: A topic is a category or feed name to which records are published. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.

In the last story, we created a topic:

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Now we will use this topic and create a Kafka producer to send messages over it. Below are the application.yaml and .properties configurations; we can use either of them.
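A minimal application.yaml sketch could look like the following. The binding name kafkatopic1, the test topic, the broker address localhost:9092, and the server port 5060 all come from elsewhere in this post; the exact layout of the keys is an assumption based on Spring Cloud Stream's Kafka binder.

server:
  port: 5060

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        kafkatopic1:
          destination: test
          contentType: application/json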

Stream Interface:

Now we will create the stream interface, which will use kafkatopic1 for sending the data.
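Here is a minimal sketch of that interface. The interface name KafkaServerStreams, the method producerChannel, and the binding name kafkatopic1 are the ones used throughout this post; the constant name OUTPUT is illustrative.

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface KafkaServerStreams {

    // Must match the binding name declared in application.yaml
    String OUTPUT = "kafkatopic1";

    @Output(OUTPUT)
    MessageChannel producerChannel();
}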

In the above stream interface, we have created a static string with the same name we gave in the application.yaml file for the binding, i.e. kafkatopic1.

@Output: It takes the binding value kafkatopic1 as input and binds the output target.

producerChannel: This stream is used to write messages to the Kafka topic.

Spring Boot will create a proxy-based implementation of the KafkaServerStreams interface that can be injected as a Spring bean anywhere in the code to access our stream at runtime.

Configure Stream:

Now we need to configure Spring Cloud Stream to bind to our producer stream. For this, we will use the @EnableBinding annotation, which takes the interface KafkaServerStreams as its value.

Definition of EnableBinding: Enables the binding of targets annotated with {@link Input} and {@link Output} to a broker, according to the list of interfaces passed as value to the annotation.

So, we will create an empty class, and add this annotation to it.
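For example (the class name StreamsConfig is illustrative):

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBinding(KafkaServerStreams.class)
public class StreamsConfig {
    // Intentionally empty: the annotation does all the binding work
}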

Message Object:

Now we will create the model object, which will be sent over as a message in JSON format.
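A minimal Person model consistent with the controller described below might look like this; the getters and setters are needed so the JSON converter can serialize the object.

public class Person {

    private String firstName;
    private String lastName;

    public Person() {
    }

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
}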

Kafka Service:

Now we need to create the service class, which will use the Kafka stream and send this model object as the message payload.
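Here is a sketch of such a service, assuming the KafkaServerStreams interface and Person model above; the class name KafkaService and the return strings are illustrative.

import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

@Service
public class KafkaService {

    private final KafkaServerStreams kafkaServerStreams;

    public KafkaService(KafkaServerStreams kafkaServerStreams) {
        this.kafkaServerStreams = kafkaServerStreams;
    }

    // Publishes the Person object to the Kafka topic as JSON
    public void sendMessage(Person person) {
        MessageChannel channel = kafkaServerStreams.producerChannel();
        channel.send(MessageBuilder
                .withPayload(person)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }

    // Simple test method to verify controller-to-service communication
    public String helloService() {
        return "Hello from the service layer";
    }
}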

In the above code, as we defined in the application properties, we are setting the content type to application/json in the MessageBuilder.

@Service annotation will configure this class as a Spring Bean.

The sendMessage method is the one that uses the Kafka stream.

helloService is a test method to verify the communication between the REST controller and the service class.

REST Controller:

Now we will expose a REST API to consume input and send it to the service layer; the service layer will then send the data as a stream to Kafka.
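A sketch of the controller follows. The path /kafka/message matches the URL we hit below; the class name KafkaController and the UUID-based way of generating random names are assumptions for illustration.

import java.util.UUID;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaService kafkaService;

    public KafkaController(KafkaService kafkaService) {
        this.kafkaService = kafkaService;
    }

    // GET /kafka/message -> builds a Person with random names and sends it
    @GetMapping("/message")
    public String helloKafka() {
        Person person = new Person(
                "First-" + UUID.randomUUID().toString().substring(0, 8),
                "Last-" + UUID.randomUUID().toString().substring(0, 8));
        kafkaService.sendMessage(person);
        return "Sent: " + person.getFirstName() + " " + person.getLastName();
    }
}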

In the above class, I have created the helloKafka method, which generates a random first name and last name for the user, creates a Person object, and sends it to the service layer.

So now we will run the code and then hit:

http://localhost:5060/kafka/message

Console logs — Sending Person object in Service Layer

In the above screen, the left side is ZooKeeper, the top right is the Kafka server, and the bottom is the consumer, which is receiving the messages:

Here is the command to consume the topic:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

Here is the complete code.

Thanks everyone, I hope this is helpful.
