Learn Stream Processing With Kafka Streams

Stateless operations

Abhishek Gupta
Mar 5 · 6 min read

Kafka Streams is a Java library for developing stream-processing applications on top of Apache Kafka. This is the first in a series of articles on Kafka Streams and its APIs.

This is not a theoretical guide about Kafka Streams (although I’ve covered some of those aspects in the past).

In this part, we’ll cover stateless operations in the Kafka Streams DSL API — specifically, the functions available in KStream, such as filter, map, groupBy, etc. The DSL API in Kafka Streams offers a powerful, functional-style programming model to define stream-processing topologies.

The APIs (KStream, etc.) referenced in this post can be found in the Kafka Streams Javadocs.


The Setup

Set the required configuration for your Kafka Streams app:

Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

We can then build a topology that defines the processing pipeline (the rest of this article will focus on the stateless parts of a topology).
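A minimal skeleton might look like this (the topic name is a placeholder):

```java
// Define the processing pipeline on a StreamsBuilder and
// build it into a Topology.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");
// ... stateless operations (filter, map, etc.) go here ...
Topology topology = builder.build();
```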

You can create the KafkaStreams instance and start processing:

KafkaStreams app = new KafkaStreams(topology, config);
app.start();
new CountDownLatch(1).await(); // block the main thread indefinitely
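In a real deployment you'd usually want a clean shutdown instead of blocking forever. A common pattern (sketched here, assuming a main method that declares throws InterruptedException) pairs a latch with a JVM shutdown hook:

```java
KafkaStreams app = new KafkaStreams(topology, config);
CountDownLatch latch = new CountDownLatch(1);

// Close the streams instance cleanly on JVM shutdown (e.g. Ctrl+C).
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    app.close();
    latch.countDown();
}));

app.start();
latch.await(); // block until the shutdown hook fires
```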

Stateless Operations Using KStream

Let’s dig in!

filter

For example, if the value sent to a topic contains a word and you want to keep only words longer than a specified length, you can define this criterion using a Predicate and pass it to the filter method. This creates a new KStream instance containing only the matching records.

KStream<String, String> stream = builder.stream("words");
stream.filter(new Predicate<String, String>() {
    @Override
    public boolean test(String k, String v) {
        return v.length() > 5;
    }
});

It’s also possible to use filterNot if you want to exclude records based on certain criteria. Here is a lambda-style example:

KStream<String, String> stream = builder.stream("words");
stream.filterNot((key,value) -> value.startsWith("foo"));

map

This is available in multiple flavors, such as map, mapValues, flatMap, and flatMapValues.

Use the map method if you want to alter both the key and the value, for example to convert both to uppercase:

stream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
    @Override
    public KeyValue<String, String> apply(String k, String v) {
        return new KeyValue<>(k.toUpperCase(), v.toUpperCase());
    }
});

Use mapValues if all you want to alter is the value:

stream.mapValues(value -> value.toUpperCase());

flatMap is similar to map, but it allows you to return multiple records (KeyValues):

stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csv) {
        String[] values = csv.split(",");
        return Arrays.stream(values)
                .map(value -> new KeyValue<>(k, value))
                .collect(Collectors.toList());
    }
});

In the above example, each record in the stream gets flatMapped: the comma-separated value string is split into its parts, and a KeyValue pair is created for each part. For example, if you have the records (foo <-> a,b,c) and (bar <-> d,e) (where foo and bar are keys), the resulting stream will have five entries: (foo,a), (foo,b), (foo,c), (bar,d), and (bar,e).

Use flatMapValues if you only want to operate on the value and return a collection of values.
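As a sketch, the CSV-splitting example above could be written more concisely with flatMapValues, since the key stays untouched:

```java
// For each record, split the comma-separated value and emit
// one record per part, all with the original key.
stream.flatMapValues(csv -> Arrays.asList(csv.split(",")));
```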


group

Grouping records is a prerequisite for stateful operations such as aggregations; groupByKey and groupBy return a KGroupedStream, and we'll cover stateful operations on it in subsequent articles in this series.

Here’s an example of how you can do this using groupByKey:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);

KGroupedStream<String,String> kgs = stream.groupByKey();

A generalized version of groupByKey is groupBy, which gives you the ability to group based on a different key using a KeyValueMapper.

stream.groupBy(new KeyValueMapper<String, String, String>() {
    @Override
    public String apply(String k, String v) {
        return k.toUpperCase();
    }
});

In both cases (groupByKey and groupBy), if you need to use a different Serde (Serializer and Deserializer) instead of the default ones, use the overloaded version (which accepts a Grouped object).

stream.groupByKey(Grouped.with(Serdes.Bytes(), Serdes.Long()));

Terminal Operations

You can use the to method to store the records of a KStream to a topic in Kafka.

KStream<String, String> stream = builder.stream("words");
stream.mapValues(value -> value.toUpperCase())
.to("uppercase-words");

An overloaded version of to allows you to specify a Produced object to customize the Serdes and the partitioner.

stream.mapValues(value -> value.toUpperCase())
.to("output-topic",Produced.with(Serdes.Bytes(), Serdes.Long()));

Instead of specifying a static topic name, you can use a TopicNameExtractor and include custom logic to choose the target topic dynamically:

stream.mapValues(value -> value.toUpperCase())
.to(new TopicNameExtractor<String, String>() {
    @Override
    public String extract(String k, String v, RecordContext rc) {
        return rc.topic() + "_uppercase";
    }
});

In this example, we make use of the RecordContext, which contains the metadata of the record, to get the topic and then append _uppercase to it.

In all of the above cases, the sink topic must already exist in Kafka.

If you want to log the KStream records (for debugging purposes), use the print method. It accepts an instance of Printed to configure the behavior.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase()).print(Printed.toSysOut());

This will print out the records. For example, if you pass (foo, bar) and (john, doe) to the input topic, the values will be converted to uppercase and logged:

[KSTREAM-MAPVALUES-0000000001]: foo, BAR
[KSTREAM-MAPVALUES-0000000001]: john, DOE

You can also use Printed.toFile (instead of toSysOut) to target a specific file.

The foreach method is similar to print and peek:

  • It’s also a terminal operation (like print)
  • And it accepts a ForeachAction (like peek)
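A sketch of foreach, assuming a stream of String keys and values:

```java
// Terminal operation: consume each record with a side effect.
// Unlike peek, nothing flows downstream after foreach.
stream.foreach((k, v) -> System.out.println("key=" + k + ", value=" + v));
```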

Miscellaneous

Use peek to execute a side effect (such as logging) on each record while still letting it flow downstream:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
.peek((k,v) -> System.out.println("key="+k+", value="+v))
.to(OUTPUT_TOPIC);

In the above example, you'll see the keys and values being logged, and they'll also be materialized to the output topic (unlike with print, which is a terminal operation).

branch is a method I haven't used (to be honest), but it looks quite interesting. It gives you the ability to evaluate every record in a KStream against multiple criteria (each represented by a Predicate) and output an array of KStreams. The key here is that you can use multiple predicates instead of a single one, as is the case with filter and filterNot.
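A sketch of what that might look like: each record goes to the first branch whose predicate it matches, and records matching none are dropped (the topic names here are placeholders):

```java
// Split one stream in two based on value length.
KStream<String, String>[] branches = stream.branch(
        (k, v) -> v.length() > 5, // first matching predicate wins
        (k, v) -> true            // catch-all for everything else
);
branches[0].to("long-words");
branches[1].to("short-words");
```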

You can merge two KStreams together into a single one.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
stream1.merge(stream2).to("output-topic");

Note: The resulting stream may not have all the records in order.

If you want to derive a new key (it can have a different type as well) for each record in your KStream, use the selectKey method, which accepts a KeyValueMapper. selectKey is similar to map, but the difference is that map restricts the return type to a KeyValue object.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.selectKey(new KeyValueMapper<String, String, String>() {
    @Override
    public String apply(String k, String v) {
        return k.toUpperCase();
    }
});

While developing your processing pipelines with Kafka Streams DSL, you’ll find yourself pushing resulting stream records to an output topic using to and then creating a new stream from that (output) topic:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream(INPUT_TOPIC);
stream1.mapValues(v -> v.toUpperCase()).to(OUTPUT_TOPIC);
//output topic now becomes the input source
KStream<String, String> stream2 = builder.stream(OUTPUT_TOPIC);
//continue processing with stream2
stream2.filter((k,v) -> v.length() > 5).to(LENGTHY_WORDS_TOPIC);

This can be simplified using the through method, which writes to the topic and returns a new stream reading from it. The above can be rewritten as follows:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
.through(OUTPUT_TOPIC)
.filter((k,v) -> v.length() > 5)
.to(LENGTHY_WORDS_TOPIC);

Here, we materialize the records (with uppercase values) to an intermediate topic, continue processing (using filter, in this case), and finally store the filtered results in another topic.

That’s it for now. Stay tuned for upcoming articles in this series!

