The architecture diagram shows the Docker containers involved. We want to illustrate the power of Kafka Streams.
Messages are published to Kafka by microservices. The Kafka producer gets its data from a REST API provided by our friendly soul, Sam ("svc-kaf-slcp-xx").
We will use Kafka Streams and push the data to an AWS Elasticsearch cluster (a customized ES image rather than AWS MSK).
The messages sit in our Kafka cluster, and once processed they become indexed Elasticsearch documents. We will see how to consume these messages with asynchronous messaging services using Spring Boot.
Overview
We could write a thread-based Kafka consumer by hand, but that approach has pitfalls once we use integration flows and other clusters. Instead, let's have a listener registered with the @KafkaListener annotation.
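As a minimal sketch of such a listener, assuming Spring for Apache Kafka is on the classpath (the topic and group names here are placeholders):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {

    // Spring creates and polls the consumer for us; we only handle records.
    @KafkaListener(topics = "hotels", groupId = "hotel-ingest")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.printf("key=%s value=%s%n", record.key(), record.value());
    }
}
```

No manual thread management: the container handles polling, offsets, and rebalances.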
Initial Ingestion
We want a way to ingest a set of hotel IDs. We will use a REST client; it calls the "Caf" microservice, which provides us the hotels.
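As a sketch of that REST client, here is a request built with the JDK 11+ HttpClient API; the base URL and path are assumptions, not the real "Caf" endpoint:

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

public class HotelRestClient {

    // Hypothetical endpoint; substitute the real "Caf" service URL.
    static final String CAF_URL = "http://localhost:8084/hotels/";

    // Build a GET request for one hotel ID.
    public static HttpRequest hotelRequest(String hotelId) {
        return HttpRequest.newBuilder()
                .uri(URI.create(CAF_URL + hotelId))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }
}
```

Sending it with `HttpClient.newHttpClient().send(...)` returns the hotel payload we then publish to Kafka.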
Input to Stream
We will use a Kafka producer service to publish the hotels to Kafka.
We have a few ways to do this:
- Use the plain Kafka producer client (without Spring Boot adding any abstraction layer on top of it)
- Use Spring Boot and inject a Kafka producer
- Use KafkaTemplate
In the Spring world we are famous for having multiple paths, and overloading is both used and misused. Do not blame Kafka if the producer accepts a message, sends it on, and the Kafka cluster never receives it. Remember, we are talking to Spring.
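A minimal KafkaTemplate sketch, assuming spring-kafka auto-configuration; the topic and class names are placeholders:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class HotelProducerService {

    private static final String TOPIC = "hotels"; // placeholder topic name

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Spring Boot auto-configures the template from spring.kafka.* properties.
    public HotelProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(String hotelId, String hotelJson) {
        kafkaTemplate.send(TOPIC, hotelId, hotelJson);
    }
}
```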
Kafka Producer Acknowledgements
Producer acknowledgement is a feature of the message broker. "Acks" stands for acknowledgments: it is a config the Kafka producer provides that we can set to control how data writes are acknowledged.
There are three enumerated values of acknowledgement:
* 0: We just send data to a broker, say Broker-0, and do not wait for an acknowledgment. If the broker goes down we lose data. With multiple brokers things get more interesting.
* 1: We send data to Broker-0 and the partition leader acknowledges us. Better, but we do not wait for replication.
* all (equivalent to -1): The safest option; we wait until the in-sync replicas acknowledge. We use Kafka for its distributed nature, much like an ELK/ES cluster. We do not want data loss: let our message get replicated first and then receive the acknowledgement, else we can resend it or handle the error.
I used acks=all because I do not have a sweet SME I can leave the issues to. But be aware that junior devs might skip this setting; after all, bug fixing is trendy.
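A sketch of the producer configuration with acks set, built as the plain java.util.Properties map a KafkaProducer is constructed with; the broker address is a placeholder:

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Build the config map a KafkaProducer would be constructed with.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        // Wait for all in-sync replicas before the send is acknowledged.
        props.put("acks", "all");
        // Retry transient failures instead of silently dropping records.
        props.put("retries", "3");
        return props;
    }
}
```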
Microservice to send data to Kafka
Let's use port 8084 for this service.
A REST API triggers the send.
Postman is great here: it gives us environments and a way to write custom scripts that create JWTs (for app-specific tokens).
KafkaListener
We have a microservice (kafka-streams-hotel) with an Elastic controller. The controller is not strictly needed, since the listener ingests the data into the ES cluster, but it is useful when we want to inspect the data. An ES service does that part.
I have configured port 8082 for this service, and I run AWS Elasticsearch in the cloud. Having worked as an ES admin and then an ES architect, I am a full-fledged ES cluster guy, so I prefer that over running a tiny ELK stack on my local machine.
We will use an AWS ES cluster with multiple nodes. Now we have the IngestService.
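What the IngestService ultimately does amounts to an index request against the cluster. A sketch built with the JDK HttpClient; the endpoint, index name, and paths are illustrative assumptions, not the actual service code:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class IngestRequestSketch {

    // Placeholder endpoint; in our setup this would be the AWS ES cluster URL.
    static final String ES_URL = "http://localhost:9200";

    // Build a PUT request that indexes one hotel document by ID.
    public static HttpRequest indexRequest(String hotelId, String hotelJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(ES_URL + "/hotels/_doc/" + hotelId))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(hotelJson))
                .build();
    }
}
```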
KafkaStreams App
We have triggered the data, and our sweet Kafka Streams app reads the stream, performs the enrichment, and writes the enriched stream back to a topic.
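A minimal topology sketch with the Kafka Streams DSL; the topic names and the enrich step are illustrative assumptions, not the app's real logic:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class HotelEnrichTopology {

    public static void build(StreamsBuilder builder) {
        // Read raw hotel records, enrich each value, write to the output topic.
        KStream<String, String> hotels = builder.stream("hotels");
        hotels.mapValues(HotelEnrichTopology::enrich)
              .to("hotels-enriched");
    }

    // Placeholder enrichment; the real app would add airport details, etc.
    private static String enrich(String hotelJson) {
        return hotelJson;
    }
}
```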
KafkaListener
SVHotelDetailAirportListener
ConsumerRecord
HotelDetails
Sweet, our POJO, aka domain entity, has come into use.
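A sketch of what SVHotelDetailAirportListener could look like, assuming Spring for Apache Kafka with a JSON deserializer for the HotelDetails POJO; the topic, group, and IngestService method are placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SVHotelDetailAirportListener {

    private final IngestService ingestService; // writes documents to the ES cluster

    public SVHotelDetailAirportListener(IngestService ingestService) {
        this.ingestService = ingestService;
    }

    // Consume enriched records and hand the HotelDetails POJO to the ingest service.
    @KafkaListener(topics = "hotels-enriched", groupId = "es-ingest")
    public void onMessage(ConsumerRecord<String, HotelDetails> record) {
        ingestService.ingest(record.value());
    }
}
```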
Elasticsearch Cluster
And it has indexed the Kafka message as a document.
Our counts have increased, thanks to Kafka Streams, which helped us work on the data as a stream. We now have Kafka Streams in the ingestion pipeline.
KafkaStreams
We have a microservice (kafka-streams-hotel) that, once the data is enriched by the Streams app, ingests it into the AWS Elasticsearch cluster. I used AWS for its monitoring and messaging services (SNS, SQS) alongside the ES cluster, so we do not have to keep checking for issues ourselves. To see whether the data is indexed, we will use the ES Dev console, not the AWS console.
It is indexed
Our document key confirms it.
Kafka CLI
To peek at the data we can use the kafka-console-consumer CLI.
Conclusion
We have used the AWS Elasticsearch cluster as the data sink. Our Kafka cluster uses Streams as part of the ingest pipeline to index documents. We added a REST API so that we can invoke the pipeline with hotel IDs for ad hoc ingestion. Please feel free to provide as many hotels as you like; I used two to demo and explain this feature.