Spring Boot Kafka Consumers and Elasticsearch Clusters

fisheye
5 min read · Apr 16, 2024


The architecture diagram shows the Docker containers involved. We want to illustrate the power of Kafka Streams.
Messages are published to Kafka by microservices. The Kafka producer gets its data from a REST API provided by our friendly soul sam ("svc-kaf-slcp-xx").
We will use Kafka Streams and push data to an AWS Elasticsearch cluster (instead of AWS MSK, a customized ES image).
These are the messages we want indexed in Elasticsearch. We have them in the Kafka cluster; we will see how to move them over using asynchronous messaging with Spring Boot.

Overview

We could write a thread-based Kafka consumer, but there are pitfalls when we are using integration flows and other clusters. Instead, let's have a listener
using the @KafkaListener annotation.
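As a rough sketch of such a listener (the class, topic, and group names below are placeholders of mine, not the project's):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Hypothetical listener; topic and group id are placeholder names.
@Service
public class HotelDetailsListener {

    // Spring manages the consumer threads and the poll loop for us.
    @KafkaListener(topics = "hotel-details", groupId = "hotel-es-ingest")
    public void onMessage(String payload) {
        // In the real service this would hand the payload to the ES ingest service.
        System.out.println("received: " + payload);
    }
}
```

Because Spring owns the polling thread, we only write the per-message handler; that is what removes the thread-management pitfalls mentioned above.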

Initial Ingestion

We want a way to ingest a set of hotel IDs. We will use a REST client that calls the "Caf" microservice, which provides us the hotels.
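As a sketch (the base URL, path, and response handling are my assumptions, not the real "Caf" contract), the REST call could look like:

```java
import java.util.List;
import org.springframework.web.client.RestTemplate;

// Hypothetical client for the "Caf" microservice.
public class CafClient {

    private final RestTemplate restTemplate = new RestTemplate();

    // Kept pure and static so the URL construction is easy to test.
    static String hotelsUrl(String baseUrl, List<String> hotelIds) {
        return baseUrl + "/hotels?ids=" + String.join(",", hotelIds);
    }

    public String fetchHotels(String baseUrl, List<String> hotelIds) {
        // Returns the raw JSON body; the caller publishes it to Kafka.
        return restTemplate.getForObject(hotelsUrl(baseUrl, hotelIds), String.class);
    }
}
```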

Input to Stream

We will use a Kafka producer service to publish the hotels to Kafka.

We have two ways to do this:

  • Use the Kafka Producer API directly (without Spring Boot adding any abstraction layer on top of it)
  • Use Spring Boot and have the Kafka producer injected

In our world we are famous for having multiple paths, and overloading is used and misused. Do not blame Kafka if the producer gets a message and sends it to Kafka but the cluster never receives it. Remember, we are talking to Spring.

  • Use Kafka templates (KafkaTemplate)
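A minimal sketch of the KafkaTemplate route (the service and topic names are placeholders of mine):

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical producer service; topic name is a placeholder.
@Service
public class HotelProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Spring Boot auto-configures the template from application properties
    // and injects it here; no manual Producer construction needed.
    public HotelProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishHotel(String hotelId, String hotelJson) {
        // send(topic, key, value): keying by hotelId keeps a hotel on one partition.
        kafkaTemplate.send("hotel-details", hotelId, hotelJson);
    }
}
```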

Kafka Producer Acknowledgements

Producer acknowledgement is a feature of the message broker. "Acks" stands for acknowledgements: it is a Kafka producer config we can set to receive acknowledgement of data writes.
There are three enumerated values of acknowledgement:
* 0: we just send data to a broker, say Broker-0, and we don't wait for any acknowledgement. If the broker is down we have data loss (or worse). With multiple brokers things get more interesting.
* 1: we send data to Broker-0 and the leader acknowledges us. Better, but we don't wait for replication.
* all (kind of 2): the best option; we wait until the replicas acknowledge. We are using Kafka for its distributed nature, just like the ELK/ES cluster. We do not want loss: let our message get replicated first and then take the acknowledgement; otherwise we can resend it or handle the error situation.
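In Spring Boot this is a single property. A minimal sketch of the producer settings (the values here are illustrative, not the project's actual config):

```properties
# application.properties (sketch)
spring.kafka.bootstrap-servers=localhost:9092
# acks=all: wait for the leader and the in-sync replicas before acknowledging
spring.kafka.producer.acks=all
# optional safety nets that pair well with acks=all
spring.kafka.producer.retries=3
spring.kafka.producer.properties.enable.idempotence=true
```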
I used acks=all because I do not have a sweet SME I can leave the issue-fixing to. But be aware that juniors or devs might not use this. After all, bug fixing is trendy.

Microservice to send data to Kafka

Let's use port 8084 for this.

REST API to send data

Postman is great. It gives us environments and a way to add custom scripts to create a JWT (for app-specific tokens).

KafkaListener

We have a microservice (kafka-streams-hotel), and I have an ElasticController. The ElasticController is not strictly needed, as the listener will ingest the data into the ES cluster, but sometimes we do need to look at the data. We have an ES service that does that part.

I have configured port 8082 for this. I have AWS Elasticsearch in the cloud. I am a full-fledged ES cluster guy; having worked as an ES admin and then an ES architect, I become … if I run this on my local machine (to have a tiny ELK).

We will use an AWS ES cluster with multiple nodes. Now we have the IngestService.
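As a sketch of what such an IngestService could look like, assuming the Elasticsearch 7.x high-level REST client (the index name and client wiring are my assumptions):

```java
import java.io.IOException;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Service;

// Hypothetical IngestService; "hotels" is a placeholder index name.
@Service
public class IngestService {

    private final RestHighLevelClient client;

    public IngestService(RestHighLevelClient client) {
        this.client = client;
    }

    public void index(String hotelId, String hotelJson) throws IOException {
        IndexRequest request = new IndexRequest("hotels")
                .id(hotelId)                        // document id = hotel id
                .source(hotelJson, XContentType.JSON);
        client.index(request, RequestOptions.DEFAULT);
    }
}
```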

KafkaStreams App

We have triggered the data flow, and our Kafka Streams app will enrich our streams. It has sent the stream to a topic.

We have a sweet Kafka Streams app that consumes the stream, performs the enrichment, and writes the stream back.
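A sketch of that consume-enrich-write-back topology (the topic names and the enrichment body are placeholders; the real app enriches hotels with airport details):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical topology; topic names are placeholders.
public class HotelTopology {

    // Pure enrichment step, kept separate so it is easy to test.
    static String enrich(String hotelJson) {
        // The real app joins in airport details; here we just tag the record.
        return hotelJson.replaceFirst("\\{", "{\"enriched\":true,");
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("hotel-details", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(HotelTopology::enrich)
               .to("hotel-details-enriched", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```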

KafkaListener

SVHotelDetailAirportListener

ConsumerRecord

HotelDetails

Sweet, our POJO (aka domain entity) has come into use.
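For illustration, a plausible shape for that HotelDetails entity (the field names here are my guesses, not the project's actual fields):

```java
// Hypothetical HotelDetails POJO used as the domain entity.
public class HotelDetails {
    private String hotelId;
    private String name;
    private String nearestAirport; // filled in by the Streams enrichment step

    public HotelDetails() { }

    public HotelDetails(String hotelId, String name, String nearestAirport) {
        this.hotelId = hotelId;
        this.name = name;
        this.nearestAirport = nearestAirport;
    }

    public String getHotelId() { return hotelId; }
    public void setHotelId(String hotelId) { this.hotelId = hotelId; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getNearestAirport() { return nearestAirport; }
    public void setNearestAirport(String nearestAirport) { this.nearestAirport = nearestAirport; }
}
```

A no-args constructor plus getters/setters is what JSON deserializers such as Jackson expect when turning the Kafka message back into the entity.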

Elasticsearch Cluster

And it has indexed the document, i.e. the Kafka message.

Our counts have increased, thanks to Kafka Streams, which has helped us work on streaming. We now have Kafka Streams in the ingestion pipeline.

KafkaStreams

We have a microservice (kafka-streams-hotel) that, when it gets the data enriched by the Streams app, ingests it into the AWS Elasticsearch cluster. I have used AWS so I get monitoring and other messaging services (AWS SNS, SQS) alongside the Elastic cluster, and I like that we do not have to keep checking for issues. We will use the ES Dev console, not the AWS console, to see whether the data is indexed.

It is indexed

We have our key that shows this

Kafka CLI

To peek at the data we can use kafka-console-consumer.
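A minimal sketch of the invocation, assuming a broker on localhost:9092 and our placeholder topic name (it needs a running cluster, so treat it as illustrative):

```shell
# Read messages from the beginning of the topic, printing keys too (Ctrl-C to stop).
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic hotel-details-enriched \
  --from-beginning \
  --property print.key=true
```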

Conclusion

We have used an AWS Elasticsearch cluster as the data sink. Our Kafka cluster uses Streams as part of the ingest data pipeline to index documents. We added a REST API so that we can invoke the pipeline with hotel IDs for ad-hoc ingestion. Please feel free to provide as many hotels as you like; I used two to demo and explain this feature.
