Most trivial use cases involving Akka streams with Kafka include consuming from a topic and processing each Kafka message exactly or at-least once. In these use cases, Kafka takes care of the partition offset bookkeeping based on your consumer id. That’s great, right? But what if you have a use case in which you want to stream only the last 5 minutes of a topic’s data to the user? In this case, the internal offset bookkeeping will not help you.
In this article, we’ll show you how to utilise Akka Streams Kafka’s subscriptions to achieve streaming the last 5 minutes from a topic.
Shaping the use case
I’ll build the streaming application in the context of a bond-trading application. Assume that we have a Kafka topic called
prices with 10 partitions. The topic will contain messages which represent the price for a specific bond at a specific time. The data model is as follows:
The data model contains an
isin identifier which stands for International Securities Identification Number. This is a worldwide unique identifier used for financial products. The messages are being published to Kafka and keyed by the
For the application to function properly, we have a set of requirements defined:
We want to listen to all the data on the topic
At its core, each Kafka topic is made up of different partitions. These partitions are essential to the ability to distribute data and scale-out. Our stream implementation should listen to all of the partitions available on the topic to ensure that we do not miss out on any price updates.
We need to preserve the processing order within a partition to ensure that the consumer doesn’t get out-of-order messages
As we mentioned before, the price updates are being produced with the
isin field as the key. This will ensure that prices are being distributed over the available partitions. Each message with the same key will go to the same Kafka partition thus ensuring that when we read from a partition, we always get all messages related to the same key in the right order
We want to parallelise processing across partitions
We want our stream to be as efficient as possible and thus we want to take advantage of the ability to process each partition in parallel. By keying the messages we guarantee that we can process each partition in parallel without getting out-of-order messages.
We want to stop processing if any of the streams fail
Since we’ll be processing each partition in parallel streams, it could happen that 1 of them fails. To keep things simple, we’ll opt for restarting everything when this happens. Instead, you could add additional logic to handle single failures.
Diving into the implementation
Retrieving the partitions for a topic
To accomplish our stream, we’ll be relying on Akka Stream Kafka’s
Subscriptions. It provides a way to retrieve a subscription to a topic’s partition offset based on a timestamp in the method
* Manually assign given topics and partitions with timestamps
def assignmentOffsetsForTimes(tp: TopicPartition, timestamp: Long): ManualSubscription =
assignmentOffsetsForTimes(Map(tp -> timestamp))
The method requires us to provide a
TopicPartition so let’s first start off with retrieving all the partitions for our topic. To do so, we first need to instantiate a
val kafkaConsumer = system.actorOf(KafkaConsumerActor.props(ConsumerSettings(...))
With the consumer actor available, defining a utility class that retrieves the partitions for a topic is trivial:
Setting up the stream source and flow
Now that we have the
PartitionInfo for each partition in the topic, we can construct the Akka Stream sources based on the partitions. For each partition, we first want to set up a subscription just for that partition that will resume from the desired timestamp position. We then construct the stream Source based on that subscription.
In the snippet above, you can see that we keep the sources in a
SortedMap in which the key is a
Partition. This Partition is a simple case class which supports ordering:
Make it flow
Up until now, we’ve been working on setting up Akka Stream sources that start from the partition offset closest to our desired time-window of 5 minutes. Now that we have the
kafkaSources Map containing the sources, the next thing that we want to do is set up a Flow which parses the message to JSON and on success, calls a publisher method which will take care of further processing:
In the above snippet, we’re using Circe to decode the
ConsumerRecord[_, Array[Byte]] to our
Price case class. For the sake of the blog post, we do not properly handle decoding failures. If decoding succeeds, we call the
publish function argument.
Going back to the snippet in which we constructed the
SortedMap of Kafka sources, we can now hook up the Sources to our flow:
We first create an instance of the flow which we assign to
publisher. You can see that the
publish function argument in our case sends a message to an Akka actor using a parallelism of 1. This is so that we ensure that there’s only a single update to our actor in-flight at each time.
We then map on all Kafka sources that we have, and for each source, we run the source through the publisher. Last but not least, we map the result to a
(Partition, RecordPosition) tuple.
Combining all the sources
At this point, we have a set of Sources that maintain information on the partition and position within each source. We also hooked up each source to a flow which processes every message flowing through.
As we mentioned in the requirements, we want to fail the stream if any of the partition streams fail. To achieve this, we’re going to combine all the separate streams into 1 stream
The most important part to notice in the snippet above is the
eagerComplete which we set to true. This will make sure that the resulting stream completes when one of the input streams complete.
In this article, we’ve shown how you can move beyond the trivial Akka Stream Kafka examples by using
Subscriptions to only retrieve the last 5 minutes of data from a particular topic.
we’ve created a sample project which demonstrates how this all ties together. If you have any questions or difficulties, do not hesitate to reach out!