Getting started with Kafka Streams

Sameer Bandaru
7 min read · Mar 3, 2020


With growing volumes of data (it is often said that the amount of data doubles every 2–3 years), acquiring it and turning it into actionable insight is a complex process. Dealing with real-time streaming data arriving at unprecedented speed is an especially big challenge.

But with the many architectures available today, it has become fairly easy to accelerate your data processing at large scale.

Among the architectures available for processing real-time streaming data, Kafka Streams (part of Apache Kafka) is one that can be used to build highly elastic, scalable and fault-tolerant systems.

It is a processing and transformation component that ships with the Kafka binaries. It can be used for multiple purposes:-

1. To build a streaming pipeline across systems/different application components (it works well with distributed systems).

2. To enrich the incoming stream of data (performing some data transformation).

The good thing about Kafka Streams is that no separate cluster is required to run it.

At a very high level, there are primarily three components involved.

Input Source, Output Source:-

The input source is the ingestion component of our streams application. It provides the data required for transformation/enrichment.

These input data sources can be anything:

a. Frequent DB updates (via CDC events).
b. Real-time data (live sensor information, click-stream data, or social network feeds such as Twitter).
c. Output of another application/component via Kafka topics, etc.

* In Kafka Streams there is no concept of batch processing; data is processed one record at a time.

Irrespective of the data source, the data has to land in Kafka topics for our streams application to consume it.

But how, and who, will push data to the topics? (Take examples a and b above.)

There should be some kind of adapter that consumes the information from these source systems and feeds it to a topic for processing, right?

Well, “Kafka Connect” is one such component, and it does this for us with simple configuration. So think of it as an intermediate component that connects the data source to the streams application for processing.

(Now it feels like Kafka Connect + Kafka Streams together will work wonders, right? And yes, that is exactly true.)

Kafka Connect is altogether a separate concept and will be discussed separately.

When data is pushed to Kafka topics, every message/record is associated with a key and a value (the producer publishes both the key and the value).

If the producer explicitly pushes the record with a key, we can access both the key and the value (the actual record) while consuming. But the key is not a mandatory parameter, so if the producer does not explicitly specify it, the key will simply be “null”.
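To make the key behaviour concrete, here is a minimal producer sketch (the broker address, topic name and String types are assumptions for illustration): the first record carries an explicit key, the second does not, so consumers will see its key as null.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Record published with an explicit key ("user-42")
    producer.send(new ProducerRecord<>("input-topic", "user-42", "hello kafka streams"));

    // Record published without a key: consumers will see the key as null
    producer.send(new ProducerRecord<>("input-topic", "hello again"));
}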

Now, switching to the last component, “output sources”: these are the consumers of your streams application. They could be a Kafka sink connector connected to a DB, a plain Kafka topic, some downstream application, etc.

Streams Application (the heart of the streaming part):-

It is the processing component that transforms the incoming stream record by record.

This stream of records (from now on, the stream of data will be referred to as a stream of records) is immutable and can be replayed at any time.

Once the processor transforms the data, it is pushed to a Kafka topic again, from where it can be consumed by our downstream applications.

Now, what is this processor, how do we build it, and how does it transform the incoming data?

Well, to keep it simple, think of processing as a sequence of steps that the incoming stream of data is put through in order to enrich it.

The process of defining these steps and building a flow describing how the data is to be transformed is termed “building a topology”.

A topology can include multiple processors. See the figure below:-

Before we go deep into the processing details, it is important to understand certain properties. One of them is the Serde configuration (key serde and value serde).

Serde stands for serialization and deserialization of data. It tells Kafka how to translate the data into bytes and vice versa.

We have to set the default key serde and value serde in the properties so that the stream processor knows how to consume and act on the data.

Some example serdes:- String serde, Long serde, or your own custom serde (built for Avro, JSON, etc.).
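As an illustration, here is a minimal sketch of the properties a streams application typically needs (the application id and broker address are placeholders); the last two entries set the default key and value serdes mentioned above:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Default serdes: used wherever an operation does not specify its own serdes
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

This config object is the kind of thing passed to the end-to-end example at the bottom of the article.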

We will see how to build a topology with a code snippet in a moment, but what do we actually do as part of a transformation, and what does it look like?

Here are some examples:-

Example 1:- (Aggregating) When you get some string message in every record, you might want to count/aggregate how many times specific words are repeated.

Say:-
I/p record 1:- “Hello this is kafka streams app”.
O/p:- (Hello 1), (this 1), (is 1), (kafka 1), (streams 1), (app 1)

I/p record 2:- “Learning kafka is exciting”.
O/p:- (Learning 1), (kafka 2), (is 2), (exciting 1)

When you get the second record, you are doing an upsert on the word counts from your previous output.

Example 2:- (Filtering) When you want to keep only the values < 100.

Say:-
I/p record1 is:- 150.
O/p:- nothing is sent out, as the filter condition is not matched.

I/p record2 is:- 10.
O/p:- 10 etc..
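A minimal sketch of this filter in code (the topic names and the Long value type are assumptions for illustration):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

// Read (String key, Long value) records from the input topic
KStream<String, Long> amounts =
        builder.stream("amounts-topic", Consumed.with(Serdes.String(), Serdes.Long()));

// Keep only records whose value is below 100; everything else is silently dropped
amounts.filter((key, value) -> value < 100)
       .to("small-amounts-topic", Produced.with(Serdes.String(), Serdes.Long()));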

Example 3:- (Joining) In the above two examples we saw messages processed independently, but there could also be cases where we want to join the incoming stream with another stream and combine the data.

Say:-
I/p record from stream 1:- (id:10, name:sam, city:hyd).
I/p record from stream 2:- (id:10, amount:100)
O/p record:- (id:10, name:sam, city:hyd, amount:100)

Note:- In order to combine two streams there should be a common parameter on which the streams can be joined. Think of it as being similar to SQL joins.
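Here is a sketch of such a join. It assumes both streams are keyed by the common id, that the values are plain strings, and that default String serdes are configured; the topic names and the 5-minute join window are illustrative:

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Both streams must be keyed by the common field (the id) for the join to work
KStream<String, String> customers = builder.stream("customer-topic"); // value: "name:sam,city:hyd"
KStream<String, String> payments = builder.stream("payment-topic");   // value: "amount:100"

// Combine records that share the same key and arrive within 5 minutes of each other
KStream<String, String> enriched = customers.join(
        payments,
        (customerInfo, paymentInfo) -> customerInfo + "," + paymentInfo,
        JoinWindows.of(Duration.ofMinutes(5)));

enriched.to("enriched-topic");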

Before we get to the full end-to-end code, let us go over the sequence of steps for building a stream processor with some basic operations.

It is not mandatory that all the operations mentioned on the left side are executed in the same sequence. Depending on your use case, a few of them may be executed in whichever order you specify.

In the above picture only a few operations are mentioned. There are also other operations apart from those (such as peek, filter, filterNot, foreach, and aggregation or reduce operations over previous values, etc.).
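For a taste of a few of these, here is a small sketch (the topic name is a placeholder and default String serdes are assumed): peek is a pass-through used for side effects such as logging, filterNot drops matching records, and foreach is a terminal operation.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("events-topic");

events
    .peek((key, value) -> System.out.println("received: " + key + " -> " + value))      // side effect only, stream continues
    .filterNot((key, value) -> value.isEmpty())                                          // drop records with empty values
    .foreach((key, value) -> System.out.println("processed: " + key + " -> " + value));  // terminal operation, nothing flows further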

For more documentation on the various operations that are supported, refer to the Kafka Streams developer guide.

Streams as such is a big topic and there is a lot to explore that we cannot cover as part of this article.

So far we have seen the KStream (all the concepts discussed above were applied to the KStream object, which is an abstraction of your record stream).

At this stage, it is worth discussing the “KTable”. The main difference compared to a KStream is:-

In a KTable each data record represents an update: the value in a data record is interpreted as an update of the last value for the same record key. In a KStream, each record is always interpreted as a new record for transformation.

Let's understand them with an example:-

KStream:-
I/p record:- (sam, 1) → produces (sam, 1) as output
Assume there is an update on the same key and we get a new record (sam, 2); you can do some aggregation and produce a (sam, 3) record

KTable:-
For I/p record 1:- (sam, 1) → produces (sam, 1)
For the next record, say (sam, 2) → the previous record is updated with the newly received value and the updated record in the KTable will be (sam, 2)

Think of a KTable as a simple store where the latest values are kept for the corresponding keys (an UPSERT operation), whereas a KStream always appends the new record (an INSERT operation).
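A small sketch of the two reading styles (the topic name and Long values are assumptions; separate builders are used because a single topology reads a given topic only once):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// KStream view: every record is an independent event (INSERT semantics)
StreamsBuilder streamBuilder = new StreamsBuilder();
KStream<String, Long> scoreEvents =
        streamBuilder.stream("scores-topic", Consumed.with(Serdes.String(), Serdes.Long()));

// KTable view: each record overwrites the previous value for its key (UPSERT semantics)
StreamsBuilder tableBuilder = new StreamsBuilder();
KTable<String, Long> latestScores =
        tableBuilder.table("scores-topic", Consumed.with(Serdes.String(), Serdes.Long()));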

The beauty here is that you can mix both of them: as a stream processor, do the required transformation and push the final updates into a KTable (makes sense?). Think of it as doing some operations and dumping the updated information onto a DB.

Consider the end-to-end example below, where we count the number of words starting with the letter “s” and push the final count information into a KTable.

import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> inputMessage = builder.stream("input-topic");
KTable<String, Long> counts = inputMessage
        .mapValues(eachLine -> eachLine.toLowerCase())
        .flatMapValues(eachLine -> Arrays.asList(eachLine.split("\\W+")))
        .filter((key, value) -> value.startsWith("s"))
        .selectKey((key, word) -> word)
        .groupByKey()
        .count();

counts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

// config is the Properties object (application id, broker address, default serdes) shown earlier
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

Step 1:- We create a stream object and attach it to the incoming topic (input-topic).
Step 2:- As we want to process each record individually, without any joins, we fall under the left-hand path of the figure shown above.
Step 3:- The input message is received as a line (e.g. “Hello, this is sample example”). The key is null, the value is the line (one big string).
Step 4:- Split the whole line into individual words.
Step 5:- Filter the words that start with the letter “s”.
Step 6:- Select the key as per your need (in this case we want to maintain a count against each word, so the word itself is our key).
Step 7:- Group by key and count the occurrences to get the count of words starting with “s”.

*We used Serdes.String() and Serdes.Long(), which means we are publishing our key as a String and our value as a Long. This also means that downstream applications should consume this data with the same types for processing, i.e. the producer schema (how the record is published) should match the consumer schema (how the record is consumed).
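For example, a plain consumer reading output-topic would have to use deserializers matching those types. A minimal sketch (the broker address and group id are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "word-count-reader");
// Must match how the streams app published the records: String key, Long value
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", LongDeserializer.class.getName());

try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("output-topic"));
    ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, Long> record : records) {
        System.out.println(record.key() + " -> " + record.value());
    }
}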

Hope this helps you get started with writing your own Streams Application.

Happy Learning!!!
