Spring Boot Kafka Producer and Consumer example
This is a quick example application that shows a Kafka producer and consumer in Spring Boot.
Tools
Application Overview
This example application receives a sales event as a REST request and sends it to a Kafka topic. A Kafka Streams processor then reads the events from the topic for processing.
Spring WebFlux is used to expose the REST service. Apache Avro is used to serialize sales events. The Avro schema is stored in the Kafka Schema Registry so that both the producer and the consumer can use it.
When a sales event is sent to the REST API as shown below, the router function receives it and uses the Kafka producer to send the event to the Kafka topic.
curl -v -H 'Content-Type: application/json' -u 'admin:admin' localhost:5454/sale -d '{"itemId":1,"quantity": 0, "saleDate":"20221028T220909.999","unitPrice":23.99,"sellerID":"S123"}'
* Trying 127.0.0.1:5454...
* Connected to localhost (127.0.0.1) port 5454 (#0)
* Server auth using Basic with user 'admin'
> POST /sale HTTP/1.1
> Host: localhost:5454
> Authorization: Basic YWRtaW46YWRtaW4=
> User-Agent: curl/7.85.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 96
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 202 Accepted
< Content-Type: application/json
< content-length: 0
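The same request can also be built from Java. The following is a minimal sketch using the JDK's java.net.http API (Java 11+), not code from the example application: the class name SaleRequestSketch and its helper methods are hypothetical, while the host, port, path, credentials, and payload are copied from the curl call above. It only builds the request, so it runs without the application being up.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper class, for illustration only.
class SaleRequestSketch {

    // Same JSON body as the curl example (96 bytes, matching Content-Length).
    static final String PAYLOAD =
            "{\"itemId\":1,\"quantity\": 0, \"saleDate\":\"20221028T220909.999\","
            + "\"unitPrice\":23.99,\"sellerID\":\"S123\"}";

    // Builds the value of the Authorization header for HTTP Basic auth.
    static String basicAuth(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    // Builds (but does not send) the POST /sale request.
    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder(URI.create("http://localhost:5454/sale"))
                .header("Content-Type", "application/json")
                .header("Authorization", basicAuth("admin", "admin"))
                .POST(HttpRequest.BodyPublishers.ofString(PAYLOAD))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest();
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().firstValue("Authorization").orElse(""));
    }
}
```

To actually send it, the request could be passed to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) while the application is running; the Authorization value is the same one that appears in the curl trace.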
Logs
kafkaexample-app_1 | 2022-10-31 00:48:32.587 INFO 9 --- [afka-producer-1] c.e.kafkaexample.connector.EventSender : Event sent successfully to kafka, topic-partition=sale_events-9 offset=0 timestamp=1667177311859
kafkaexample-app_1 | 2022-10-31 00:48:32.592 INFO 9 --- [afka-producer-1] c.e.k.filters.ResponseLogHandler : Req ID: 0eba3daf-8a72-436e-bc4a-a8c696dd4502, Response status 202 ACCEPTED
schemaregistry_1 | [2022-10-31 00:48:32,622] INFO 172.22.0.5 - - [31/Oct/2022:00:48:32 +0000] "GET /schemas/ids/1?fetchMaxId=false&subject=sale_events-value HTTP/1.1" 200 497 31 (io.confluent.rest-utils.requests)
kafkaexample-app_1 | 2022-10-31 00:48:32.673 INFO 9 --- [-StreamThread-1] c.e.k.connector.EventConsumer : 1. Received record for item 1, class com.example.kafkaexample.model.SaleEvent
kafkaexample-app_1 | 2022-10-31 00:48:32.677 INFO 9 --- [-StreamThread-1] c.e.k.handlers.SaleEventHandler : Handling a sale event in consumer {"itemId": 1, "quantity": 0, "saleDate": "20221028T220909.999", "sellerID": "S123", "unitPrice": 23.99}
WebFlux Routing Configuration
The Spring WebFlux router configuration uses a simple request and response logger and a simple security filter. The request and response handlers and the security filter are included only to exercise different WebFlux components.
Kafka Producer Configuration
The Kafka producer uses KafkaAvroSerializer for value serialization. The Kafka Schema Registry is used to store the Avro schema.
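As a rough illustration of what such a producer setup involves, the sketch below collects the relevant settings in a plain map. It is not the application's own configuration class: the class name ProducerConfigSketch, the String key serializer, and the broker and registry URLs are assumptions. The property keys and the KafkaAvroSerializer class name are the standard Kafka and Confluent ones, written as strings so the snippet compiles without extra dependencies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only.
class ProducerConfigSketch {

    static Map<String, Object> producerConfig(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        // Assumption: record keys are plain strings.
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are serialized as Avro; the serializer looks up the schema in the registry.
        config.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        config.put("schema.registry.url", schemaRegistryUrl);
        return config;
    }

    public static void main(String[] args) {
        // Assumed local addresses; adjust to the actual environment.
        producerConfig("localhost:9092", "http://localhost:8081")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a Spring application, a map like this could be handed to Spring Kafka's DefaultKafkaProducerFactory, provided the Kafka client and Confluent serializer libraries are on the classpath.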
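This example application uses an Avro schema for the sale event. As the consumer log above shows, each event carries the fields itemId, quantity, saleDate, sellerID, and unitPrice.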
Kafka Streaming Configuration
The Kafka Streams library is used for consuming events from Kafka topics. Spring Boot’s default KafkaStreamsConfiguration is overridden with application-specific Serdes. The SaleEvent POJO is generated from the schema using the Avro Maven plugin. The generated POJO exposes its schema as a static field, which is used to register the schema with the Kafka Schema Registry client.
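For comparison, a purely property-based Kafka Streams setup might look like the sketch below. This is not the application's KafkaStreamConfig class, which builds its Serdes around a schema registry client; here the Serdes are simply named as defaults. The class name StreamsConfigSketch, the application id, the String key Serde, and the URLs are assumptions, while the property keys and Serde class names are the standard Kafka Streams and Confluent ones.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only.
class StreamsConfigSketch {

    static Map<String, Object> streamsConfig(String applicationId,
                                             String bootstrapServers,
                                             String schemaRegistryUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put("application.id", applicationId);
        config.put("bootstrap.servers", bootstrapServers);
        // Assumption: record keys are plain strings.
        config.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        // Values are Avro records mapped to generated classes such as SaleEvent.
        config.put("default.value.serde", "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde");
        config.put("schema.registry.url", schemaRegistryUrl);
        return config;
    }

    public static void main(String[] args) {
        // Assumed application id and local addresses.
        streamsConfig("kafkaexample-streams", "localhost:9092", "http://localhost:8081")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With Spring Kafka on the classpath, such a map could be wrapped in a KafkaStreamsConfiguration bean to replace the default one.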
Event Consumer
Spring Kafka’s StreamsBuilder bean is used to build a stream pipeline that consumes the events. The schema registry client from the KafkaStreamConfig class is used with SpecificAvroSerde to deserialize the Avro data.
Testing
Testcontainers is used for integration testing. Kafka, ZooKeeper, and Schema Registry images from Confluent are used to create the containers for testing.
Conclusion
Development and integration testing are easier with Testcontainers than with mocks.
The code for this example application can be found in the GitHub repository.