EXPEDIA GROUP TECHNOLOGY — DATA

Stateful Joins with the Kafka Streams Processor API

Or how I learned to not fear the processor API

Chris Stromberger
Expedia Group Technology

--

Bill & Melinda Gates Computer Science Complex, University of Texas at Austin. Photo by the author.

My team, Expedia Group™ Commerce Data, needed to join events coming in on two (and more in the future) Kafka topics to provide a realtime stream view of our bookings. This is a pretty standard requirement, but our team was not very experienced with Kafka Streams, and we had a few wrinkles that made going with an “out of the box” Kafka Streams join less attractive than dropping down to the Processor API.

What we needed, in a nutshell, was to:

  • Join two or more events,
  • Repartition one event to extract the proper join key,
  • Report on unjoined events,
  • Possibly purge orphaned events to a dead letter topic,
  • Have no fixed join window, with a configurable expiration for unjoined events,
  • Oh, and with Kafka Streams newbies at the helm.

Processor API vs DSL

There are two approaches to writing a Kafka Streams application: the high-level DSL and the low-level Processor API.

Developers prefer the DSL for most situations because it simplifies some common use cases and lets you accomplish a lot with very little code. But you sacrifice some control when using the DSL. There’s a certain amount of magic going on under the covers that's hidden by the KStream and KTable abstractions. And the out-of-the-box joins available between these abstractions may not fit all use cases.

The most common way I see the DSL characterized is as “expressive,” which just means “hides lots of stuff from you.” Sometimes explicit is better. And for some (like me), the “raw” Processor API just seems to fit my brain better than the DSL abstractions.

Don’t fear the Processor API

Most documentation I found around Kafka Streams leans towards using the DSL (Confluent docs state “it is recommended for most users”), but the Processor API has a much simpler interface than the DSL in many respects. You still build a stream topology, but you only use Source nodes (to read from Kafka topics), Sink nodes (to write to Kafka topics), and Processor nodes (to do stuff to Kafka events flowing through your topology). Plus the DSL is built on top of the Processor API, so if it’s good enough for the DSL, it should be good enough for our humble project (in fact, as a Confluent engineer says, “the DSL compiles down to the Processor API”).

Processor nodes have to implement Processor, which has a process method you override that takes the key and the value of the event traversing your Kafka Streams topology. Processors also have access to a ProcessorContext object, which contains useful information about the current event being processed (like what topic and partition it was consumed from) and a forward method used to send the event to a downstream node in your topology.
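As a sketch of the shape of a processor (using the pre-2.8 Processor<K, V> interface that the rest of this post follows; the class name and String types here are illustrative, not from our project):

```java
import org.apache.kafka.streams.processor.AbstractProcessor;

// Illustrative pass-through processor. Extending AbstractProcessor
// stores the ProcessorContext for you and exposes it via context().
public class PassThroughProcessor extends AbstractProcessor<String, String> {

    @Override
    public void process(String key, String value) {
        // The context tells us where the current event came from...
        System.out.printf("key=%s from %s[%d]%n",
                key, context().topic(), context().partition());

        // ...and forward() sends it on to downstream nodes in the topology.
        context().forward(key, value);
    }
}
```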

To illustrate the difference, here’s a comparison of doing a repartition on a stream in the DSL and the Processor API.

DSL Repartition

KStream repartitionedStream =
    stream.selectKey((k, v) -> {
        return <<build some new key>>;
    });

That’s pretty simple. For example, the above will create a new key for the event, and will also repartition it to a magically-created internal topic if the new key is needed later in the topology for a join.

You can also specify the new repartition topic explicitly in the DSL with through:

KStream repartitionedStream =
    stream.selectKey((k, v) -> {
        return <<build some new key>>;
    })
    .through("repartitioned-topic-name");

In this case, the new topic repartitioned-topic-name has to be created manually. Both of these DSL repartition options are very succinct.

Processor API repartition

To repartition using the Processor API, you explicitly add the required nodes to your topology: a sink node to write to the repartition topic and a source node to read back from it, so that downstream nodes in the topology can use the re-keyed event. You also have to create the repartition topic manually.

Topology topology = ...
    // add node to re-key the original event
    .addProcessor("repartition-node",
        repartitionProcessor,
        parentNodeName)

    // publish re-keyed event to a repartition topic
    .addSink("repartition-sink-node",
        "repartitioned-topic-name",
        "repartition-node")

    // read from repartition topic
    .addSource("repartitioned-source-node",
        "repartitioned-topic-name")

    ... // continue with topology

To implement the repartition logic (as selectKey does in the DSL), you would add the key-building code to the RepartitionProcessor specified in the addProcessor call:

@Override
public void process(EventKey key, EventValue value) {
    // Create new key.
    final EventKey newKey = <<build some new key>>;

    // Send event downstream: new key + original value in our case.
    context.forward(newKey, value);
}

So it’s slightly more verbose overall, but it’s also more explicit and cleaner (to me) than the in-place coding functional style of the DSL version.

Topology

Here’s the topology we are working with. I created these diagrams using an open source tool called kafka-streams-viz. You just need to enter the output from your Topology.describe() method to generate a diagram of your topology.
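To get the text the tool needs, you can simply print the description (a one-line sketch; topology is the Topology object built earlier):

```java
// Prints a text description of the sources, processors, sinks, and
// state stores, which can be pasted into kafka-streams-viz.
System.out.println(topology.describe());
```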

[Topology diagram: flow of events through the join and repartition topics]

I have detailed the repartition step above. Below are the remaining steps in the topology. In our case we only needed to repartition one incoming stream, as the other was already using the key we wanted.

State store processor

These are labeled “left-processor” and “right-processor” in the above topology diagram. This node is responsible for storing the event into its corresponding state store. We’re using the built-in RocksDB store, which is a simple KeyValueStore backed by a compacted Kafka topic for fault-tolerance.

@Override
public void process(EventKey key, EventValue value) {
    // Put into state store.
    keyValueStore.put(key, value);

    // Send downstream. Join processor just needs the key,
    // so don't need to send any value.
    context().forward(key, null);
}

The join processor then just checks if the current key has a record in all the state stores — we can only join once we have an event in every state store with the same key. If so, do the join and clean up the state stores. If the key is not present yet in all state stores, we’re done. We will just wait for all the other events for that particular key to arrive.

// Implement the join.
@Override
public void process(EventKey key, Object value) {

    // See if we have the matching record in all state stores.
    if (leftKeyValueStore.get(key) != null &&
            rightKeyValueStore.get(key) != null) {

        // Do join logic here...
        JoinedValue joinedValue = <<create new join event>>;

        // Delete unneeded joined events from their state stores.
        leftKeyValueStore.delete(key);
        rightKeyValueStore.delete(key);

        // Send joined event to downstream sink.
        context().forward(key, joinedValue);
    }
}

Joining more than two streams

If you need to add another stream to the join, you just duplicate the processor that stores the incoming event into its own state store and then add another check in the join processor against this new state store.
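Sketched against the join processor above (the third store name is illustrative; keeping the stores in a list keeps the check uniform as streams are added):

```java
// Sketch: the same join check generalized to N state stores.
List<KeyValueStore<EventKey, EventValue>> stores =
        List.of(leftKeyValueStore, rightKeyValueStore, thirdKeyValueStore);

@Override
public void process(EventKey key, Object value) {
    // Only join once an event for this key is present in every store.
    if (stores.stream().allMatch(store -> store.get(key) != null)) {
        JoinedValue joinedValue = <<create new join event from all stores>>;
        stores.forEach(store -> store.delete(key));
        context().forward(key, joinedValue);
    }
}
```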

Punctuate to report on state stores

Another cool feature of the Processor API is the Punctuator. This is essentially a cron job that can inspect your state stores (and perform any other operations you need on them) without interfering with event processing. For example, you can use it to report on stale unjoined events sitting in the state stores, or to expire unjoined events after a configurable time in the store and forward them to a dead letter topic sink (we’re doing both in our project).
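A sketch of how the expiration case might look, scheduled from a processor’s init method (the store, the sink node name, the interval, and the staleness check are all illustrative):

```java
@Override
public void init(ProcessorContext context) {
    super.init(context);

    // Run every 10 minutes on wall-clock time, independent of traffic.
    context.schedule(Duration.ofMinutes(10),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp -> {
                // Scan the store for stale, unjoined events.
                try (KeyValueIterator<EventKey, EventValue> it = keyValueStore.all()) {
                    while (it.hasNext()) {
                        KeyValue<EventKey, EventValue> entry = it.next();
                        if (<<entry older than the configured expiration>>) {
                            // Route to the dead letter sink node and purge.
                            context.forward(entry.key, entry.value,
                                    To.child("dead-letter-sink-node"));
                            keyValueStore.delete(entry.key);
                        }
                    }
                }
            });
}
```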

Summary

You can do all of this stuff in the DSL as well, by “dropping down” into the Processor API from your DSL code where needed, but in this post, I just wanted to highlight how a join application might look using only the Processor API. The Processor API is more explicit than using the DSL and encourages separating the topology definition code from the implementation code, which to me results in cleaner code and makes it simpler to envision the events flowing through the nodes in your topology. Give it a whirl in your next Kafka Streams project!

Learn more about technology at Expedia Group
