Data Ingestion Pipelines Using Kafka Streams, AWS ElastiCache (Memcached), and Elasticsearch
We’ll walk through a business use case: consuming messages from multiple Kafka sources and conditionally processing them into multiple destination topics. The primary focus is how to set this up as a Java Spring Boot application backed by AWS ElastiCache (Memcached), using the Kafka Streams library. Kafka Streams will be used to sink data to Elasticsearch. For Spring Boot with Elasticsearch, please refer to my Medium post, which covers it in depth.
Microservices architecture lets us build and deploy applications with scalability, modularity, and flexibility, and Apache Kafka plays a pivotal role by providing a robust, scalable event-driven communication backbone.
In a NoSQL environment, joins are a puzzle. NoSQL systems scale well precisely because they avoid RDBMS-style joins, a legacy pattern that becomes a bottleneck when we want to scale out to data sinks like MongoDB, Cassandra, Elasticsearch, and others. I have faced this in many projects when building new applications on distributed messaging in the cloud (AWS, GCP, and others).
https://medium.com/@samb333/elasticsearch-ingestion-using-aws-amazon-mq-bb9f69d8506d
OVERVIEW
We will start a Kafka cluster.
Kafka is a scalable, fault-tolerant distributed messaging system. Its elasticity, the ability to handle hundreds of thousands of messages every second, makes it stand out. Deployments include on-premises, Kubernetes, cloud, and bare-metal environments. Kafka Streams supports exactly-once processing semantics, connects directly to Kafka, and does not require any separate processing cluster. Developers can leverage Kafka Streams on Linux, macOS, and Windows by writing standard Java or Scala applications (Kafka Streams is a JVM library). Kafka Streams is also fully integrated with Kafka security, making it a secure and enterprise-trusted solution for handling sensitive data. Its streaming capabilities are used by enterprises in data-processing pipelines.
Start ZooKeeper (the cluster manager), the Kafka broker, and then Kafka Streams. We will use a producer to send structured data; we use a hotel JSON record and send it to Kafka.
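With a stock Apache Kafka tarball, the startup commands look like this (paths assume you are inside the Kafka install directory):

```shell
# Start ZooKeeper (the cluster manager)
bin/zookeeper-server-start.sh config/zookeeper.properties

# In another terminal, start the Kafka broker
bin/kafka-server-start.sh config/server.properties
```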
Kafka Configuration
For a Kafka primer, please visit my earlier Medium post. Here we will go in depth, with code and a working solution that covers advanced features.
- No duplicate records (idempotent producer)
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
- Acknowledge the message on all replicas
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
- Linger config (small batching delay)
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");
- Retry the message
Let us assume the producer has trouble getting messages to the brokers. We want it to try a few more times, i.e. retry:
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
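Putting the settings above together, a minimal sketch of the producer configuration looks like this. The string keys are the literal names behind the ProducerConfig constants, so this compiles without the kafka-clients dependency:

```java
import java.util.Properties;

public class ProducerProps {
    // Builds the producer configuration described above.
    public static Properties build(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("enable.idempotence", "true"); // no duplicate records
        p.setProperty("acks", "all");                // wait for all replicas
        p.setProperty("linger.ms", "1");             // small batching delay
        p.setProperty("retries", "5");               // retry transient failures
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092").getProperty("acks"));
    }
}
```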
KAFKA TOPIC
Create topic
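For example, assuming an illustrative topic name hotel-svin and the same single-broker setup used later in this post:

```shell
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic hotel-svin
```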
KAFKA PRODUCER
We will consume messages from a data source: microservices backed by AWS ElastiCache (Memcached). Please refer to my Medium post:
https://medium.com/devops-dev/aws-microservices-using-aws-elsticcache-2375675399ac
Send Data — Hotel record
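The hotel record is JSON. As a dependency-free sketch (the field names are assumptions; a real application would use a JSON library such as Jackson):

```java
public class HotelRecord {
    // Builds the JSON payload we send to Kafka with plain String.format,
    // keeping this sketch free of external dependencies.
    public static String toJson(String id, String name, String city) {
        return String.format(
                "{\"id\":\"%s\",\"name\":\"%s\",\"city\":\"%s\"}",
                id, name, city);
    }

    public static void main(String[] args) {
        System.out.println(toJson("hiltonID2", "Hilton Garden", "Sunnyvale"));
    }
}
```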
KAFKA CONSUMER
Sweet, we have a Kafka consumer that now gets the data.
STREAMS WITH GLOBAL TABLE
We have a problem to solve. We have two or more streams. Our motel also has a list of airports that people can use to reach it. We need to send Elasticsearch the motel enriched with all of its airports, and we have multiple streams. We do not want a monolithic Motels service that performs the join in compute memory on an AWS managed service like ElastiCache (Memcached). Instead, control is given to the web app and our partners: they decide what to use and send it to us through streams.
Stream For Airports
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hub-airport-svin
Create Topic For Motel
Create Topic For Stream Processing Using Join
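Hedging on the exact names (only hub-airport-svin is shown above), the motel topic and the join output topic might be created like this:

```shell
# Topic for the motel records
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic hub-motel-svin

# Output topic for the joined stream
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic hub-motel-airport
```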
We need a global table, and we will use a multiline lambda. Java gives us forEach to process streams.
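As a plain-Java warm-up, here is the multiline-lambda shape we will pass to the topology's foreach; the motel ids and airport codes are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ForEachDemo {
    // Builds one line per motel using a multiline lambda, the same shape
    // we will later pass to KStream#foreach.
    public static List<String> lines() {
        Map<String, List<String>> motelAirports = new TreeMap<>(Map.of(
                "hiltonID2", List.of("SFO", "OAK"),
                "motel19", List.of("SJC")));
        List<String> out = new ArrayList<>();
        motelAirports.forEach((motelId, airports) -> {
            String line = motelId + " -> " + String.join(",", airports);
            out.add(line);
        });
        return out;
    }

    public static void main(String[] args) {
        lines().forEach(System.out::println);
    }
}
```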
We send our Motel through Streams
Peek at the topics
Use the --list option:
bin/kafka-topics.sh --list --zookeeper localhost:2181
Streams Code — Producer
Streams Merge
We will use a global table (GlobalKTable) to perform the join.
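A minimal sketch of the topology, assuming the kafka-streams dependency is on the classpath; hub-motel-svin and hub-motel-airport are illustrative topic names (only hub-airport-svin appears earlier), and values are treated as raw JSON strings for simplicity:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class MotelAirportJoin {
    public static void buildTopology(StreamsBuilder builder) {
        // Airports are fully replicated to every instance as a GlobalKTable.
        GlobalKTable<String, String> airports =
                builder.globalTable("hub-airport-svin");

        KStream<String, String> motels = builder.stream("hub-motel-svin");

        motels.join(airports,
                    (motelKey, motelJson) -> motelKey,       // key into the global table
                    (motelJson, airportJson) ->              // fold airports into the motel
                            motelJson + " airports:" + airportJson)
              .to("hub-motel-airport");
    }
}
```

Because a GlobalKTable is fully replicated to every application instance, this join does not require the two topics to be co-partitioned.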
Start Consumer
Run the Stream and Get Results
We get success: hiltonID2 and, as expected, the motel we sent.
We get Airport Detail
We get Master Detail
Consumer
We get the airport list inside the motel record.
WE HAVE SUCCESS NOW
AWS ElastiCache
Please refer to my Medium article on AWS ElastiCache (Memcached). We have structured data.
We will use Spring Boot's RedisTemplate.
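A minimal sketch of the RedisTemplate wiring, assuming spring-boot-starter-data-redis is on the classpath (not the post's exact configuration):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class CacheConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        // String keys keep the cache readable when inspected with redis-cli.
        template.setKeySerializer(new StringRedisSerializer());
        return template;
    }
}
```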
Spring Boot Code
Spring Boot Microservices Run
We will run our staging service with a run profile.
In Eclipse, use the DEV profile.
Main Type: Spring Boot App Class
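Outside Eclipse, the standard Spring Boot flag selects the profile (the jar name here is illustrative):

```shell
java -jar kafka-streams-hotel.jar --spring.profiles.active=dev
```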
REST API
Let's call the REST endpoint. We see the list of hotels.
Airports Microservice
We have id: “18@@en|1”. We need to URL-encode it; we can use https://www.urlencoder.net/. Sweet, we now get the data.
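The same encoding can be done in code with the JDK's URLEncoder, so the composite id is safe in a REST path:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class IdEncoder {
    // URL-encodes the composite hotel id for use in a request path.
    public static String encode(String id) {
        return URLEncoder.encode(id, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(encode("18@@en|1")); // @ becomes %40, | becomes %7C
    }
}
```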
Create 1 — Save Airport for Hotel
Let's get the airport details.
Add another for Hotel
for id = 19
We can pass the ids or hkeys
Let's get the HotelDetails for 18.
We have 18{ATRATE_SEP}EN
Stream App
Sweet, we have the application.id and client.id from Kafka.
POJO
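An illustrative POJO for the motel record flowing through the pipeline; the field names are assumptions, not the post's exact schema:

```java
import java.util.ArrayList;
import java.util.List;

// Motel record carried through the pipeline as JSON.
public class Motel {
    private String id;
    private String name;
    private List<String> airports = new ArrayList<>();

    public Motel() { }                       // required for JSON (de)serialization

    public Motel(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public List<String> getAirports() { return airports; }
    public void setAirports(List<String> airports) { this.airports = airports; }
}
```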
Stream App
Let's add the REST client.
To fix the following error:
Since the project was not created from a template, we need to add a manifest via the Maven jar plugin and declare which of the main classes is our main class, as shown.
REST CLIENT
Add a bean/configuration for restTemplate(). The bean will be created, and we can inject it into the service class.
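A minimal sketch of that bean, assuming spring-web is on the classpath:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestClientConfig {
    // Registers a RestTemplate bean so it can be injected into the
    // service class via @Autowired or constructor injection.
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
```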
Service Class
DEBUG
After fixing the error: the issue is the special character @ in the id being passed.
Must Run
- MemCache (a local Redis server will do for local development)
- svc-caf-slcp-gatewaystg-service: these microservices provide the REST endpoints for HotelDetails and Airports
- Run as a Spring Boot app: kafka-streams-hotel. Add the manifest via maven-jar-plugin
(comment out the other main class and switch to <mainClass>com.svr.KafkaHotelClientSpringBootApp</mainClass>)
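A minimal sketch of that maven-jar-plugin entry, using the class name from above:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <configuration>
    <archive>
      <manifest>
        <mainClass>com.svr.KafkaHotelClientSpringBootApp</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
```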
Conclusion
We have Kafka Streams. The data for the producer comes from Spring microservices, which is how we usually get it. Kafka Streams has done a sweet join and produced the stream needed for Elasticsearch (which itself has a data streams feature).