Spring Kafka Producer(Part 1): API and Producer Architecture

Prashantprakash
11 min read · Feb 17, 2024

Spring Kafka

Goal: We will implement notification APIs. When one of these APIs is hit, we will, based on a few conditions, produce a message into different Kafka topics. The starting code can be found here: https://github.com/Geek8080/notification-service/tree/2-dao-module-logging-setup

Prerequisites: I suggest setting up the project locally and following along. I explained how to set up Kafka in my previous blog, and I will link to it at the end of this one. If you are implementing the producer for your own project by following this blog and get stuck anywhere, leave a comment and I will help.

API

We will expose one simple API. It takes two pieces of data: who to send the notification to (email ID) and what the content of the notification should be.

Create a module producer as described here:

For this module, set the packaging to war, since this will be deployed as a web service. Add the following plugin to the parent pom:

<plugin>
    <artifactId>maven-war-plugin</artifactId>
    <version>3.4.0</version>
</plugin>

In the dependency management for the parent pom, add the spring boot starter web:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>${spring.boot.starter.web.version}</version>
</dependency>

Now, let’s move to the child pom and update it. Import the spring-boot-dependencies BOM in its dependency management, then add the spring-boot-starter-web dependency to get all the basic dependencies for a Spring Boot web application.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <artifactId>producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>producer</name>
    <description>Module for Kafka Producer</description>
    <packaging>war</packaging>

    <parent>
        <groupId>io.Geek8080</groupId>
        <artifactId>notification-service</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>

Run mvn verify to verify that you have a valid maven project. You will see something like this:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for notification-service 0.0.1-SNAPSHOT:
[INFO]
[INFO] notification-service ............................... SUCCESS [ 0.126 s]
[INFO] dao ................................................ SUCCESS [ 2.022 s]
[INFO] producer ........................................... SUCCESS [ 0.859 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.213 s
[INFO] Finished at: 2024-02-06T22:08:03+05:30
[INFO] ------------------------------------------------------------------------

Create Application Class

Let’s write some code and get our Spring application running. First create the main class and mark it as @SpringBootApplication. The file will look something like the one below, a typical Spring Boot application class.

package io.Geek8080.notification.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Created by Geek8080 on 07-02-2024.
 */
@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

}

Now, you can run this by simply running the command below:

 mvn clean compile -pl "producer" spring-boot:run

You will see something like this in output:


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.2.2)

2024-02-07T22:16:08.940+05:30 INFO 5740 --- [ restartedMain] i.G.n.producer.ProducerApplication : Starting ProducerApplication using Java 17.0.3 with PID 5740 (C:\Users\prash\Documents\Programming\Kafka\notification-service\producer\target\classes started by prash in C:\Users\prash\Documents\Programming\Kafka\notification-service\producer)
2024-02-07T22:16:08.944+05:30 INFO 5740 --- [ restartedMain] i.G.n.producer.ProducerApplication : No active profile set, falling back to 1 default profile: "default"
2024-02-07T22:16:08.989+05:30 INFO 5740 --- [ restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2024-02-07T22:16:08.989+05:30 INFO 5740 --- [ restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2024-02-07T22:16:09.884+05:30 INFO 5740 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port 8080 (http)
2024-02-07T22:16:09.894+05:30 INFO 5740 --- [ restartedMain] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2024-02-07T22:16:09.894+05:30 INFO 5740 --- [ restartedMain] o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/10.1.18]
2024-02-07T22:16:09.924+05:30 INFO 5740 --- [ restartedMain] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2024-02-07T22:16:09.925+05:30 INFO 5740 --- [ restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 934 ms
2024-02-07T22:16:10.226+05:30 INFO 5740 --- [ restartedMain] o.s.b.d.a.OptionalLiveReloadServer : LiveReload server is running on port 35729
2024-02-07T22:16:10.231+05:30 INFO 5740 --- [ restartedMain] o.s.b.a.e.web.EndpointLinksResolver : Exposing 1 endpoint(s) beneath base path '/actuator'
2024-02-07T22:16:10.267+05:30 INFO 5740 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8080 (http) with context path ''
2024-02-07T22:16:10.278+05:30 INFO 5740 --- [ restartedMain] i.G.n.producer.ProducerApplication : Started ProducerApplication in 1.549 seconds (process running for 1.785)

Great! We have a working web app, good job.

WAR applications are meant to be deployed to a separate servlet container (for example, a Tomcat server). Traditionally, running a WAR on a servlet container requires a web.xml file. Since Spring discourages XML configuration files, we can instead define this configuration in a class that extends SpringBootServletInitializer and overrides the configure() method. Here’s how I do it:

package io.Geek8080.notification.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;

/**
 * Created by Geek8080 on 07-02-2024.
 */
@SpringBootApplication
public class ProducerApplication extends SpringBootServletInitializer {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        return builder.sources(ProducerApplication.class);
    }
}

Run the application at this stage. You won’t see much difference from before. We have only added the hook that replaces web.xml when the WAR is deployed to an external servlet container.

Code till this point: https://github.com/Geek8080/notification-service/tree/3-1-notification-producer-API

Adding API to our Application

I won’t go into the details of designing APIs. Here’s an amazing tutorial if you need to understand REST APIs, and details on how to build one:

Before we start writing the APIs, let’s first add logging to our application.

Here I have described how to add logging, you can follow that for detailed walkthrough:

Add the log4j2.xml file to our module resources folder, it should look something like this:

<?xml version="1.0" encoding="UTF-8" ?>
<Configuration>
    <Properties>
        <Property name="logpath-location">app/logs</Property>
        <Property name="logfile-name">notification_service_producer.log</Property>
        <Property name="archive">${logpath-location}/archive/notificationService</Property>
        <Property name="interval">10</Property>
    </Properties>

    <Appenders>
        <Console name="Console">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] %-5p %C.%M():%L %X - %m%n"/>
        </Console>

        <RollingFile name="RollingFileAppender" fileName="${logpath-location}/${logfile-name}"
                     filePattern="${archive}/${logfile-name}.%d{yyyy-MM-dd-HH}.gz">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] %-5p %c.%M ():%L %X - %m%n"/>
            <Policies>
                <TimeBasedTriggeringPolicy/>
            </Policies>
        </RollingFile>
    </Appenders>

    <Loggers>
        <Logger name="io.Geek8080.notification.dao" level="DEBUG" additivity="false" includeLocation="true">
            <AppenderRef ref="Console" level="INFO"/>
            <AppenderRef ref="RollingFileAppender" level="DEBUG"/>
        </Logger>

        <Root level="INFO" includeLocation="true">
            <AppenderRef ref="Console" level="INFO"/>
            <AppenderRef ref="RollingFileAppender" level="DEBUG"/>
        </Root>
    </Loggers>

</Configuration>

Add the log4j2.component.properties, it will look like below:

log4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector

Run the application using the command below; you should now get logs in the new format.

mvn clean install -pl producer spring-boot:run

Did you notice any difference? No? That’s unexpected, right?

If you look carefully at the logs, right before the Spring logo, you will see this:

SLF4J(W): Class path contains multiple SLF4J providers.
SLF4J(W): Found provider [ch.qos.logback.classic.spi.LogbackServiceProvider@36aa7bc2]
SLF4J(W): Found provider [org.apache.logging.slf4j.SLF4JServiceProvider@76ccd017]
SLF4J(W): See https://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J(I): Actual provider is of type [ch.qos.logback.classic.spi.LogbackServiceProvider@36aa7bc2]

I think you already know what this is: we have conflicting logging libraries. Let’s check which dependency is bringing in the additional logging library. Run the command below:

mvn dependency:tree

You will get something like this:

[INFO] --- maven-dependency-plugin:3.6.1:tree (default-cli) @ producer ---
[INFO] io.Geek8080:producer:war:0.0.1-SNAPSHOT
[INFO] +- org.springframework.boot:spring-boot-starter-web:jar:3.1.0:compile
[INFO] | +- org.springframework.boot:spring-boot-starter:jar:3.2.2:compile
[INFO] | | +- org.springframework.boot:spring-boot-starter-logging:jar:3.2.2:compile
[INFO] | | | +- ch.qos.logback:logback-classic:jar:1.4.14:compile
[INFO] | | | | \- ch.qos.logback:logback-core:jar:1.4.14:compile
[INFO] | | | +- org.apache.logging.log4j:log4j-to-slf4j:jar:2.21.1:compile
[INFO] | | | \- org.slf4j:jul-to-slf4j:jar:2.0.11:compile
[INFO] | | +- jakarta.annotation:jakarta.annotation-api:jar:2.1.1:compile
[INFO] | | +- org.springframework:spring-core:jar:6.1.3:compile
[INFO] | | | \- org.springframework:spring-jcl:jar:6.1.3:compile
[INFO] | | \- org.yaml:snakeyaml:jar:2.2:compile
[INFO] | +- org.springframework.boot:spring-boot-starter-json:jar:3.2.2:compile
[INFO] | | +- com.fasterxml.jackson.core:jackson-databind:jar:2.15.3:compile
[INFO] | | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.15.3:compile
[INFO] | | | \- com.fasterxml.jackson.core:jackson-core:jar:2.15.3:compile
[INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.15.3:compile
[INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.15.3:compile
[INFO] | | \- com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.15.3:compile
[INFO] | +- org.springframework.boot:spring-boot-starter-tomcat:jar:3.2.2:compile
[INFO] | | +- org.apache.tomcat.embed:tomcat-embed-core:jar:10.1.18:compile
[INFO] | | +- org.apache.tomcat.embed:tomcat-embed-el:jar:10.1.18:compile
[INFO] | | \- org.apache.tomcat.embed:tomcat-embed-websocket:jar:10.1.18:compile
[INFO] | +- org.springframework:spring-web:jar:6.1.3:compile
[INFO] | | \- org.springframework:spring-beans:jar:6.1.3:compile
[INFO] | \- org.springframework:spring-webmvc:jar:6.1.3:compile
[INFO] | +- org.springframework:spring-aop:jar:6.1.3:compile
[INFO] | +- org.springframework:spring-context:jar:6.1.3:compile
[INFO] | \- org.springframework:spring-expression:jar:6.1.3:compile
[INFO] +- org.springframework.boot:spring-boot-starter-log4j2:jar:3.2.2:compile
[INFO] | +- org.apache.logging.log4j:log4j-slf4j2-impl:jar:2.21.1:compile
[INFO] | | +- org.apache.logging.log4j:log4j-api:jar:2.21.1:compile
[INFO] | | \- org.slf4j:slf4j-api:jar:2.0.11:compile
[INFO] | +- org.apache.logging.log4j:log4j-core:jar:2.21.1:compile
[INFO] | \- org.apache.logging.log4j:log4j-jul:jar:2.21.1:compile
[INFO] +- com.lmax:disruptor:jar:3.4.4:compile
[INFO] +- org.projectlombok:lombok:jar:1.18.28:compile
[INFO] +- org.springframework.boot:spring-boot-starter-actuator:jar:3.2.2:compile
[INFO] | +- org.springframework.boot:spring-boot-actuator-autoconfigure:jar:3.2.2:compile
[INFO] | | \- org.springframework.boot:spring-boot-actuator:jar:3.2.2:compile
[INFO] | +- io.micrometer:micrometer-observation:jar:1.12.2:compile
[INFO] | | \- io.micrometer:micrometer-commons:jar:1.12.2:compile
[INFO] | \- io.micrometer:micrometer-jakarta9:jar:1.12.2:compile
[INFO] | \- io.micrometer:micrometer-core:jar:1.12.2:compile
[INFO] | +- org.hdrhistogram:HdrHistogram:jar:2.1.12:runtime
[INFO] | \- org.latencyutils:LatencyUtils:jar:2.0.3:runtime
[INFO] +- org.springframework.boot:spring-boot-devtools:jar:3.2.2:runtime
[INFO] | +- org.springframework.boot:spring-boot:jar:3.2.2:compile
[INFO] | \- org.springframework.boot:spring-boot-autoconfigure:jar:3.2.2:compile
[INFO] \- org.springframework.boot:spring-boot-configuration-processor:jar:3.2.2:compile

You will see logback-classic being pulled in as a transitive dependency of spring-boot-starter-web, through spring-boot-starter and spring-boot-starter-logging.

Add the exclusion for this and run the application again.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>${spring.boot.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>

The logs will look like below:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.2.2)

2024-02-17 09:40:08,182 [restartedMain] INFO org.springframework.boot.StartupInfoLogger.logStarting():50 {} - Starting ProducerApplication using Java 17.0.3 with PID 2144 (C:\Users\prash\Documents\Programming\Kafka\notification-service\producer\target\classes started by prash in C:\Users\prash\Documents\Programming\Kafka\notification-service\producer)
2024-02-17 09:40:08,187 [restartedMain] INFO org.springframework.boot.SpringApplication.logStartupProfileInfo():654 {} - No active profile set, falling back to 1 default profile: "default"
2024-02-17 09:40:08,262 [restartedMain] INFO org.springframework.boot.logging.DeferredLog.logTo():252 {} - Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2024-02-17 09:40:08,263 [restartedMain] INFO org.springframework.boot.logging.DeferredLog.logTo():252 {} - For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2024-02-17 09:40:09,899 [restartedMain] INFO org.springframework.boot.web.embedded.tomcat.TomcatWebServer.initialize():109 {} - Tomcat initialized with port 8080 (http)
2024-02-17 09:40:09,917 [restartedMain] INFO org.apache.juli.logging.DirectJDKLog.log():173 {} - Initializing ProtocolHandler ["http-nio-8080"]
2024-02-17 09:40:09,922 [restartedMain] INFO org.apache.juli.logging.DirectJDKLog.log():173 {} - Starting service [Tomcat]
2024-02-17 09:40:09,923 [restartedMain] INFO org.apache.juli.logging.DirectJDKLog.log():173 {} - Starting Servlet engine: [Apache Tomcat/10.1.18]
2024-02-17 09:40:09,986 [restartedMain] INFO org.apache.juli.logging.DirectJDKLog.log():173 {} - Initializing Spring embedded WebApplicationContext
2024-02-17 09:40:09,989 [restartedMain] INFO org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.prepareWebApplicationContext():296 {} - Root WebApplicationContext: initialization completed in 1723 ms
2024-02-17 09:40:10,731 [restartedMain] INFO org.springframework.boot.devtools.autoconfigure.OptionalLiveReloadServer.startServer():59 {} - LiveReload server is running on port 35729
2024-02-17 09:40:10,741 [restartedMain] INFO org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver.<init>():58 {} - Exposing 1 endpoint(s) beneath base path '/actuator'
2024-02-17 09:40:10,801 [restartedMain] INFO org.apache.juli.logging.DirectJDKLog.log():173 {} - Starting ProtocolHandler ["http-nio-8080"]
2024-02-17 09:40:10,831 [restartedMain] INFO org.springframework.boot.web.embedded.tomcat.TomcatWebServer.start():241 {} - Tomcat started on port 8080 (http) with context path ''
2024-02-17 09:40:10,865 [restartedMain] INFO org.springframework.boot.StartupInfoLogger.logStarted():56 {} - Started ProducerApplication in 3.159 seconds (process running for 4.502)

One thing to observe here is the empty braces on each log line, right before the log message. If you look at the log pattern we defined:

%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] %-5p %C.%M():%L %X - %m%n

We have a %X pattern right before our message (%m) pattern. %X appends the MDC info to our log message. Now, what is MDC?

MDC stands for Mapped Diagnostic Context. Whenever our application receives an API request, a thread is assigned to handle that request (Tomcat takes one from its worker pool). Once the response is returned, that thread is free to be reused for the next request. MDC is the information attached to that particular thread’s context. Why do we need MDC?

Applications that receive thousands of API requests per second need a way to know which request a particular log line is associated with. To handle this, we simply assign an ID to the request and add it to the MDC. You can then filter log lines by that ID and get the complete logs for the whole request. In a web app, this is usually done using web filters. This deserves a blog of its own and is out of scope for our topic, so I won’t go into the details here; I will do a separate blog for that.
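
To make the idea concrete, here is a minimal, stdlib-only sketch of a per-thread diagnostic context. It is not how Log4j2 implements MDC, and the names MiniMdc, MiniMdcDemo and requestId are made up for illustration. In Log4j2 itself, the equivalent calls live on the ThreadContext class (for example ThreadContext.put and ThreadContext.clearMap).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: a tiny per-thread context, NOT Log4j2's implementation.
class MiniMdc {
    // Each thread sees its own map, so request IDs never leak between threads.
    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    // Pooled threads are reused, so the context must be cleared after each request.
    static void clear() {
        CONTEXT.get().clear();
    }

    // Mimics the "%X - %m" part of the pattern: context map first, then the message.
    static String format(String message) {
        Map<String, String> sorted = new TreeMap<>(CONTEXT.get());
        return sorted + " - " + message;
    }
}

class MiniMdcDemo {
    // Hypothetical request handler: tag the thread, log, then clean up.
    static String handle(String requestId) {
        MiniMdc.put("requestId", requestId);
        try {
            return MiniMdc.format("Received a request to send email");
        } finally {
            MiniMdc.clear();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> System.out.println(handle("req-1")));
        Thread t2 = new Thread(() -> System.out.println(handle("req-2")));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Nothing was put on the main thread, so the context prints as {}.
        System.out.println(MiniMdc.format("no request in progress"));
    }
}
```

The empty {} printed for the main thread is the same thing we see in our application logs: nothing has been put into the context yet.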

https://logging.apache.org/log4j/2.x/manual/layouts.html#PatternLayout

APIs

We will create just two APIs: one will send a message to the user, while the other will send an email. We won’t be sending the actual notification here. Why? Because sending the actual notification depends on third-party vendors and takes time. If we sent it inside the API call, the client would have to keep the connection open until our API returns a response, just to learn that the notification has been sent. This hogs network resources while doing nothing but waiting.

Solution: We will get a request from the user to send a notification. We will push that data to Kafka, along with other required metadata. Our application will then return a response saying that the notification has been queued to be sent. Our consumer will pick this data up from Kafka and send the actual message.
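
The flow can be sketched without Kafka at all. In the snippet below an in-memory queue stands in for the Kafka topic, and every name (QueuedNotifications, accept, pollNext) is hypothetical. It only shows the shape of the design: accept quickly, do the slow work later.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: the in-memory queue plays the role of the Kafka topic.
class QueuedNotifications {
    record Notification(String emailId, String message) {}

    private final BlockingQueue<Notification> topic = new LinkedBlockingQueue<>();

    // "Producer" side: enqueue and return immediately, without calling any vendor.
    String accept(String emailId, String message) {
        topic.add(new Notification(emailId, message));
        return "QUEUED";
    }

    // "Consumer" side: picks up the next notification, or null if nothing is queued.
    Notification pollNext() {
        return topic.poll();
    }

    int pending() {
        return topic.size();
    }

    public static void main(String[] args) {
        QueuedNotifications service = new QueuedNotifications();
        System.out.println(service.accept("usersame@gmail.com", "hello Sam!"));
        System.out.println("pending: " + service.pending());
        System.out.println("consumer picked: " + service.pollNext());
    }
}
```

The caller gets its answer as soon as the notification is queued; how long the vendor takes only matters to the consumer.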

/sendEmail API

Create a class NotificationController and mark it with @RestController and @Log4j2, so that Spring can discover the class and it can log to the console and file. Create a method and mark it with @PostMapping to expose a POST API.

Create a request class in dao module. This request will be passed as request body of this API.

package io.Geek8080.notification.dao;

import lombok.Data;

/**
 * Created by Geek8080 on 17-02-2024.
 */

@Data
public class EmailNotificationRequest {

    private String emailId;

    private String message;

}
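
If you are wondering what @Data gives us, the plain-Java class below is a rough hand-written approximation. Lombok also generates equals() and hashCode(), which are left out here for brevity, and the demo class name is made up.

```java
// Hand-written approximation of what @Data generates (equals/hashCode omitted).
class EmailNotificationRequest {
    private String emailId;
    private String message;

    public String getEmailId() { return emailId; }
    public void setEmailId(String emailId) { this.emailId = emailId; }

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }

    // Lombok's toString() format: ClassName(field=value, field=value)
    @Override
    public String toString() {
        return "EmailNotificationRequest(emailId=" + emailId + ", message=" + message + ")";
    }
}

class EmailNotificationRequestDemo {
    public static void main(String[] args) {
        EmailNotificationRequest request = new EmailNotificationRequest();
        request.setEmailId("usersame@gmail.com");
        request.setMessage("hello Sam!");
        System.out.println(request);
    }
}
```

The setters are what let the JSON request body be bound to this object, and the toString() is why logging the request prints its fields.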

Now, go to controller class, and update the method, to receive an argument of type EmailNotificationRequest.

package io.Geek8080.notification.producer;

import io.Geek8080.notification.dao.EmailNotificationRequest;
import lombok.extern.log4j.Log4j2;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * Created by Geek8080 on 17-02-2024.
 */

@RestController
@Log4j2
public class NotificationController {

    @PostMapping("sendEmail")
    public ResponseEntity<?> sendEmail(@RequestBody EmailNotificationRequest emailNotification) {
        // Log the whole request; Lombok's @Data supplies a readable toString().
        log.info("Received a request to send email at: {}", emailNotification);

        return ResponseEntity.ok("SUCCESS");
    }
}

You might be getting an error in your IDE here: it cannot find the dao module. To help the IDE and the compiler find the module, we need to declare that we require it. To do that, add a requires directive to our module-info.java:

module producer {
    requires dao;
}

Your code doesn’t compile?

The way we created our modules, we placed module-info.java in the base directory of each module. This is correct as per the JPMS documentation. But Maven fails to find your module-info.java file. Why?

As per the documentation, the module-info.java file should be in the source directory of the module. By default, the source directory for a Maven project is the src/main/java folder.

We have two solutions here: either change the source directory of the Maven project, or move module-info.java to src/main/java. I am moving the file to the source folder, as I find it the easier option.

Once you have moved the module-info.java, you will see multiple issues, in both our modules. Let’s fix dao first.

In the EmailNotificationRequest class, lombok can’t be resolved anymore. Go to module-info and add: requires static lombok;

In the Main class, the logging packages won’t resolve. Add the following to bring those modules into our project: requires static org.apache.logging.log4j;

The file will look like this:

/**
 * Created by Geek8080 on 28-01-2024.
 */
module dao {
    requires static lombok;

    requires org.slf4j;
    requires static org.apache.logging.log4j;

    exports io.Geek8080.notification.dao;
}

Next, go to producer module, and add the modules needed. We will need to add for lombok, logging and spring boot.

module producer {
    requires dao;

    // Spring dependencies
    requires spring.boot;
    requires spring.boot.autoconfigure;
    requires spring.web;

    requires static lombok;

    // Logging essentials
    requires org.slf4j;
    requires org.apache.logging.log4j;
}

requires static declares a dependency that is mandatory at compile time but optional at runtime.

Run our application, and it will run fine now. Once it is running, go to an API client (Postman or cURL) and hit the API:
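
If you would rather stay in Java, the same call can be sketched with the JDK's built-in HttpClient. This assumes the app is running locally on port 8080 with an empty context path (as the startup logs show) and that the JSON field names match the fields of EmailNotificationRequest. The class name SendEmailClient is made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class SendEmailClient {
    // Assumed from the startup logs: Tomcat on port 8080, empty context path.
    static final String ENDPOINT = "http://localhost:8080/sendEmail";

    static HttpRequest buildRequest(String emailId, String message) {
        // Naive JSON assembly; fine for a demo, but real input would need escaping.
        String json = "{\"emailId\":\"" + emailId + "\",\"message\":\"" + message + "\"}";
        return HttpRequest.newBuilder(URI.create(ENDPOINT))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires the producer application to be running locally.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest("usersame@gmail.com", "hello Sam!"),
                        HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```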

You will get this in logs:

2024-02-17 19:25:21,509 [http-nio-8080-exec-2] INFO  io.Geek8080.notification.producer.NotificationController.sendEmail():20 {} - Received a request to send email at: EmailNotificationRequest(emailId=usersame@gmail.com, message=hello Sam!)

This is great, we have our API running. In the upcoming parts, we will start our Kafka broker and see our messages being pushed.

Code at the end: https://github.com/Geek8080/notification-service/tree/3-2-notification-producer-API
