Microservices: Using Kafka to Send Data to an Elastic Cluster

fisheye
7 min read · Apr 6, 2024


We will implement Java microservices and use advanced features of Kafka, the message broker. We will send messages to the broker and view them, moving data between services and the server. Java being server-side, we have used it for a long time to build services; unfortunately, scaling them is not easy. We will implement and scale them here.

Kafka Server

Start the Kafka server. We need a properties file; I keep mine in the config directory, where the configurations are defined.
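
A typical invocation, assuming the stock distribution layout (the exact path to your properties file may differ):

 bin/kafka-server-start.sh config/server.properties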

We now have the broker running on port 9092.

Zookeeper Server

We will have it running on port 2181.

Start ZooKeeper. Let me call it the manager that fronts all the Kafka brokers, be it 1, 2, or say 10. Clients control which partition to write to, so, sweetly, nothing like an AMQP routing key is needed.
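
For example, with the bundled default configuration:

 bin/zookeeper-server-start.sh config/zookeeper.properties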

We are interested in topics and offsets. We have:

  • built-in topics, created dynamically by the Kafka platform/system
  • topics created by the user

In between, my Kafka server (broker) went down. ZooKeeper will come to know at once via the heartbeat protocol.

Start the broker again and see if it rejoins.

Kafka UI

Create Kafka Topic

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hub-hotel-svin

Start Kafka Producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hub-hotel-svin

Start Kafka Consumer

We will have no data in the beginning. Once the stream app kafka-stream-hotel starts running, the consumer will show the data as shown. We want to count the number of hotels per state. Our dataset is:
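
The screenshot of the dataset is not reproduced here; a few illustrative lines in the same state,hotel shape (the names are made up):

 california,Marriott
 texas,Hilton
 california,Hyatt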

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic svagg-hotel-svout \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Get Data Processed — Kafka Streams

We can see we are sending text data separated by commas, and the key is null.

Kafka Streams

We want to count the candy colours sent for both sally and alex.

We are getting a text stream; we will first keep only the values we can split:

.filter((key, value) -> value.contains(","))

Selecting a new key triggers a repartition, creating a new stream. Our values are in fact name and colour; we make the name (index [0]) the key:

.selectKey((key, value) -> value.split(",")[0].toLowerCase())

Let's change the dataset. A 1:1 transformation with mapValues does not create a new stream; it takes a lambda function. We want all candy colours in lowercase:

.mapValues(value -> value.split(",")[1].toLowerCase())

Filter the dataset:

.filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour))

Writeback

  • to(), to write back to a Kafka topic:
 usersAndColours.to("user-keys-and-colours");
  • or through(), which writes to the topic and reads it back (see the sketch below)
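
A sketch of the through() variant; note that recent Kafka Streams releases deprecate through() in favour of repartition():

 KStream<String, String> reread = usersAndColours.through("user-keys-and-colours");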

We can now read it back as a KTable to perform aggregation:

 KTable<String, String> usersAndColoursTable = builder.table("user-keys-and-colours");

KTable has an API to perform aggregations.

We want the dataset to narrow down, say from n records to 1 per colour:

 KTable<String, Long> favouriteColours = usersAndColoursTable
     // 5 - we group by colour within the KTable
     .groupBy((user, colour) -> new KeyValue<>(colour, colour))
     .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByColours")
         .withKeySerde(stringSerde)
         .withValueSerde(longSerde));

Write back to Topic
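
A minimal write-back sketch using the serdes from above; the output topic name favourite-colour-output is illustrative:

 favouriteColours.toStream().to("favourite-colour-output", Produced.with(stringSerde, longSerde));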

List topics
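
For example, against the same ZooKeeper as above:

 bin/kafka-topics.sh --list --zookeeper localhost:2181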

KStreams - Create Topics

KStreams - Start Consumer

KStream — Start Producer

Java Stream Code

Create directory

Create the project using Maven, say version 3.8 or 3.6.
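
One way to scaffold it, assuming the standard quickstart archetype (the groupId and artifactId here are illustrative):

 mvn archetype:generate -DgroupId=com.example -DartifactId=kafka-stream-hotel -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false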

Maven Install
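
Typically just:

 mvn clean install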

KStream Code

Run the App

We have a thread running to process the stream. It creates a KStream from the xxx-svin topic, transforms it, and pushes to the topic state-keys-and-hotels.

We read a KTable from state-keys-and-hotels, then aggregate and write the result to the output topic, svagg-hotel-svout.
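
A consolidated sketch of that pipeline, assuming input records shaped as state,hotel and the topic names used above; the class name and configuration values are illustrative:

 import java.util.Properties;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.state.KeyValueStore;

 public class HotelCountApp {
     public static void main(String[] args) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-hotel");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

         StreamsBuilder builder = new StreamsBuilder();

         // read the raw "state,hotel" text stream from the input topic
         KStream<String, String> hotels = builder.stream("hub-hotel-svin");
         hotels.filter((key, value) -> value.contains(","))
               .selectKey((key, value) -> value.split(",")[0].toLowerCase())
               .mapValues(value -> value.split(",")[1].toLowerCase())
               .to("state-keys-and-hotels");

         // re-read as a KTable and count hotels per state
         KTable<String, String> hotelsTable = builder.table("state-keys-and-hotels");
         KTable<String, Long> hotelsByState = hotelsTable
               .groupBy((state, hotel) -> new KeyValue<>(state, state))
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByState")
                     .withKeySerde(Serdes.String())
                     .withValueSerde(Serdes.Long()));

         // write the aggregate to the output topic read by the console consumer
         hotelsByState.toStream().to("svagg-hotel-svout", Produced.with(Serdes.String(), Serdes.Long()));

         new KafkaStreams(builder.build(), props).start();
     }
 }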

Connect Server

I love Kafka Connect. Confluent runs a kind of connector registry, a commercial service that does not conflict with Kafka and volunteers to track the registry of connectors, their validation, and their schemas. Kafka is very fast, and we cannot afford overhead from confusing it with business-use-case naming and making it slow; keep that outside the realm of Kafka, in your own code. You can use any version; I have been using a sweet one for a long time, and there are plenty more to love.

I have a lot of connectors:

  • SourceConnectors
  • SinkConnectors
  • Connectors from Confluent, the company. For Confluent, please visit my post on Confluent and Kafka Connect. We can use terms like Confluent Connect as long as we remember Confluent is a brand and has nothing to do with Apache Kafka itself. This was a major source of confusion for me years back, as the commercial Confluent connectors get all the buzz. My thanks to the Apache Kafka team and contributors.

There are custom sources and custom sinks, as well as built-in ones from Confluent. We should remember "Serde": serializers and deserializers can be Text, JSON, and so on. I have done a lot of work with AVRO and was happy to see that Confluent supports AVRO alongside JSON schema and validation. We need it: once the system is in production, folks can keep sending messages that the producers and consumers were not designed for, while pretending not to know what is underneath.

Start Apache Kafka Connect. It comes up on port 8083.
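
For example, in distributed mode with the bundled worker config (the path may differ in your install):

 bin/connect-distributed.sh config/connect-distributed.properties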

curl -X GET http://172.17.85.196:8083/connectors

AWS Kafka Cluster

AWS — Create Kafka Cluster

AWS Kafka Cluster

We can use the CLI to access the cluster; I created this one on AWS a while back using the AWS CLI. If you are using AWS, use the CLI.
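
A minimal creation sketch; the cluster name and the broker-node-group JSON file are illustrative placeholders:

 aws kafka create-cluster --cluster-name hotel-cluster --broker-node-group-info file://broker-nodes.json --kafka-version "2.8.1" --number-of-broker-nodes 3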

AWS - Create Topic

AWS - Bootstrap Brokers

AWS Kafka Brokers

We always build for scale and will have multiple brokers. Get your brokers using the CLI.
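
For example (the cluster ARN is a placeholder):

 aws kafka get-bootstrap-brokers --cluster-arn <cluster-arn>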

Streams

I was attracted to the Elasticsearch cluster years back because of its built-in REST services. We can search using the query DSL, database-style queries, or native ES searches via Lucene. I can get, search, and aggregate on unstructured data, a Google-like search. I was also attracted to streams long ago and did a lot of work using PyStreams.

On top of this we have Streams. Kafka is sweet: it does not want to obfuscate or confuse us with generic terms like Stream, deep learning, or machine learning. It uses the term KStream so that we do not confuse it with a Java Stream. The world has known Java for abstraction and mapping: we can start with properties and create mappings, and with layered microservices every team gets to do abstraction, mapping, and abstract configuration.

Operations are performed on datasets. We will use loose words like tuple, KV, and properties to convey that it is a similar data structure.

KStream

Operations like map and flatMap that can change the key trigger repartitioning (very expensive, like reindexing); mapValues and flatMapValues do not. Any dataset is inserted; it is an insert operation. Sally loves candy balls. Let's define a KV of K=String (sally), V=String (red), and let the guests send the KV data to Kafka. We will have

tuples, or KV, or properties. The stream is immutable. The beauty is that the key can be null and the record will still flow into the stream:

  1. (sally, red)
  2. (sally, blue)
  3. (sally, green)
  4. (alex, red)
  5. (null, red)

KTable

It performs an upsert. If the key is not there, it will insert; if a key like sally is there, it will update. So if Sally is tired and can have only one flower, that will be an update:

  • (sally, rose)

Let's send a tulip to sally: it will update rose to tulip. If you send a null value for a key, it will delete the record (or tuple, or dataset):

  • (sally, tulip)

GlobalKTable

If we want to be lazy and have the system replicate the data everywhere for us, GlobalKTable is what can be used to perform joins; a small sketch follows.
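
A minimal join sketch, reusing builder and the hotels stream from the sketch above; the hotel-details reference topic is hypothetical:

 GlobalKTable<String, String> hotelDetails = builder.globalTable("hotel-details");
 KStream<String, String> enriched = hotels.join(
         hotelDetails,
         (key, value) -> key,                        // map each stream record to the table's key
         (hotel, details) -> hotel + "|" + details); // merge stream value with table value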

Topology

To perform operations on streams like step 1, step 2, step 3, we have a topology. Some operations can repartition, create new streams, branch the streams, or change a stream into a KTable. The marble diagram is great: we can use it to draw pictures and happily share them as the topology, a kind of UML sequence diagram but for streams.

Our application, say Slcp, has an appId and a GroupId. We can have many consumers join the group, and they need the GroupId.
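
In code these are just properties; in Kafka Streams the application id doubles as the consumer group.id (the Slcp name is from above):

 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "slcp"); // also used as the consumer group.id
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");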

Conclusion

We created a stream application to process incoming hotels. We used KStream and KTable to perform stream operations and transformations, and wrote the results back to a Kafka topic.
