We will implement Java microservices and use advanced features of Kafka, the message broker. We will send messages to the message broker and view them, using services and a server to exchange them. Java being server-side, we have been using it for a long time to build services. Unfortunately, scaling them is not easy. We will implement and scale them here.
Kafka Server
Start the Kafka server. We need a properties file, and I have mine in the config directory, where the configurations are defined.
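Assuming we are in the Kafka installation directory and ZooKeeper is already up, the broker is started with the bundled script and that properties file:

```shell
# Start the Kafka broker with the default properties file
# (listens on port 9092 unless overridden in server.properties)
bin/kafka-server-start.sh config/server.properties
```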
We now have a broker running on port 9092.
Zookeeper Server
It will be running on port 2181.
Start ZooKeeper. Think of it as the manager that fronts all the Kafka brokers, be it 1, or 2, or say 10 of them. Clients control which partition to write to — sweet, the AMQP routing key is not needed.
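The manager itself is started the same way, with its own properties file (this assumes the stock config shipped with Kafka):

```shell
# Start ZooKeeper on the default port 2181 defined in zookeeper.properties
bin/zookeeper-server-start.sh config/zookeeper.properties
```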
We are interested in topics and offsets. We have:
- built-in topics, created dynamically by the Kafka platform/system
- topics created by the user
In between, my Kafka server (broker) went down. ZooKeeper will come to know via the heartbeat protocol.
Start the broker again and see if it rejoins.
Kafka UI
Create Kafka Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hub-hotel-svin
Start Kafka Producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hub-hotel-svin
Start Kafka Consumer
We will have no data in the beginning. Once the stream app kafka-stream-hotel starts running, it will show the data as shown. We want to count the number of hotels in each state. Our dataset is:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic svagg-hotel-svout --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Get Data Processed — Kafka Streams
We see we are sending text data separated by commas, and the key is null.
Kafka Streams
We want to count the number of candy colours sent to both Sally and Alex.
We are getting a text stream; we will separate the fields:
.filter((key, value) -> value.contains(","))
selectKey triggers a repartition and creates a new stream. Our values are in fact name and colour; we will make the name the key ([0] is the key):
.selectKey((key, value) -> value.split(",")[0].toLowerCase())
Let's change the dataset. A 1:1 transformation with mapValues does not create a new stream; it takes a lambda function. We want all candy colours to be lowercase:
.mapValues(value -> value.split(",")[1].toLowerCase())
Filter the dataset:
.filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour))
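Putting the four steps together: here is a minimal plain-Java sketch (no Kafka dependencies; the class name and sample data are my own) that applies the same filter → selectKey → mapValues → filter chain to in-memory records:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CandyPipeline {
    // Plain-Java sketch of the KStream chain above: drop malformed lines,
    // key by name, lowercase the colour, keep only green/blue/red.
    public static List<Map.Entry<String, String>> process(List<String> lines) {
        return lines.stream()
                .filter(v -> v.contains(","))                         // .filter
                .map(v -> Map.entry(v.split(",")[0].toLowerCase(),    // .selectKey
                                    v.split(",")[1].toLowerCase()))   // .mapValues
                .filter(e -> Arrays.asList("green", "blue", "red")
                                   .contains(e.getValue()))           // .filter
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        process(Arrays.asList("Sally,Red", "badline", "Alex,PINK", "Sally,Blue"))
                .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
    }
}
```

Each surviving entry corresponds to one record that would land on the repartitioned stream.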
Writeback
- to() writes the stream back to a Kafka topic:
usersAndColours.to("user-keys-and-colours");
- or, we can now read it as a KTable to perform aggregation:
KTable<String, String> usersAndColoursTable = builder.table("user-keys-and-colours");
KTable has an API to perform aggregations.
We want to narrow the dataset down, say from n to 1.
KTable<String, Long> favouriteColours = usersAndColoursTable
// we group by colour within the KTable
.groupBy((user, colour) -> new KeyValue<>(colour, colour))
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByColours")
.withKeySerde(stringSerde)
.withValueSerde(longSerde));
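What the groupBy + count pair does can be mimicked with two maps — the first behaving like the KTable (latest colour per user), the second like the CountsByColours store. A sketch with my own class and sample data, not the Kafka Streams runtime:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class FavouriteColourCount {
    // Mimics KTable semantics: keep only the latest colour per user,
    // then group by colour and count (the CountsByColours aggregation).
    public static Map<String, Long> countByColour(String[][] updates) {
        Map<String, String> table = new LinkedHashMap<>();   // user -> latest colour (upsert)
        for (String[] kv : updates) table.put(kv[0], kv[1]);
        Map<String, Long> counts = new HashMap<>();          // colour -> number of users
        for (String colour : table.values())
            counts.merge(colour, 1L, Long::sum);
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countByColour(new String[][]{
                {"sally", "red"}, {"alex", "red"}, {"sally", "blue"}});
        System.out.println(counts.get("red"));  // 1: sally's red was overwritten by blue
        System.out.println(counts.get("blue")); // 1
    }
}
```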
Write back to Topic
List topics
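Topics can be listed with the same kafka-topics.sh script used to create them:

```shell
# List every topic the cluster knows about (built-in and user-created)
bin/kafka-topics.sh --list --zookeeper localhost:2181
```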
KStreams — Create Topics
KStreams — Start Consumer
KStreams — Start Producer
Java Stream Code
Create directory
Create the project using Maven, say 3.8 or 3.6.
Maven Install
KStream Code
Run the App
We have a thread running to process the stream. It creates a KStream from the xxx-svin topic, transforms it, and pushes to the topic state-keys-and-hotels.
We read a KTable from state-keys-and-hotels, then aggregate and write the result to the output topic svagg-hotel-svout.
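The whole hotel pipeline can be sketched in plain Java (the "state,hotelName" record format is my assumption about the dataset; the class is my own):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HotelsByState {
    // Sketch of what kafka-stream-hotel does: take "state,hotelName" records
    // from the input topic, key them by state, and count hotels per state.
    public static Map<String, Long> count(List<String> records) {
        Map<String, Long> counts = new HashMap<>();
        for (String r : records) {
            if (!r.contains(",")) continue;          // skip malformed records
            String state = r.split(",")[0].toLowerCase();
            counts.merge(state, 1L, Long::sum);      // the KTable count step
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("TX,Grand", "CA,Bay", "TX,Lone Star")).get("tx")); // 2
    }
}
```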
Connect Server
I love Kafka Connect. Confluent maintains a kind of registry of connectors — a commercial service that, without conflicts, volunteers to track the registry of connectors, their validation, and their schemas. Kafka is very fast, and we cannot add overhead by pushing business use cases into it and making it slow; use those outside the realm of Kafka, in your own code. You can use any version.
There are lots of connectors:
- SourceConnectors
- SinkConnectors
- Connectors from Confluent, the company. For Confluent, please visit my post on Confluent and Kafka Connect. We can use terms like "Confluent Connect" as long as we remember that Confluent is a brand and has nothing to do with Apache Kafka itself. This was a major source of confusion for me years back, as the commercial Confluent connectors get all the buzz. My thanks to the Apache Kafka team and contributors.
We can build custom sources and custom sinks, as well as use the prebuilt ones from Confluent. We should remember "Serde" (serializer/deserializer); serdes can be Text, JSON, and so on. I have done a lot of work with Avro and was happy to see that Avro is there in Confluent for schema and validation. We need it: once the system is in production, folks can keep sending messages that the producers and consumers were not designed for, and some folks pretend not to know what is underneath.
Start Apache Kafka Connect. It comes up on port 8083.
curl -X GET http://172.17.85.196:8083/connectors
AWS Kafka Cluster
AWS — Create Kafka Cluster
AWS Kafka Cluster
We can use the CLI to access the cluster. A while back I created this cluster on AWS using the AWS CLI; if you are using AWS, use the CLI.
AWS — Create Topic
AWS — Bootstrap Brokers
AWS Kafka Brokers
We always create a scalable cluster and will have multiple brokers. Get your brokers using the CLI.
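The bootstrap brokers can be fetched with the AWS CLI; the cluster ARN below is a placeholder for your own:

```shell
# Returns the broker host:port list to use as --bootstrap-server
aws kafka get-bootstrap-brokers --cluster-arn <your-cluster-arn>
```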
Streams
I was attracted to the Elasticsearch cluster years back because of its built-in REST services. We can search using queries like the DSL, DB-style queries, or ES-native Lucene searches; I can get, search, and aggregate over unstructured data — a Google-like search. I got attracted to streams a long time back too, and did a lot of work using PyStreams.
We have a topping — Streams. Kafka is sweet: it does not obfuscate or confuse us with generic terms like stream, deep learning, or machine learning. It uses the term KStream so that we do not get it confused with Java streams. The world has known Java for abstraction and mapping; we can start with properties and create mappings, and with layers of microservices every team gets to do its own abstraction, mapping, and abstract configurations.
Operations are performed on datasets. We will use loose words like tuple, KV, and properties to convey that they are similar data structures.
KStream
Operations like map and flatMap can change the key and trigger repartitioning (very expensive, like reindexing); the Values variants (mapValues, flatMapValues) leave the key alone. Any dataset sent to a KStream is inserted — an insert operation. Sally loves candy balls. Let's define a KV pair: K = String("sally"), V = String("red"). Let the guests send KV data to Kafka. We will have tuples, or KVs, or properties. The stream is immutable. The beauty is that the key can be null and the record will still flow into the stream:
- (sally, red)
- (sally, blue)
- (sally, green)
- (alex, red)
- (null, red)
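The insert-only behaviour can be demonstrated with an ordinary append-only list standing in for the stream (a sketch with my own class; SimpleEntry is used because, unlike Map.entry, it allows a null key):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;

public class KStreamInsertDemo {
    // A KStream is append-only: every record is a new insert;
    // duplicate keys and even a null key are all kept.
    public static List<SimpleEntry<String, String>> buildStream() {
        List<SimpleEntry<String, String>> stream = new ArrayList<>();
        stream.add(new SimpleEntry<>("sally", "red"));
        stream.add(new SimpleEntry<>("sally", "blue")); // same key, still appended
        stream.add(new SimpleEntry<>(null, "red"));     // null key flows through too
        return stream;
    }

    public static void main(String[] args) {
        System.out.println(buildStream().size()); // 3: nothing was overwritten
    }
}
```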
KTable
It performs an upsert. If the key is not there, it will insert; if the key (like sally) is there, it will update. So if Sally is tired and can hold only one flower, that will be an update:
- (sally, rose)
Let's send a tulip to Sally; the value will be updated from rose to tulip. If you send a null value (a tombstone), it will delete the record (or tuple, or dataset):
- (sally, tulip)
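The upsert-plus-tombstone behaviour maps directly onto a HashMap; a sketch with my own helper method:

```java
import java.util.HashMap;
import java.util.Map;

public class KTableUpsertDemo {
    // A KTable behaves like a keyed map: new key -> insert, existing key -> update,
    // and a null value (a "tombstone") -> delete.
    public static Map<String, String> apply(String[][] updates) {
        Map<String, String> table = new HashMap<>();
        for (String[] kv : updates) {
            if (kv[1] == null) table.remove(kv[0]);  // tombstone deletes the record
            else table.put(kv[0], kv[1]);            // insert or update (upsert)
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, String> t = apply(new String[][]{
                {"sally", "rose"}, {"sally", "tulip"}, {"alex", "rose"}, {"alex", null}});
        System.out.println(t); // {sally=tulip}
    }
}
```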
GlobalKTable
If we want to be lazy and have the system do the work for us, then the GlobalKTable is what can be used to perform joins.
Topology
To perform operations on streams as Step 1, Step 2, Step 3, we have a topology. Some operations can repartition, create new streams, branch the stream, or change it to a KTable. The marble diagram is great: we can use it to draw pictures and happily share them as the topology — a kind of UML sequence diagram, but for streams.
Our application, say Slcp, has an appId and a groupId. We can have many consumers join the group, and they need the groupId.
Conclusion
We created a stream application to process incoming hotel records. We used KStream and KTable to perform stream operations and transformations, and wrote the results back to a Kafka topic.