Kafka Streams DSL for Scala: The Basics


This month, I’m learning Kafka Streams…with a Scala twist.

Most of the Kafka Streams examples you come across on the web are in Java, so I thought I’d write some in Scala.

I decided to start learning Scala seriously at the back end of 2018. I’d written a bit of Scala in the past but wanted to step it up a notch. I have a Java background and my current role involves writing Kotlin, but…I’m still learning Scala, so that adds complexity to this month’s learning.

This isn’t an introduction to the subject of Kafka Streams itself, it’s an introduction to the Streams DSL for Scala, so I won’t be going over theory such as what Kafka Streams is, processor topologies, etc. I’ll be covering the basic structure of a Kafka Streams application and some of the more common operations. If you want an introduction, I advise you to buy Kafka Streams in Action by William P. Bejeck Jr. It’s an amazing book and I’ve learnt a ton from it.


A basic Kafka Streams application

The Kafka Streams DSL is built on top of another part of Kafka Streams, the Processor API. Confluent recommends that users start their Kafka Streams journey with the DSL. The Processor API is low level, whereas the DSL allows users to write business logic in a declarative, functional style.

The DSL allows us to perform operations on records with a few lines of code.

Let’s get stuck in.

package transactionstream

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.typesafe.config.ConfigFactory
import models.Transaction
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import serdes.StreamsSerde

object ExampleApp extends App {

  import org.apache.kafka.streams.scala.ImplicitConversions._
  import org.apache.kafka.streams.scala.Serdes._

  val conf = ConfigFactory.load
  implicit val transactionSerde = new StreamsSerde().transactionSerde

  val props = new Properties
  props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getString("bootstrap.servers"))
  props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, conf.getString("transactions.application.id"))

  val builder = new StreamsBuilder

  // Source node -> processor node: consume transactions and mask the card number
  val transactionsKStream = builder.stream[String, Transaction](conf.getString("transactions.application.source.topic"))
    .mapValues(t => t.maskCreditCard)

  // Sink node: send the masked transactions back to Kafka
  transactionsKStream.to(conf.getString("transactions.application.sink.topic"))

  val streams = new KafkaStreams(builder.build(), props)

  streams.start()

  // Close the Streams client gracefully on JVM shutdown
  sys.ShutdownHookThread {
    streams.close(10, TimeUnit.SECONDS)
  }

}

The first thing we do is import org.apache.kafka.streams.scala.ImplicitConversions._ and org.apache.kafka.streams.scala.Serdes._. The reason for this is explained here: these imports bring the primitive SerDes and the implicit conversions into scope, so we don’t have to keep providing them when needed.
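To see what those imports buy us, here’s roughly what the source node would look like if we passed everything in explicitly instead. This is just a sketch, assuming the same transactionSerde as in the app above.

import org.apache.kafka.streams.kstream.Consumed

// Build the Consumed instance by hand (Java Serdes for the String key)
val consumed: Consumed[String, Transaction] =
  Consumed.`with`(org.apache.kafka.common.serialization.Serdes.String(), transactionSerde)

// ...and pass it to stream() explicitly instead of implicitly
val transactions =
  builder.stream[String, Transaction]("credit.card.transactions")(consumed)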

Next, we load the configuration.

I use a package called config (Typesafe Config), which is a configuration library. It allows you to store configuration values in a .conf file in the resources folder and access them in your code by first creating a reference to the config file and then specifying which property you want the value of.

# Example of a config file
bootstrap.servers="localhost:9092"
transactions.application.id="credit.card.transactions.stream"
transactions.application.source.topic="credit.card.transactions"
transactions.application.sink.topic="credit.card.transactions.masked"
transactions.application.mastercard.sink.topic="credit.card.transactions.mastercard"
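Reading those values back in code is then just a couple of lines. A minimal sketch:

import com.typesafe.config.ConfigFactory

// Loads application.conf from the resources folder by default
val conf = ConfigFactory.load
val brokers = conf.getString("bootstrap.servers") // "localhost:9092"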

Next, we create an instance of the Transaction SerDes.

implicit val transactionSerde = new StreamsSerde().transactionSerde

This is a custom SerDes that I created. It takes an instance of a Transaction class and serializes it to a byte array and vice versa. We could have SerDes for all kinds of scenarios, including SerDes for JSON, Avro, etc.
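I won’t dig into the implementation of StreamsSerde here, but to give you an idea, a hand-rolled Serde can be as small as the sketch below. The toBytes and fromBytes functions are placeholders for whatever serialization you choose, and I’m assuming a kafka-streams-scala version where Serdes.fromFn is available.

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.Serdes

class StreamsSerde {

  // Placeholders for real serialization logic (JSON, Avro, etc.)
  private def toBytes(t: Transaction): Array[Byte] = ???
  private def fromBytes(bytes: Array[Byte]): Option[Transaction] = ???

  // fromFn builds a Serde from a serializer function and a deserializer function
  val transactionSerde: Serde[Transaction] =
    Serdes.fromFn[Transaction](toBytes _, fromBytes _)
}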

What is also noticeable is that the transactionSerde is declared as implicit. The reason for this is so that we don’t have to specify it when calling the to() method of the KStream class or the stream() method of the StreamsBuilder class.

Next we build up the Properties. This is where we use the config file I mentioned earlier to set the broker URL and the ID of the application. I love config files; hard-coded values make me feel uneasy.

val props = new Properties
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getString("bootstrap.servers"))
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, conf.getString("transactions.application.id"))

We can configure many more properties in here, but as this is an introduction the two properties above will suffice.
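For example, we could set the number of stream threads or the commit interval (the values here are just illustrative):

// Number of threads the Streams client uses for processing
props.setProperty(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2")

// How often offsets and state are committed, in milliseconds
props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000")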

Next we create an instance of the StreamsBuilder class.

val builder = new StreamsBuilder

The StreamsBuilder class provides us with a new Kafka Streams topology. We can then attach nodes to this topology. Each node carries out a particular function, such as streaming records from Kafka or performing processing-related operations.


Nodes and mapValues

val transactionsKStream = builder.stream[String, Transaction](conf.getString("transactions.application.source.topic"))
  .mapValues(t => t.maskCreditCard)

The first node we create is the source node. This node is responsible for consuming records from Kafka for our application to process. We call the stream() method passing in the name of the Topic we want to consume from.

The stream() method also takes an implicit parameter, but because we’ve imported ImplicitConversions._ and Serdes._ and marked the Transaction Serde (transactionSerde) as implicit, we don’t have to pass one in.

Next we call the mapValues() method. In doing this we are creating another node which is referred to as a processor node. This method transforms the value of each input into a new value, potentially even a new type…but not in our case.

It takes an anonymous function as an argument, with either the key and value types or just the value type as parameters. So here we pass the current transaction into the function and call the maskCreditCard() method, which masks the credit card number so only the last 4 digits are visible, and returns the transaction object.
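The Transaction model itself isn’t the focus of this article, but to make the examples easier to follow, here’s a sketch of what it might look like. The field names are my assumption, based on the example record shown later on.

case class Transaction(
  creditCardNumber: String,
  cardType: String,
  expiryDate: String,
  amount: Double,
  latitude: Double,
  longitude: Double,
  accountNumber: String
) {

  // Mask all but the last 4 digits, e.g. "xxxx-xxxx-xxxx-8542"
  def maskCreditCard: Transaction =
    copy(creditCardNumber = "xxxx-xxxx-xxxx-" + creditCardNumber.takeRight(4))
}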

Now our application has two nodes: a source node and a processor node. It might not be straightforward to see this from the block of code above.

The code below might help in understanding this concept.

val transactionsKStream = builder.stream[String, Transaction](conf.getString("transactions.application.source.topic"))

val maskedCreditCardStream = transactionsKStream.mapValues(t => t.maskCreditCard)

Here the transactionsKStream is the source node and the maskedCreditCardStream is the processor node. In the first block of code the operations are chained to make the code more concise.

We are now ready to send records back into Kafka.


to

So now our transactionsKStream contains Transaction objects with a masked credit card number. The object also contains a few other properties such as card type, expiry, amount spent, location of transaction and account number but we aren't going to do anything with these values.

Transaction(
xxxx-xxxx-xxxx-8542,
MASTERCARD,
2024-02-10,
4.07,
12.234254353, -34.35644353,
97872345
)

The Transaction records are now ready to be sent back to Kafka.

transactionsKStream.to(conf.getString("transactions.application.sink.topic"))

We call the to() method, specifying the name of the Topic we want to send the records to. Again, we’re getting this value from the configuration file. Calling the to() method generates another node, which is referred to as the sink node.

Now we have 3 nodes. A source node that’s responsible for consuming records from Kafka. A processor node that masks the credit card number. Lastly, a sink node which sends the records back into Kafka.

Next we create a Kafka Streams client, passing in the properties object and the result of calling the build() method on the builder instance we created at the beginning.

val streams = new KafkaStreams(builder.build(), props)

The build() method returns a Topology, which is an acyclic graph of the 3 nodes we’ve created. Passing the Topology into the KafkaStreams constructor allows us to use the nodes in our application.
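If you want to see these nodes for yourself, the Topology class has a describe() method which returns a human-readable description of the graph:

// Prints the source, processor and sink nodes and how they're connected
val topology = builder.build()
println(topology.describe())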

We are then able to start our Kafka Streams application…

streams.start()

And last but not least we tap into the ShutdownHookThread.

sys.ShutdownHookThread {
  streams.close(10, TimeUnit.SECONDS)
}

This allows us to gracefully shut down the application. When the JVM detects that the application is shutting down, it calls the streams.close() method for us. We give the threads 10 seconds to join up.
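As a side note, in newer Kafka versions (2.0 onwards) the close(long, TimeUnit) overload is deprecated in favour of one that takes a java.time.Duration, so the hook would look like this instead:

import java.time.Duration

sys.ShutdownHookThread {
  streams.close(Duration.ofSeconds(10))
}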

If you have Kafka and ZooKeeper running, you should have no trouble running the Kafka Streams application.


print

Now we have a working Kafka Streams application, go you…well done!

The only way to tell it’s working (at the current moment in time) is to run a console consumer and check that the credit.card.transactions.masked Topic is receiving records.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic credit.card.transactions.masked

There is nothing wrong with this approach; the data won’t be readable, but we’ll be able to see records being consumed. There is a way, though, to get our masked credit card stream writing records to stdout so we can see the actual values: the KStream.print() method. The print() method takes an instance of the Printed class. This class provides two static methods, Printed.toSysOut() and Printed.toFile(filePath).

transactionsKStream.print(Printed.toSysOut())

With this processing node in our topology, we’ll now see records printed out when the application is running.
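If stdout gets too noisy, Printed.toFile() works the same way, writing the records to the given file path instead (the path here is just an example):

transactionsKStream.print(Printed.toFile("/tmp/masked-transactions.txt"))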


Cool! Right, there may be times when we want several print processing nodes in our topology. You can imagine stdout getting pretty busy with all the different records being printed out. What we can do is chain the withLabel() method onto the print() method, passing in a string value as a header.

transactionsKStream.print(Printed.toSysOut().withLabel("Transaction"))

We’ll now see records printed out with a label when the application is running.


We can also use peek(). The problem with print() is that it’s a terminal operation: it returns Unit rather than a KStream, so we’re unable to chain any further processing operations.

// Return type is Unit, so we are unable to add any more processing nodes
val transactionsKStream = builder.stream[String, Transaction](conf.getString("transactions.application.source.topic"))
  .mapValues(t => t.maskCreditCard)
  .print(Printed.toSysOut())

The peek() method allows us to perform an action such as println() for each record and carry on adding processing nodes.

// peek() returns the KStream, so we can keep chaining processing nodes
val transactionsKStream = builder.stream[String, Transaction](conf.getString("transactions.application.source.topic"))
  .mapValues(t => t.maskCreditCard)
  .peek((_, t) => println(t))
  .filter((_, t) => t.amount > 100.00)

filter

There may be times when we need to filter out records that we aren’t interested in. We can use the filter() method for this. It takes a single parameter, which has to be a predicate: a function that returns a Boolean. We can define a predicate in two different ways.

val filteredTransactionKStream = transactionsKStream
  .filter((_, t) => t.amount > 100.00)

filteredTransactionKStream.print(Printed.toSysOut().withLabel("Filtered Transaction"))

We call the filter() method, passing in an anonymous function as an argument which keeps records where the amount is over 100.00.

We now have a new KStream containing only the records that match the predicate.


Alternatively, we can create an anonymous function, assign it to a variable and pass that predicate into the filter() method.

val amountOver100 = (_: String, t: Transaction) => t.amount > 100

val filteredTransactionKStream = transactionsKStream
  .filter(amountOver100)

filteredTransactionKStream.print(Printed.toSysOut().withLabel("Filtered Transaction"))

This is useful if the predicate needs to be used in more than one place.
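It’s also worth knowing about filterNot(), which is the inverse of filter(): it keeps the records the predicate rejects. A quick sketch reusing the predicate above:

// Keeps transactions where the amount is NOT over 100
val smallTransactionsKStream = transactionsKStream.filterNot(amountOver100)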


foreach

For each record that we process, there may be an operation we want to perform that isn’t part of the DSL, such as adding the record to a database, sending it to a 3rd-party API, passing it through an enrichment service, etc.

For this, we can use the foreach() method.

val visaFilter = (_: String, t: Transaction) => t.cardType == "VISA"

val visaServiceAction = (_: String, t: Transaction) => new VisaService().toDb(t)

// foreach() is a terminal operation, so there's nothing to assign to a val
transactionsKStream
  .filter(visaFilter)
  .foreach(visaServiceAction)

Here we keep only the transactions where the card type is Visa and, for each record, pass the transaction into a method which inserts it into a database and also prints it out.
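The VisaService is just a stand-in for real persistence code. A hypothetical sketch of it might look like this; the real thing would obviously talk to an actual database:

class VisaService {

  def toDb(t: Transaction): Unit = {
    // Insert the transaction into the database here...
    println(s"Saved transaction: $t")
  }
}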


Alternatively, we could have used anonymous functions again.

transactionsKStream
  .filter((_, t) => t.cardType == "VISA")
  .foreach((_, t) => new VisaService().toDb(t))

branch

Our streams application isn’t always going to be receiving records, processing them and sending them back into a Topic. There may be instances where the stream needs to branch into several child streams depending on particular predicates.

We can use the branch() method for this.

val mastercardPredicate = (_: String, t: Transaction) => t.cardType == "MASTERCARD"
val visaPredicate = (_: String, t: Transaction) => t.cardType == "VISA"

val branchedTransactionKStream = transactionsKStream
  .branch(
    mastercardPredicate,
    visaPredicate
  )

branchedTransactionKStream(0).to(conf.getString("transactions.application.mastercard.sink.topic"))
branchedTransactionKStream(1).foreach((_, t) => new VisaService().toDb(t))

The branch() method takes a number of predicates and returns an array of KStream instances branched from the original stream based on the supplied predicates.

So here we are creating two predicates, one for the MasterCard type and another for the Visa type. We pass these two predicates into the branch() method.

The branchedTransactionKStream is an array containing two KStreams. branchedTransactionKStream(0) contains records where the type is Mastercard and branchedTransactionKStream(1) contains records where the type is Visa. Each record goes to the first branch whose predicate it matches, and records that match none of the predicates are dropped.

We can then perform processing operations as usual on the newly created streams.

Alternatively, we could have used anonymous functions again instead of creating predicates and assigning them to variables.

val branchedTransactionKStream = transactionsKStream
  .branch(
    (_: String, t: Transaction) => t.cardType == "MASTERCARD",
    (_: String, t: Transaction) => t.cardType == "VISA"
  )

branchedTransactionKStream(0).to(conf.getString("transactions.application.mastercard.sink.topic"))
branchedTransactionKStream(1).foreach((_, t) => new VisaService().toDb(t))

Wrapping up

In this article I’ve covered a basic Kafka Streams application and 5 useful methods, mapValues(), print(), filter(), foreach() and branch().

There are many more methods available and they are documented here.

Many thanks as always for reading my articles, it’s really appreciated. Any thoughts, comments or questions drop me a tweet.

Cheers 👍🏻

Danny

https://twitter.com/danieljameskay