Sailing through Kafka Streams

Data Affair
SFU Professional Computer Science

--

This blog is written and maintained by students in the Master of Science in Professional Computer Science Program at Simon Fraser University as part of their course credit. To learn more about this unique program, please visit {sfu.ca/computing/mpcs}.

With the adoption of cloud services and IoT devices, the number of microservices has grown exponentially, leading to complex communication between systems. Real-time streaming data, such as the logs generated by each cloud service and the events emitted by IoT devices, and its real-time processing have become mainstream in our tech-savvy world. There have been numerous attempts to build solutions for moving vast volumes of data, and one technology built with event streaming at its core is Apache Kafka.

Wait, we’ve heard of Kafka. What is Kafka Streams?

Apache Kafka, originally developed at LinkedIn, has become the go-to technology for building real-time messaging and streaming services. It is highly scalable, robust and distributed by design, making it highly efficient for huge workloads. Kafka follows the producer-consumer approach, where producers are the entities that generate data (IoT devices, logs from cloud servers) and consumers are the target entities meant to receive it (relational/NoSQL databases, data warehouses and real-time dashboards).

Apache Kafka Streams is a client library for building real-time data processing applications and microservices that use Kafka clusters to store input and output data. It is essentially an API that lets you write processing logic between consumers (reading from a topic) and producers (writing to a topic), or a writer to other external systems (like a database).

Why use Kafka Streams?

Alright, we know real-time streaming systems can be built that produce data into a Kafka topic on a broker using producers and read it back using consumers. To make our lives simpler, we can use the boilerplate Kafka itself provides, Kafka Connect, to interface with various data sources. Everything seems good so far, but when performing filtering or aggregations, we need to consume the data from the Kafka brokers, perform the required operations in whatever programming language we used to interface with Kafka, and then push the results to the required topic for later consumption or visualization. We must write all the code needed to complete this process, including overheads like exception handling, and make it efficient enough to be production-worthy. When multiple applications are required, there is a lot of interoperability boilerplate to write, which eventually reduces our productivity. Phew! We didn’t want that to happen.

Now let’s enumerate the advantages of Kafka Streams:

  • Provides robust event-at-a-time processing with millisecond latency.
  • Streaming with Kafka Streams is elastic and can be scaled at any point in time by increasing or decreasing the number of instances subscribing to the brokers for data. Note that all instances must share the same application ID, which acts as the consumer group ID.
  • Avoids a lot of boilerplate and lets us write production-grade code in a very concise manner.
  • Can handle sudden peaks of massive data ingestion in a reliable way and ensures fault tolerance through replication.
  • Kafka Streams provides options for cluster encryption and authentication with authenticated/encrypted clients. Data in transit can be encrypted between the application and the Kafka brokers.
  • Applications can be deployed on client systems, in containers, on VMs or even in the cloud using the Streams API.
  • It is a simple client library; there is no need for additional cluster managers.

Architecture of Kafka Streams

The Kafka Streams architecture consists of:

  • ’n’ input streams coming in from multiple sources on different topics
  • multiple nodes running the logic that performs operations over the streamed data
  • ‘m’ output streams going to different topics

The ’n’ and ‘m’ need not be the same number. For any number of streams coming from different sources, we can transform the data before it reaches its destination, limit the outgoing streams, and even branch one stream into multiple streams for different types of destinations.

The nodes running the logic have stream partitions and stream tasks as their main logical units. Based on the input stream partitions, which are simply subsets of the input, a fixed number of stream tasks is created, one per partition, and each task executes the processing logic. These stream partitions and tasks determine the parallelism of a Kafka Streams application.

Each stream task has a state store inside it, created when the data stream is windowed or when aggregation operations are performed over the data. These state stores can take the form of an in-memory hash map or a RocksDB database. Each instance/node can host multiple stream tasks, and since the state stores hold copies of the data used in the operations, they are kept synchronized to produce a collective result. A state store can be accessed locally on a single machine or queried across the whole application.

Memory allocated to each stream task is used for caching and compacting data records before they are written to the state store and forwarded downstream. This cache reduces the amount of writing to and reading from nodes for downstream communication, and it also reduces the overhead of reading data back from state stores when performing operations.

Stream Processing Topology

The processor topology (or stream topology) defines the workflow of stream processing. These topologies can be defined using the Kafka Streams Domain Specific Language (DSL). A topology is represented by a Directed Acyclic Graph (DAG).

All the nodes in this DAG are called stream processors; each stream processor represents a processing step carried out by the stream tasks on the nodes. The edges connecting them represent the streams, or the state stores shared between tasks.

There are two special stream processors: the Source Processor and the Sink Processor. The Source Processor’s role is to collect events from one or more topics and pass them on to the downstream stream processors. The Sink Processor sends the event streams it receives from the upstream processors to the specified topics.

Operations with Streams

Now that we’ve had an overview of Kafka Streams, let’s try to understand its various components and possible operations through an example. We will consider a source of financial data that streams the company symbol as the key and an Indian share market payload (Stock) as the value, in JSON format, into a topic called ‘market-data’.

We will now explore how various methods in the Streams API can be used to perform functions such as splitting this JSON payload into individual topics based on the index (either “nifty” or “sensex”), calculating the moving average of the volume or value of each company over a 15-minute window, and unifying the latest nifty and sensex values for each company.

Serialization and Deserialization (Serdes):

Kafka provides built-in serializers/deserializers for a few data types, like String, Long and byte arrays. To implement our own serializer/deserializer, we must implement the Serializer and Deserializer interfaces. A sample of a custom Serializer and Deserializer can be found in the GitHub repository, and a rough sketch follows the list below. We override three methods inherited from the interfaces:

1. Configure: Defines the configuration specifications.

2. Serialize/Deserialize: Defines the process of serializing/deserializing the data.

3. Close: Terminates the (de)serialization session.
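
As a rough illustration (not the exact code from the repository), a custom JSON deserializer for the Stock class introduced below might look like the following, assuming Jackson is available on the classpath; the matching serializer simply does the reverse with writeValueAsBytes().

```java
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;

public class StockDeserializer implements Deserializer<Stock> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No extra configuration is needed for this simple deserializer
    }

    @Override
    public Stock deserialize(String topic, byte[] data) {
        try {
            // Turn the raw JSON bytes read from the topic back into a Stock object
            return data == null ? null : mapper.readValue(data, Stock.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize Stock", e);
        }
    }

    @Override
    public void close() {
        // Nothing to clean up
    }
}
```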

There are a few predefined Kafka Serdes beyond those for the primitive data types:

1. Avro: Predefined Serdes for the Apache Avro format

2. Protobuf: Predefined Serdes for the Protocol Buffers format, with support for referenced schemas (schemas that refer to other schemas).

3. JSON Schema: Serdes for JsonNode format or Java Class

To create a custom Serdes for our JSON payload, we construct a Java class to represent our fields and define a Serdes as follows:
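
A minimal sketch of what this might look like is shown below; the field names are illustrative assumptions rather than the exact ones used in the repository.

```java
// Plain Java class representing one market event.
// The field names here are assumptions made for illustration.
public class Stock {
    public String symbol;  // company symbol, also used as the record key
    public String index;   // "nifty" or "sensex"
    public double price;   // traded price of the stock
    public long volume;    // traded volume
}
```

The custom Serializer/Deserializer pair is then wrapped into a single Serde with Serdes.serdeFrom(new StockSerializer(), new StockDeserializer()), which is what the Streams API consumes.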

This is a declaration of how an instance of the Stock class is serialized and deserialized to and from bytes whenever transmission is involved.

Components of Kafka Streams

To use the Streams API, we define an instance of the StreamsBuilder class. All processing logic is attached to this instance. The configurations required for connecting to the cluster and for basic operation must also be provided before the stream is started.
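
A minimal sketch of this setup is shown below; the application ID and broker address are placeholders, not values from the original project.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// All instances sharing this ID form one consumer group and split the work
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "market-data-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Fallback Serdes used when none are given for a specific operation
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Every KStream/KTable below is created from this builder
StreamsBuilder builder = new StreamsBuilder();
```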

KStreams

Specific to the DSL only, a KStream is a form of log ledger, with ‘APPEND’ as the only possible operation, as no rows are overwritten. It is an immutable, unbounded sequence of records, where each record is an event. There can be multiple records for a key, distinguished by timestamp. Examples include transaction histories and log entries from cloud consoles.

We create a KStream from the source topic:
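
A sketch of this step, assuming the custom Serializer/Deserializer pair from earlier:

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

// Wrap the custom (de)serializers into a Serde the Streams API can use
Serde<Stock> stockSerde = Serdes.serdeFrom(new StockSerializer(), new StockDeserializer());

// Read the input topic into a KStream keyed by company symbol
KStream<String, Stock> marketData = builder.stream(
        "market-data",
        Consumed.with(Serdes.String(), stockSerde));
```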

The KStream contains records with a ‘String’ key and a ‘Stock’ value read from the input topic, market-data. The Serdes that the Streams API should use when building the KStream are also passed. This creates a KStream with all the stock records as values and the company symbol as the key.

KTables

A KTable is an update stream that maintains only the latest record for each key, i.e., rows are overwritten and values are updated based on their timestamps. ‘UPSERT’ is the only possible operation here, with one variation: if a record arrives with a null value, it is treated as a ‘DELETE’ operation and the record is removed from the table.

A KTable can be created with the builder’s table() method. The generics and the arguments are very similar to those of the KStream.
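
For illustration, a table of the latest sensex record per company could be built from the ‘sensex-events’ topic that we populate later in this pipeline (a sketch, not the repository’s exact code):

```java
import org.apache.kafka.streams.kstream.KTable;

// Keep only the latest record per company symbol
KTable<String, Stock> sensexTable = builder.table(
        "sensex-events",
        Consumed.with(Serdes.String(), stockSerde));
```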

We will be using this KTable at a later stage of our streams application.

GlobalKTables

GlobalKTable is a variation of KTable. As the name suggests, it is a complete collection of the updated records across the application, i.e., replicated to all running instances. The main advantage of a GlobalKTable is that state is shared across all available instances at any given time, allowing more efficient join operations. On the other hand, it increases traffic on the Kafka application’s internal network and consumes more storage.

A GlobalKTable is created with the Builder’s globalTable() method and has a similar syntax to that of KTable.
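
A sketch, again assuming the ‘nifty-events’ topic written later in the pipeline:

```java
import org.apache.kafka.streams.kstream.GlobalKTable;

// The full contents of this table are replicated to every application instance
GlobalKTable<String, Stock> niftyGlobalTable = builder.globalTable(
        "nifty-events",
        Consumed.with(Serdes.String(), stockSerde));
```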

Streams Transformations

The Streams API provides abundant transformation functionality, which can be categorized into:

  1. Stateless transformations
  2. Stateful transformations

Stateless transformations

These transformations do not require an associated state to be persisted in the stream processor. Some examples of such transformations are:

Filter: Data can be filtered in real time based on a predicate on the keys and the values.

We don’t want junk data creeping in without the company symbol in its key; the snippet below filters out all records that do not have a proper key.
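
A minimal sketch, assuming the marketData stream created earlier:

```java
// Drop records whose key is missing or empty, or whose value is null
KStream<String, Stock> validRecords = marketData.filter(
        (symbol, stock) -> symbol != null && !symbol.isEmpty() && stock != null);
```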

Map and FlatMap: The map() and flatMap() methods allow operating on records one at a time. A map() operation produces exactly one output record per input record, whereas flatMap() can produce zero or more output records.

These methods can also change the key, which marks the stream for repartitioning. So for operations that only need to transform the values, the Streams API provides mapValues() and flatMapValues().
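
For example, a hedged sketch that derives the traded value of each event without touching the key (the price and volume fields are the assumed ones from the illustrative Stock class above):

```java
// price x volume per event; the key is untouched, so no repartitioning is triggered
KStream<String, Double> tradedValue = validRecords.mapValues(
        stock -> stock.price * stock.volume);
```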

Branching: The data in the ‘market-data’ topic contains both ‘sensex’ and ‘nifty’ events. It would be beneficial to split the respective payloads into individual topics.
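
A sketch using the split()/branch() DSL methods, assuming the index field of our illustrative Stock class:

```java
import java.util.Map;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Named;

// Split the stream into named branches by index; the resulting map keys are
// the prefix plus the branch name, i.e. "market-nifty" and "market-sensex"
Map<String, KStream<String, Stock>> branches = validRecords
        .split(Named.as("market-"))
        .branch((symbol, stock) -> "nifty".equals(stock.index), Branched.as("nifty"))
        .branch((symbol, stock) -> "sensex".equals(stock.index), Branched.as("sensex"))
        .noDefaultBranch();

KStream<String, Stock> nifty = branches.get("market-nifty");
KStream<String, Stock> sensex = branches.get("market-sensex");
```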

Each branch() call splits the input based on a predicate. The resulting nifty and sensex KStreams are stored in a map whose keys are derived from the Named and Branched arguments.

Group-By: With all sensex events in one stream, it is possible to calculate aggregated values such as the average value of each company.
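
A sketch of grouping the sensex stream by its existing key:

```java
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;

// Group all sensex events by company symbol so they can be aggregated per key
KGroupedStream<String, Stock> sensexByCompany = sensex.groupByKey(
        Grouped.with(Serdes.String(), stockSerde));
```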

Looks familiar from Hadoop or Spark? It performs a very similar function: all values pertaining to a particular key are grouped together and can then be aggregated.

There is also a groupBy() method to group by another key — basically a selectKey() operation followed by groupByKey().

Stateful transformations

For stateful transformations, we need a state store associated with the stream processor to process the inputs and outputs. Kafka Streams is fault-tolerant: whenever processing fails, it guarantees that the state is restored to what it was before the processing started.

Aggregations

A KGroupedStream can be operated upon to calculate aggregated values per key. We can perform aggregations like reduce() and count(), which are very similar to the same operations in Spark and Java 8 Streams.

We can also aggregate values with a custom aggregator. Now that we have grouped the sensex data for each company, we can aggregate the stock value and the volume.
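
A hedged sketch of this step; StockAggregate is a hypothetical helper class that keeps running totals and a count, and stockAggregateSerde is an assumed Serde for it:

```java
import org.apache.kafka.streams.kstream.Materialized;

// Accumulate total price and volume (plus a count) per company.
// StockAggregate and stockAggregateSerde are hypothetical helpers.
KTable<String, StockAggregate> totals = sensexByCompany.aggregate(
        StockAggregate::new,                                          // initializer
        (symbol, stock, agg) -> agg.add(stock.price, stock.volume),   // fold each event in
        Materialized.with(Serdes.String(), stockAggregateSerde));

// Map the running totals down to per-company averages
KTable<String, StockAggregate> averages = totals.mapValues(agg -> agg.toAverages());
```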

aggregate() allows us to return a different data type, unlike reduce(). The resulting KTable holds the total value and volume for each company’s sensex events, and we have further mapped the values to store the averages of those fields.

While these aggregations were performed over all records for a company, we can also apply windows to each of these aggregate functions. What if we only wanted the 15-minute rolling average price of a stock?

Windowing
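
A sketch of a windowed version of the aggregation above, reusing the hypothetical StockAggregate and its Serde:

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windowed;

// 15-minute sliding window, allowing one minute of grace for late-arriving events
KTable<Windowed<String>, StockAggregate> rollingTotals = sensexByCompany
        .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
                Duration.ofMinutes(15), Duration.ofMinutes(1)))
        .aggregate(
                StockAggregate::new,
                (symbol, stock, agg) -> agg.add(stock.price, stock.volume),
                Materialized.with(Serdes.String(), stockAggregateSerde));
```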

This is an example of a sliding time window. Kafka Streams also provides an overlapping hopping window (where each new window starts a fixed hop after the previous one), as well as fixed, non-overlapping windows based on time (tumbling windows) and on sessions.

Image Credits: kafka.apache.org

Joins

Kafka Streams offers join operations to meet a variety of requirements. Three types of joins can be performed:

  1. Inner Join
  2. Left Join
  3. Outer Join (Full Join)

Joins can be performed among KStreams, KTables and GlobalKTables in the following combinations: KStream-KStream, KTable-KTable, KStream-KTable and KStream-GlobalKTable.

These join operations are performed on the keys of the components involved and behave much like joins in SQL. Since the joins are performed on the key, the KStreams or KTables involved must be co-partitioned, i.e., inputs with the same key in either component must reside in the same partition for the join to occur (GlobalKTables are the exception, since their data is fully replicated to every instance).

KTable-KTable

In our example, we can write the two KStreams, nifty and sensex, with the original unaggregated data into two topics, “nifty-events” and “sensex-events”. Now it is possible to combine the sensex and nifty data to see the latest values for each company.
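
A hedged sketch of this join, reusing the sensexTable built earlier; CombinedQuote is a hypothetical value class holding the two latest prices:

```java
// Latest nifty record per company, read back as a KTable
KTable<String, Stock> niftyTable = builder.table(
        "nifty-events",
        Consumed.with(Serdes.String(), stockSerde));

// Inner join on the company symbol: an updated combined view is emitted
// whenever either side receives a new record. CombinedQuote is hypothetical.
KTable<String, CombinedQuote> latestQuotes = sensexTable.join(
        niftyTable,
        (sensexStock, niftyStock) -> new CombinedQuote(sensexStock.price, niftyStock.price));
```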

This is a symmetric KTable-KTable inner join: a lookup is performed whenever a new record flows into either KTable involved in the join. A KTable-KTable join is always materialized into a state store.

KStream-KStream

KStreams can be joined based on a common key to give a resultant KStream.

A KStream can contain multiple records for the same key. Since a KStream is stateless, a join here would require scanning all available data whenever a new data point enters either stream. To perform this scan efficiently, the join operation maintains an internal state, and the API enforces that a time window be specified so that this buffer does not grow without bound. The arrival of an input on either stream triggers a join computation.
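
A sketch of a windowed stream-stream join on the raw nifty and sensex streams, again using the hypothetical CombinedQuote class:

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.StreamJoined;

// Join sensex and nifty events for the same company that occur
// within five minutes of each other
KStream<String, CombinedQuote> pairedEvents = sensex.join(
        nifty,
        (sensexStock, niftyStock) -> new CombinedQuote(sensexStock.price, niftyStock.price),
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), stockSerde, stockSerde));
```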

KStream-KTable

A KStream can be joined with a KTable to return another KStream. For every record in the KStream, a lookup is performed on the KTable for the matching key. Joins of a KStream with a KTable are purely asymmetric: the computation is triggered only when a new record flows into the stream side. The main application of this join is enriching an input record with additional information in real time before it is sent further down the data pipeline.
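
A sketch of such an enrichment, looking up each sensex event’s company in the niftyTable built above:

```java
import org.apache.kafka.streams.kstream.Joined;

// Enrich each incoming sensex event with the company's latest nifty price
KStream<String, CombinedQuote> enriched = sensex.join(
        niftyTable,
        (sensexStock, niftyStock) -> new CombinedQuote(sensexStock.price, niftyStock.price),
        Joined.with(Serdes.String(), stockSerde, stockSerde));
```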

All join operations help combine data from multiple sources in real time to give a complete picture.

Terminal Operations

Once the data is processed by our application, the stream can be terminated by transmitting the data to the desired destination.

Writing to a topic with to() is a terminal operation: it ends the processing chain, and the records are written to the topic passed in the arguments. There are other terminal operations as well, like print(), which prints the records to standard out, and foreach(), which performs arbitrary actions such as writing to a file.
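
A sketch of how the branched streams could be written out and the application started; the topic names match the earlier example, and the shutdown hook is a common pattern rather than code from the repository:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.Produced;

// Write each branch to its own topic
nifty.to("nifty-events", Produced.with(Serdes.String(), stockSerde));
sensex.to("sensex-events", Produced.with(Serdes.String(), stockSerde));

// Build the topology from the builder and start the streams application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Close the application cleanly on shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
```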

Real-time updates of the data in the destinations enable applications like dashboards and analytics that help make decisions instantly. Covering every Kafka Streams operation and its code is beyond the scope of a single blog! That said, we have the template for the single workflow described in our example with the stock market data, and you can find the complete code here on GitHub: https://github.com/niranjan-ramesh/sharemarket-streaming-sample.

We have also written a sample Python script to display real-time updates in a simple line graph. 😉

Sample stock value of AirBNB over time from ‘sensex’ topic

Conclusion

We have talked about Kafka Streams, its architecture and how we can use the Streams API to perform various operations on real-time data. We hope we were able to introduce the concepts and their implementation clearly. The advantages presented by Kafka Streams have helped Kafka grow from a Pub/Sub model into a powerful real-time streaming platform. The capabilities are exciting, and with organizations like Confluent offering their own flavors of Kafka, these are waters we don’t mind drowning in.

Thank you for reading the blog! We would love to lend an ear to your valuable feedback in the comment section.

References

  1. Introducing Kafka Streams: https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/
  2. Kafka Streams DSL Documentation: https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html
  3. Crossing the Streams — Joins in Apache Kafka: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
  4. Kafka Streams 101 by Confluent: https://youtube.com/playlist?list=PLa7VYi0yPIH35IrbJ7Y0U2YLrR9u4QO-s

Authors

Niranjan Ramesh
Dharunkumar Natarajan
Hariish Nandakumar
Hiren Bangani
