Integrating Spring WebFlux (Reactive) with Apache Kafka using Spring Cloud Stream 3

Yash Patidar
Oct 12, 2022


Apache Kafka is an event streaming platform used to stream data from one system to another. It is commonly used as a message broker, a log aggregator, and a backbone for real-time data analytics.

Spring WebFlux is the reactive counterpart of Spring MVC and supports fully non-blocking reactive streams. It supports the back-pressure concept and uses Netty as its default embedded server to run reactive applications. If you are familiar with the Spring MVC programming style, you can easily work with WebFlux as well.

Reactive Programming is a programming paradigm built around asynchronous data streams. When an event is raised, the application reacts to it in a responsive, non-blocking manner, which is why it is called reactive programming.
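For a concrete (if minimal) picture, here is a small Reactor snippet, independent of the Kafka setup in this article, showing a stream of events being handled asynchronously as they arrive:

import java.time.Duration;

import reactor.core.publisher.Flux;

public class ReactiveDemo {

    public static void main(String[] args) throws InterruptedException {
        // Emit an event every 500 ms; nothing happens until someone subscribes.
        Flux<Long> events = Flux.interval(Duration.ofMillis(500));

        // The subscriber reacts to each event as it arrives, without blocking the caller.
        events.take(5)
              .subscribe(i -> System.out.println("Reacting to event " + i));

        // Keep the main thread alive long enough to see the asynchronous output.
        Thread.sleep(3000);
    }
}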

Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.

In this article we will learn how to integrate Apache Kafka with Spring WebFlux using Spring Cloud Stream 3.

Nowadays microservices are becoming very popular in the software industry, and Apache Kafka has become a useful tool for asynchronous communication between them.

The diagram below demonstrates how Kafka works.

[Figure: Representation of the working of Apache Kafka]

A stream of data flows from producer to consumer via an Apache Kafka topic. Data is published to a topic and can then be consumed from that topic at any instant. This makes the process asynchronous: the two systems, producer and consumer, do not depend on each other to communicate.

Kafka Producer

We need to add the Spring Cloud Stream Kafka binder dependency to the pom file.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Now, since Spring Cloud Stream can be configured from the application.yml file, we will add the Kafka configuration for our producer there (a sketch of such a configuration follows the list of terms below).

application.yml

Important terms

  1. spring.cloud.stream.function.definition: the name of the function that produces data to the topic.
  2. spring.cloud.stream.bindings: here we define the different consumers and producers. In <functionName>-out-0, out is used because the function emits data, and 0 refers to the 0th output argument that gets emitted.
  3. destination: the name of the Kafka topic (we do not need to create the topic manually; just write it here and it will be created automatically).
  4. contentType: describes the type of content to be put on this topic.
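The producer's application.yml itself is not reproduced above, but based on the terms just described, the stringSupplier bean defined next, and the consumer configuration shown later in this article, a minimal sketch would look like this (the broker URL, topic name, and content type are the same values used in the consumer section):

spring:
  cloud:
    stream:
      function:
        definition: stringSupplier
      kafka:
        binder:
          brokers: ${KAFKA_URL:http://localhost:9092}
      bindings:
        stringSupplier-out-0:
          destination: testTopic
          contentType: application/json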

Now, we will create a producer, stringSupplier, as mentioned in our application.yml file.

import java.time.Instant;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Service
public class ProduceEvents {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        // Generate one message per second; run the blocking sleep on a boundedElastic
        // thread and share() the Flux so the stream is materialized only once.
        return () -> Flux.fromStream(Stream.generate(() -> {
            try {
                Thread.sleep(1000);
                return "Hello from Supplier ".concat(Instant.now().toString());
            } catch (Exception e) {
                // ignore
                return "Exception Occurred";
            }
        })).subscribeOn(Schedulers.boundedElastic()).share();
    }
}

Here we created a class annotated with @Service, which tells Spring that this class is a component. In it we define a method annotated with @Bean. In this example a stream is generated that emits a string every second. This acts as a producer that produces a Flux of strings, and the Flux is transmitted over a Kafka topic.

The producer microservice is now all set up and ready to produce strings and transmit them over the Kafka topic.

Kafka Consumer

We need to add the following dependency to the project in order to use Kafka.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Now, since Spring Cloud Stream can be configured from the application.yml file, we will add the configuration for our Kafka consumer there.

spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: false
        consumer:
          useNativeDecoding: false
      function:
        definition: consumeString
      kafka:
        binder:
          brokers: ${KAFKA_URL:http://localhost:9092}
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
            value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
            value.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      bindings:
        consumeString-in-0:
          destination: testTopic
          contentType: application/json

Here, similar to what we did in application.yml for the producer microservice, we define the consumer function and its bindings.

Since a Flux is consumed here, we use -in- in the binding name. The destination (Kafka topic) and contentType are the same as those defined in the producer microservice.

Now we will look at the code for consuming the Flux of strings from the Kafka topic.

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;

@Service
public class ConsumeEvents {

    private final Logger log = LoggerFactory.getLogger(ConsumeEvents.class);

    @Bean
    public Consumer<Flux<String>> consumeString() {
        return strFlux -> {
            // business logic: here we simply log each incoming string
            strFlux.doOnNext(log::info).subscribe();
        };
    }
}

Here we simply log each string that arrives in the Flux.

The important thing to note here is that while consuming a Flux from a Kafka topic we do not return anything to the topic, so for the pipeline to remain non-blocking we must subscribe to the Flux. Hence we can either call subscribe() on the incoming Flux ourselves or hand the subscription to the framework via then().
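As an illustration of the then() option, here is a sketch (not from the original article; the class name ConsumeEventsAlternative is made up for this example) that declares the consumer as a Function returning Mono<Void>, so that Spring Cloud Stream subscribes to the pipeline for us. The input binding is still consumeString-in-0.

import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class ConsumeEventsAlternative {

    private final Logger log = LoggerFactory.getLogger(ConsumeEventsAlternative.class);

    @Bean
    public Function<Flux<String>, Mono<Void>> consumeString() {
        // Returning Mono<Void> via then() hands the subscription over to the framework,
        // so we do not need to call subscribe() ourselves.
        return strFlux -> strFlux
                .doOnNext(log::info)
                .then();
    }
}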

Here is the output on console.

2022-05-07 21:39:08.685 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:08.679655900Z
2022-05-07 21:39:09.696 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:09.691208700Z
2022-05-07 21:39:10.709 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:10.705129200Z
2022-05-07 21:39:11.709 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:11.705934300Z
2022-05-07 21:39:12.711 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:12.707167400Z
2022-05-07 21:39:13.715 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:13.712049300Z
2022-05-07 21:39:14.727 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:14.724458700Z
2022-05-07 21:39:15.740 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:15.735790200Z
2022-05-07 21:39:16.752 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:16.749626200Z
2022-05-07 21:39:17.761 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:17.753175200Z
2022-05-07 21:39:18.764 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:18.764140400Z
2022-05-07 21:39:19.770 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:19.766160100Z
2022-05-07 21:39:20.775 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:20.767748200Z
2022-05-07 21:39:21.785 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:21.781734Z
2022-05-07 21:39:22.794 INFO 3152 --- [container-0-C-1] c.e.c.service.kafka.ConsumeEvents : Hello from Supplier 2022-05-07T16:09:22.794100600Z

Here we can see that the string Hello from Supplier [TIMESTAMP] is printed to the console, confirming that the string has been consumed from the Kafka topic.
