Data Ingestion Pipelines Using Kafka Streams, AWS Memcache, Elasticsearch

fisheye
7 min read · Apr 6, 2024


We’ll walk through the business use case of consuming messages from multiple Kafka sources and conditionally processing them to multiple destination topics. The primary focus will be on how to set this up in a Java Spring Boot application that uses AWS Memcache and the Kafka Streams library. Kafka Streams will be used to sink data to Elasticsearch. For Spring Boot with Elasticsearch, please refer to my Medium post, which covers it in depth.

Microservices architecture lets us build and deploy applications with scalability, modularity, and flexibility, and Apache Kafka plays a pivotal role by providing a robust, scalable, event-driven communication backbone.

In a NoSQL environment, the join is the puzzle: NoSQL systems scale well precisely because they avoid RDBMS-style joins, which become a bottleneck when we want to scale data sinks like MongoDB, Cassandra, Elasticsearch, and others. I have faced this on many projects when building new applications using distributed messaging on clouds such as AWS, GCP (Google Cloud), and others.

https://medium.com/@samb333/elasticsearch-ingestion-using-aws-amazon-mq-bb9f69d8506d

OVERVIEW

We will start a Kafka cluster.

A Kafka cluster is a scalable, fault-tolerant distributed messaging system. Its elasticity, handling hundreds of thousands of messages every second, makes it stand out. It can be deployed on premises, on Kubernetes, in the cloud, or on bare metal. Kafka Streams supports exactly-once processing semantics, connects directly to Kafka, and does not require a separate processing cluster. Developers can run Kafka Streams on Linux, Mac, and Windows by writing standard Java or Scala applications. Kafka Streams is also fully integrated with Kafka security, making it a secure, enterprise-trusted solution for handling sensitive data. Its streaming capabilities are used by enterprises in data processing pipelines.

Start ZooKeeper (the cluster manager), start a Kafka broker, and set up Kafka Streams. We will use a producer to send structured data; we are sending hotel JSON to Kafka.

Kafka Configuration

For a Kafka primer, please visit my Medium post. Here we will go in depth, with code and a working solution that covers advanced features.

  • No duplicate records

properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

  • Acknowledge the message

properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");

  • Linger config

properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");

  • Retry the message

Let us assume the producer has trouble getting messages through to the brokers; we want it to try again a few times, i.e., retry.

properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
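
Putting these properties together, a minimal producer configuration might look like the sketch below. The bootstrap server address and the String serializers are assumptions for illustration; adjust them to your cluster and data types.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class HotelProducerConfig {

    public static KafkaProducer<String, String> buildProducer() {
        Properties properties = new Properties();
        // Broker address is an assumption; point this at your cluster.
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // No duplicate records
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // Acknowledge on all in-sync replicas
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        // Wait up to 1 ms to batch records together
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");
        // Retry the message up to 5 times
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
        return new KafkaProducer<>(properties);
    }
}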

KAFKA TOPIC

Create topic
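
The topics in this post are created with the kafka-topics.sh CLI (shown later for the airport topic). As an alternative, here is a minimal sketch that creates a topic programmatically with Kafka's AdminClient; the topic name "motel-details", the single partition, and the replication factor of 1 are assumptions for local development.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreator {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1: fine for local dev, not for production.
            NewTopic topic = new NewTopic("motel-details", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}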

KAFKA PRODUCER

We will consume messages from the data source, AWS Memcache-based microservices. Please refer to my Medium post:

https://medium.com/devops-dev/aws-microservices-using-aws-elsticcache-2375675399ac

Send Data — Hotel record
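
A minimal sketch of sending a hotel record as JSON, reusing the producer built above. The topic name, key, and JSON fields are assumptions for illustration.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HotelProducerApp {

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = HotelProducerConfig.buildProducer();

        // Hotel record as a JSON string; the fields are illustrative only.
        String hotelJson = "{\"id\":\"hiltonID2\",\"name\":\"Hilton Downtown\",\"city\":\"Austin\"}";

        // Topic name is an assumption; use the topic created earlier.
        ProducerRecord<String, String> record =
                new ProducerRecord<>("motel-details", "hiltonID2", hotelJson);

        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("Sent to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            }
        });

        producer.flush();
        producer.close();
    }
}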

KAFKA CONSUMER

Sweet, we now have a Kafka consumer that receives the data.
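
For reference, a minimal consumer loop might look like the sketch below; the group id, topic name, and broker address are assumptions.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HotelConsumerApp {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "hotel-consumer-group");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("motel-details"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("key=" + record.key() + " value=" + record.value());
                }
            }
        }
    }
}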

STREAMS WITH GLOBAL TABLE

We have a problem to solve: we have two or more streams. Our motel record also has a list of airports that people can use to reach it. We need to send Elasticsearch the motel enriched with all of its airports. With multiple streams, we do not want a monolithic Motel service that performs the join in AWS compute memory using a managed service like Memcache. Instead, control is given to the web app and our partners so they can decide what to use; they send the data to us through streams.

Stream For Airports

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hub-airport-svin

Create Topic For Motel

Create Topic For Stream Processing Using Join

We need a GlobalKTable and will use a multiline lambda; Kafka Streams gives us foreach to process each record of the stream.
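
A minimal sketch of how the GlobalKTable join might be wired up. The airport topic hub-airport-svin comes from the topic created above; the motel and output topic names, the application id, and the String serdes are assumptions for illustration.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class MotelAirportJoinApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "motel-airport-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // GlobalKTable holding the airport list, keyed by motel/hotel id.
        GlobalKTable<String, String> airports = builder.globalTable("hub-airport-svin");

        // Stream of motel records, keyed by motel id.
        KStream<String, String> motels = builder.stream("motel-details");

        // Join each motel with its airports; the motel key is used to look up the global table.
        KStream<String, String> enriched = motels.join(
                airports,
                (motelKey, motelJson) -> motelKey,
                (motelJson, airportJson) -> motelJson + " | airports: " + airportJson);

        // Multiline lambda with foreach to inspect each joined record.
        enriched.foreach((key, value) -> {
            System.out.println("key   = " + key);
            System.out.println("value = " + value);
        });

        enriched.to("motel-airport-enriched");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}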

We send our Motel through Streams

Peek at the Topics

Use the --list option.

bin/kafka-topics.sh --list --zookeeper localhost:2181

Streams Code — Producer

Streams Merge

We will use the global table to perform the join.

Start Consumer

Run the Stream and Get Results

Success: we get hiltonID2 and, as expected, the motel we sent.

We get Airport Detail

We get Master Detail

Consumer

We get the airport list inside the motel.

WE HAVE SUCCESS NOW

AWS ElastiCache

Please refer to my Medium article for AWS Memcache / AWS ElastiCache. We have structured data.

We will use Spring Boot's RedisTemplate.

Spring Boot Code
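
A minimal sketch of a RedisTemplate configuration, assuming a Lettuce connection and JSON serialization of values; the host and port here are local-development assumptions, and the real ElastiCache endpoint would go in their place.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class CacheConfig {

    // Local Redis for development; replace with the ElastiCache endpoint in staging/prod.
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory("localhost", 6379);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}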

Spring Boot Microservices Run

We will run our staging service with a Spring profile.

In Eclipse use Profile DEV.

Main Type: Spring Boot App Class

REST API

Let's call the REST endpoint. We see the list of hotels.

Airports Microservice

We have id: "18@@en|1". We need to URL-encode it; we can use https://www.urlencoder.net/. Sweet, we now get the data.
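
The same encoding can also be done in code. A minimal sketch using java.net.URLEncoder, with the id taken from the example above:

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class EncodeId {

    public static void main(String[] args) {
        // Encode the composite id so the '@' and '|' characters survive the URL.
        String encoded = URLEncoder.encode("18@@en|1", StandardCharsets.UTF_8);
        System.out.println(encoded); // prints 18%40%40en%7C1
    }
}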

Create 1 — Save Airport for Hotel

Let's Get the Airport Details

Add another for Hotel

for id = 19

We can pass the ids or hkeys

Let's get the HotelDetails for 18

We have 18{ATRATE_SEP}EN

Stream App

Sweet, we have the application.id and client.id from Kafka.

POJO
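
The POJO is shown in the original post as a screenshot; below is a minimal sketch of what such a Hotel POJO might look like, with field names assumed for illustration (an id, a name, and the list of airports that gets joined in).

import java.util.List;

public class Hotel {

    private String id;
    private String name;
    private List<String> airports;

    public Hotel() {
        // No-args constructor needed for JSON (de)serialization.
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public List<String> getAirports() { return airports; }
    public void setAirports(List<String> airports) { this.airports = airports; }
}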

Stream App

Let's add the REST CLIENT.

To fix the following error:

Since the project was not created from a template, we need to add the manifest and plugin configuration and tell it which of the main classes is our main class, as shown.

REST CLIENT

Add a bean/configuration for restTemplate(). The bean will be created and we can inject it into the service class.
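
A minimal sketch of such a configuration, assuming Spring Boot's RestTemplateBuilder is available on the classpath:

import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestClientConfig {

    // Expose a RestTemplate bean so it can be injected into the service class.
    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();
    }
}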

Service Class
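
A minimal sketch of a service class that injects the RestTemplate and calls the gateway microservice; the base URL and path are assumptions, and the id is URL-encoded as discussed above.

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class HotelDetailsService {

    @Autowired
    private RestTemplate restTemplate;

    // Base URL of the gateway microservice is an assumption for illustration.
    private static final String BASE_URL = "http://localhost:8080/hoteldetails/";

    public String fetchHotelDetails(String rawId) {
        // Encode ids such as "18@@en|1" before putting them in the URL.
        String encodedId = URLEncoder.encode(rawId, StandardCharsets.UTF_8);
        return restTemplate.getForObject(BASE_URL + encodedId, String.class);
    }
}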

DEBUG

Let's do it for one of the existing ids and hardcode it to avoid typos (for local dev only).

After fixing the error: the issue was the special character @ being passed.

Must Run

  • MemCache (a local Redis server will do for local development)
  • svc-caf-slcp-gatewaystg-service: this microservice provides the REST endpoints for HotelDetails and Airports
  • Run as a Spring Boot app: kafka-streams-hotel. Add the manifest via maven-jar-plugin.

(Comment out the other main class entry and switch to <mainClass>com.svr.KafkaHotelClientSpringBootApp</mainClass>.)

Conclusion

We have Kafka Streams. The data for the producer comes from Spring microservices; most of the time that is how it arrives. Kafka Streams has done a sweet join and produced the stream needed for Elasticsearch (which supports data streams).
