Spring Boot Kafka Producer and Consumer example

Jeyaganesh Selvaraj
3 min read · Oct 31, 2022


This is a quick example application of Kafka producer and consumer in Spring Boot.

Tools

  1. Spring Boot — WebFlux
  2. Apache Avro
  3. Testcontainers
  4. Docker
  5. Docker Compose (optional)

Application Overview

This example application receives a sales event as a REST request and publishes it to a Kafka topic. A Kafka Streams processor then consumes the events from the topic for processing.

Spring WebFlux is used to expose the REST service. Apache Avro is used to serialize sales events. The Avro schema is stored in the Kafka Schema Registry so that both producer and consumer can use it.

When a sales event is sent to the REST API as shown below, the router function receives it and uses the Kafka producer to send the event to the Kafka topic.

curl -v  -H 'Content-Type: application/json' -u 'admin:admin' localhost:5454/sale -d '{"itemId":1,"quantity": 0, "saleDate":"20221028T220909.999","unitPrice":23.99,"sellerID":"S123"}'
* Trying 127.0.0.1:5454...
* Connected to localhost (127.0.0.1) port 5454 (#0)
* Server auth using Basic with user 'admin'
> POST /sale HTTP/1.1
> Host: localhost:5454
> Authorization: Basic YWRtaW46YWRtaW4=
> User-Agent: curl/7.85.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 96
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 202 Accepted
< Content-Type: application/json
< content-length: 0

Logs

kafkaexample-app_1  | 2022-10-31 00:48:32.587  INFO 9 --- [afka-producer-1] c.e.kafkaexample.connector.EventSender   : Event sent successfully to kafka, topic-partition=sale_events-9 offset=0 timestamp=1667177311859
kafkaexample-app_1 | 2022-10-31 00:48:32.592 INFO 9 --- [afka-producer-1] c.e.k.filters.ResponseLogHandler : Req ID: 0eba3daf-8a72-436e-bc4a-a8c696dd4502, Response status 202 ACCEPTED
schemaregistry_1 | [2022-10-31 00:48:32,622] INFO 172.22.0.5 - - [31/Oct/2022:00:48:32 +0000] "GET /schemas/ids/1?fetchMaxId=false&subject=sale_events-value HTTP/1.1" 200 497 31 (io.confluent.rest-utils.requests)
kafkaexample-app_1 | 2022-10-31 00:48:32.673 INFO 9 --- [-StreamThread-1] c.e.k.connector.EventConsumer : 1. Received record for item 1, class com.example.kafkaexample.model.SaleEvent
kafkaexample-app_1 | 2022-10-31 00:48:32.677 INFO 9 --- [-StreamThread-1] c.e.k.handlers.SaleEventHandler : Handling a sale event in consumer {"itemId": 1, "quantity": 0, "saleDate": "20221028T220909.999", "sellerID": "S123", "unitPrice": 23.99}

WebFlux Routing Configuration

The Spring WebFlux router configuration uses a simple request/response logger and a basic security filter. The request/response handlers and security filter are included mainly to exercise different WebFlux components.

RouterConfig.java
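A minimal sketch of what such a router configuration could look like. The `EventSender` and `SaleEvent` names are taken from the application logs above; the handler body, route path wiring, and `send` method signature are assumptions, not the author's exact code.

```java
package com.example.kafkaexample.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class RouterConfig {

    // Routes POST /sale to a handler that deserializes the JSON body
    // and hands the event to the Kafka producer (EventSender), replying
    // 202 Accepted once the send has been initiated.
    @Bean
    public RouterFunction<ServerResponse> saleRoutes(EventSender eventSender) {
        return route(POST("/sale"), request ->
                request.bodyToMono(SaleEvent.class)
                        .flatMap(eventSender::send)
                        .then(ServerResponse.status(HttpStatus.ACCEPTED).build()));
    }
}
```

The 202 Accepted status matches the curl output shown earlier: the API acknowledges receipt without waiting for downstream processing.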

Kafka Producer Configuration

The Kafka producer uses KafkaAvroSerializer for value serialization. The Kafka Schema Registry is used for storing the Avro schema.

KafkaProducerConfig.java
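A hedged sketch of a producer configuration along these lines. The property names (`spring.kafka.bootstrap-servers`, `kafka.schema-registry-url`) and the use of String keys are assumptions; only the KafkaAvroSerializer value serialization and the Schema Registry URL setting come from the text.

```java
package com.example.kafkaexample.config;

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.schema-registry-url}")
    private String schemaRegistryUrl;

    // String keys; values serialized as Avro. KafkaAvroSerializer registers
    // the value schema with the Schema Registry on first send.
    @Bean
    public ProducerFactory<String, SaleEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", schemaRegistryUrl);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, SaleEvent> kafkaTemplate(
            ProducerFactory<String, SaleEvent> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
```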

This example application uses the Avro schema below.
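The schema itself did not survive extraction; the record below is reconstructed from the request payload and the consumer log line (`class com.example.kafkaexample.model.SaleEvent`). The field names, order, and namespace follow the logged output, but the exact types (e.g. `double` vs. a decimal logical type for `unitPrice`) are assumptions.

```json
{
  "type": "record",
  "name": "SaleEvent",
  "namespace": "com.example.kafkaexample.model",
  "fields": [
    {"name": "itemId", "type": "int"},
    {"name": "quantity", "type": "int"},
    {"name": "saleDate", "type": "string"},
    {"name": "sellerID", "type": "string"},
    {"name": "unitPrice", "type": "double"}
  ]
}
```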

Kafka Streaming Configuration

The Kafka Streams library is used for consuming events from Kafka topics. Spring Boot’s default KafkaStreamsConfiguration is overridden with application-specific Serdes. The SaleEvent POJO is generated from the schema using the Avro Maven plugin. The generated POJO holds its schema as a static field, which is used to register it with the Kafka Schema Registry client.

KafkaStreamConfig.java
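A sketch of how the default Streams configuration bean might be overridden. The application id, property names, and the `CachedSchemaRegistryClient` cache size are assumptions; the shared SchemaRegistryClient bean is implied by the Event Consumer description below.

```java
package com.example.kafkaexample.config;

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.schema-registry-url}")
    private String schemaRegistryUrl;

    // Shared registry client; the generated SaleEvent schema can be
    // registered through it and reused by the consumer's Serde.
    @Bean
    public SchemaRegistryClient schemaRegistryClient() {
        return new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
    }

    // Replaces Spring Boot's default Streams configuration bean so the
    // application can supply its own Serdes and settings.
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkaexample-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}
```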

Event Consumer

Spring Kafka’s StreamsBuilder bean is used to build a stream pipeline that consumes the events. The schema registry client from the KafkaStreamConfig class is used with SpecificAvroSerde to consume Avro data.
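A hedged sketch of such a pipeline. The `EventConsumer` and `SaleEventHandler` class names and the `sale_events` topic come from the logs; the `handle` method name, the registry URL map entry, and the overall topology shape are assumptions.

```java
package com.example.kafkaexample.connector;

import java.util.Map;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {

    @Value("${kafka.schema-registry-url}")
    private String schemaRegistryUrl;

    // Builds the topology: read sale_events with a SpecificAvroSerde backed
    // by the shared SchemaRegistryClient, then pass each record to the handler.
    @Bean
    public KStream<String, SaleEvent> saleStream(StreamsBuilder builder,
                                                 SchemaRegistryClient registryClient,
                                                 SaleEventHandler handler) {
        SpecificAvroSerde<SaleEvent> valueSerde = new SpecificAvroSerde<>(registryClient);
        valueSerde.configure(Map.of("schema.registry.url", schemaRegistryUrl), false);

        KStream<String, SaleEvent> stream = builder.stream(
                "sale_events", Consumed.with(Serdes.String(), valueSerde));
        stream.foreach((key, event) -> handler.handle(event));
        return stream;
    }
}
```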

Testing

Testcontainers is used for integration testing. Kafka, Zookeeper, and Schema Registry images from Confluent are used to create containers for testing.
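One possible shape for such a test setup, assuming JUnit 5 and the Testcontainers Kafka module. The image tags, network aliases, and property names are illustrative assumptions; the `KafkaContainer` here manages its own embedded Zookeeper, whereas the article's setup may run a separate Zookeeper container.

```java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest
@Testcontainers
class SaleEventIntegrationTest {

    static final Network NETWORK = Network.newNetwork();

    @Container
    static final KafkaContainer KAFKA = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.2.1"))
            .withNetwork(NETWORK)
            .withNetworkAliases("kafka");

    @Container
    static final GenericContainer<?> SCHEMA_REGISTRY = new GenericContainer<>(
            DockerImageName.parse("confluentinc/cp-schema-registry:7.2.1"))
            .withNetwork(NETWORK)
            .withExposedPorts(8081)
            .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry")
            .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
                     "PLAINTEXT://kafka:9092")
            .dependsOn(KAFKA);

    // Point the application context at the containers before it starts.
    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
        registry.add("kafka.schema-registry-url", () ->
                "http://" + SCHEMA_REGISTRY.getHost()
                        + ":" + SCHEMA_REGISTRY.getMappedPort(8081));
    }
}
```

A test method would then POST a sale event to the WebFlux endpoint and assert that the stream processor handled it.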

Conclusion

Development and integration testing are easier with Testcontainers than with mocks, since the tests run against real Kafka and Schema Registry instances.

The code for this example application can be found in the GitHub repository.
