Build Streaming Data Pipelines Using Kafka Streams to Index Data into an AWS Elasticsearch Cluster

fisheye
3 min read · Apr 10, 2024


We will use Kafka Streams. Hotels are published to one stream and airports arrive on another. We will join the two to produce a single output stream.

Any architectural style has trade-offs: strengths and weaknesses that we must evaluate according to the context in which it’s used. This is certainly the case with microservices. Indeed, most situations would do better with a monolith.

During development we will also see how to resolve the error ‘java.util.LinkedHashMap cannot be cast to class’.
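As a rough illustration of where this error usually comes from, here is a minimal sketch that assumes the hotel payload is deserialized without a target type. A generic JSON deserializer then returns a `LinkedHashMap`, and casting that map to a domain class fails at runtime. `Hotel`, its fields, `fromMap`, and the demo class are hypothetical names, not the classes used in this project. In a Jackson-based setup the equivalent step is usually to give the deserializer the target type, or to convert the map explicitly (for example with `ObjectMapper.convertValue`).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical domain type; the real Hotel class may have different fields.
record Hotel(String id, String name) {
    // Explicit conversion from the untyped map a generic JSON deserializer returns.
    static Hotel fromMap(Map<String, Object> raw) {
        return new Hotel(String.valueOf(raw.get("id")), String.valueOf(raw.get("name")));
    }
}

class LinkedHashMapCastDemo {
    // Stand-in for a deserializer that was given no target type (assumption).
    static Object deserializeUntyped() {
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("id", "h-1");
        raw.put("name", "Example Inn");
        return raw;
    }

    // Returns true when the naive cast fails with a ClassCastException.
    static boolean naiveCastFails() {
        try {
            Hotel hotel = (Hotel) deserializeUntyped();
            return hotel == null; // not reached: the cast throws first
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Treats the payload as the map it really is, then converts it field by field.
    @SuppressWarnings("unchecked")
    static Hotel convert() {
        return Hotel.fromMap((Map<String, Object>) deserializeUntyped());
    }

    public static void main(String[] args) {
        System.out.println("naive cast fails: " + naiveCastFails());
        System.out.println("converted: " + convert());
    }
}
```

The point of the sketch is only that the object on the wire is a map until something converts it; which conversion mechanism fits depends on the serializer configuration in use.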

For the microservices setup, please see my Medium post on microservices:

https://medium.com/@sambsv/implementing-microservices-using-spring-boot-48be589ec4ec

Publish Message To Kafka Topic “hub-motel-svin”

Hotel details are published to the Kafka topic TOPIC_HUB_MOTEL_SVIN = “hub-motel-svin”. The “hotels” microservice does not provide any information about nearby airports that customers could use.

The Elasticsearch cluster needs the nearby airports attached before the hotel details can be ingested and indexed. Our customers and the web app do not care about this back end; they only see the hotel details that Elasticsearch serves.

Consume REST API from Hotel Microservices

Code
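A minimal sketch of this step, assuming the hotel microservice exposes a JSON list endpoint and using only the JDK's `java.net.http.HttpClient`. The base URL, the `/hotels` path, the timeout, and the class name `HotelApiClient` are all hypothetical; the real service may differ.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical client for the hotel microservice.
class HotelApiClient {
    private final HttpClient http = HttpClient.newHttpClient();
    private final String baseUrl;

    HotelApiClient(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // Builds the GET request; "/hotels" is an assumed path.
    HttpRequest hotelsRequest() {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/hotels"))
                .header("Accept", "application/json")
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
    }

    // Sends the request and returns the raw JSON body; any non-200 status is an error.
    String fetchHotelsJson() throws IOException, InterruptedException {
        HttpResponse<String> response =
                http.send(hotelsRequest(), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Hotel service returned HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```

With a base URL for the hotel service, `new HotelApiClient(baseUrl).fetchHotelsJson()` would return the JSON that is then published to the “hub-motel-svin” topic.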

Publish Airport Messages To Kafka Topic — “hub-airport-svin”

The list of airports arrives as a Kafka stream: the data source publishes to the Kafka topic below. The microservice runs on port 8082.

public static final String TOPIC_HUB_AIRPORT_SVIN = "hub-airport-svin";

Kafka Stream

We have the complex problem of adding an airport list to each hotel. A hotel can have one or more airports. Because both arrive as streams, it is not obvious how to attach the airport documents to the hotel document. I use the word “document” because these records have a structure that Elasticsearch can ingest.

Kafka Streams is used to perform a database-like join, which makes the hotel document more attractive to our customers. Start the Kafka Streams application.
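To make the join concrete, here is a plain-Java model of the data flow. It is not Kafka Streams API code: airports are first grouped by a shared key, then each hotel is joined with the list for its key. The join key (`cityCode`), the record fields, and all class names are assumptions, since the field that links a hotel to its airports is not stated here. In Kafka Streams itself the same shape would typically be a grouped aggregation of the airport stream joined against the hotel stream.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical document shapes; field names are assumptions.
record AirportDocument(String code, String cityCode) {}
record HotelDocument(String id, String cityCode) {}
record EnrichedHotel(HotelDocument hotel, List<AirportDocument> airports) {}

class HotelAirportJoin {
    // Step 1: collect the airports into one list per join key.
    static Map<String, List<AirportDocument>> groupByCity(List<AirportDocument> airports) {
        return airports.stream().collect(Collectors.groupingBy(AirportDocument::cityCode));
    }

    // Step 2: left-join each hotel with the airport list for its key.
    // Hotels without a match keep an empty list rather than being dropped.
    static List<EnrichedHotel> join(List<HotelDocument> hotels, List<AirportDocument> airports) {
        Map<String, List<AirportDocument>> byCity = groupByCity(airports);
        return hotels.stream()
                .map(h -> new EnrichedHotel(h, byCity.getOrDefault(h.cityCode(), List.of())))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<AirportDocument> airports = List.of(
                new AirportDocument("AAA", "city-1"),
                new AirportDocument("BBB", "city-1"),
                new AirportDocument("CCC", "city-2"));
        List<HotelDocument> hotels = List.of(new HotelDocument("h-1", "city-1"));
        join(hotels, airports).forEach(System.out::println);
    }
}
```

The left join is a design choice made for this sketch. If hotels without any airport should not reach Elasticsearch, filtering out entries with an empty list turns it into an inner join.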

Output Document: Success

We can peek at what the app above publishes. Sweet: the hotels now have the airport list attached. We need this stream to index documents into the Elasticsearch cluster.

For the ingestion data pipeline for the Elasticsearch cluster, please see my Medium post.

Conclusion

In this post we used Kafka Streams to ingest data streams into an Elasticsearch cluster.
