For the impatient, here is the link to our GitHub repository:
With today’s adoption and success of event-based data processing systems, there is a wide range of opportunities to gain insight into a business in real time. A message broker such as Apache Kafka is a natural choice for these systems. Besides implementing a message queue, Apache Kafka also offers a lightweight solution for stream processing called Kafka Streams.
In this blog post, we introduce how Kafka Streams can be used to process real-time data and expose queryable APIs. You can find the corresponding code in our GitHub project. The examples are based on the real-world data set LFM-1b.
Consider the following example of a music streaming service: Every time a user listens to a song, an event is emitted and written to a Kafka topic. These listening events may have the following structure.
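The original event schema is not reproduced here, but a listening event could, for example, be serialized as JSON along these lines (the field names are our assumption, chosen to match the attributes in the LFM-1b data set):

```json
{
  "userId": 1001,
  "artistId": 42,
  "albumId": 7,
  "trackId": 1337,
  "timestamp": 1508427983
}
```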
In such a music streaming service, we may want to analyze general metrics such as the overall count of events, the last time the user listened to a song, or the user’s top ten artists. But eventually, we want to do more than analyze metrics. It might be interesting to leverage the events to provide recommendations or detect anomalous behavior. Hence, there is a need for a service that processes the stream of events, exposes calculated results through an API, and is easily extendable. Fortunately, Kafka Streams provides all the tools to create such a service.
The corresponding architecture is illustrated in the diagram below:
event-source contains all listening events. There is a standalone Kafka Streams application for each use case, such as analyzing metrics or providing recommendations. As we explain in our blog post Queryable Kafka Topics with Kafka Streams, Kafka Streams allows us to expose processed records via a REST API in real time. We use an additional REST API to unify the individual applications’ APIs and create a combined user profile.
Next, we dive into the different use cases, note challenges, and show how Kafka Streams helps to solve them.
Collecting user metrics in distributed applications
Let’s revisit the example mentioned above: We want to build a queryable user profile store that is updated in real time, in a streaming fashion, and contains the following metrics:
- Number of plays
- First listening event
- Last listening event
- Top 10 artists
- Top 10 albums
- Top 10 tracks
Computing the number of plays as well as the first and last listening event is straightforward. For each incoming listening event, the application increments the current number of plays. Additionally, it compares the event’s timestamp with the timestamps of the current first and last events and updates them accordingly.
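This update logic can be sketched in a few lines of plain Java (the class and field names are our own illustration, not the project’s actual code):

```java
// Sketch of the per-user metrics update; timestamps are epoch seconds.
public class UserMetrics {
    long playCount = 0;
    long firstListen = Long.MAX_VALUE;
    long lastListen = Long.MIN_VALUE;

    // Called for every incoming listening event of this user.
    public void update(long eventTimestamp) {
        playCount++;
        if (eventTimestamp < firstListen) firstListen = eventTimestamp;
        if (eventTimestamp > lastListen) lastListen = eventTimestamp;
    }
}
```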
Generating the current top tens requires more work. We explain our approach using the example of tracks. Calculating this metric for albums and artists is analogous.
First, we use Kafka Streams’ DSL to count all tracks for each user. We map each event to a new key that contains the user and track id and group by it. Afterwards, the built-in count method is called. We could try to use these counts to update the user’s top ten tracks directly. However, revisiting how Kafka Streams scales shows why this does not work. Each application instance with the same application id processes a topic’s messages for certain partitions, and the count method triggers a repartitioning based on the new key. Thus, it is not guaranteed that the local state store contains the queried user profile, and a lookup may return null.
With that in mind, we map the count again to a new key-value pair consisting of the user id and a composite value based on the track id and the current count. This can be safely processed because the stream of counts and the user profile share the same key type and consequently the same partitioning.
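The re-keyed stream of (track id, count) pairs can then be folded into the user profile. A minimal sketch of the top-ten bookkeeping itself, in plain Java (the names are illustrative, not the project’s actual code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Keeps the latest count per id and reports the ten highest-counted ids.
// An updated count for a known id replaces its previous value.
public class TopTen {
    private final Map<Long, Long> counts = new HashMap<>(); // id -> count

    public void update(long id, long count) {
        counts.put(id, count);
    }

    // Ids ordered by descending count, at most ten of them.
    public List<Long> top() {
        List<Long> ids = new ArrayList<>(counts.keySet());
        ids.sort((a, b) -> Long.compare(counts.get(b), counts.get(a)));
        return ids.subList(0, Math.min(10, ids.size()));
    }
}
```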
Computing recommendations in real time using SALSA
Recommender systems are an integral part of modern web services. They aim at personalizing a business’ product and increasing user engagement. In our example, we can recommend tracks, albums, and artists to a user. Well-suited recommendations may increase the time a user spends on our platform.
We adopt Twitter’s GraphJet to create recommendations. GraphJet generates them by performing random walks over a bipartite graph. Each GraphJet node holds all current data to do so efficiently. Again, we consider the example of tracks to explain our implementation.
The first step is to create a bipartite graph. There are two sets of nodes: users and tracks. An edge between a user and a track represents a listening event.
The second step is to compute the recommendations for a given user using the SALSA algorithm. The starting point is the user’s node. We sample an interaction edge and walk to the corresponding track. From there, we again sample an edge and walk back to the associated user. We do this a configurable number of times. Additionally, multiple such random walks are performed. During these random walks, we count the number of times each track is visited. The algorithm recommends those tracks that are visited most often ranked by their count.
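A simplified version of such a random walk can be sketched in plain Java (this is our own illustrative code, not GraphJet’s implementation; real SALSA adds refinements such as weighting and resets):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Simplified SALSA-style random walk over a bipartite user/track graph.
public class SalsaWalk {
    private final Map<String, List<String>> userToTracks; // left index
    private final Map<String, List<String>> trackToUsers; // right index
    private final Random random;

    public SalsaWalk(Map<String, List<String>> userToTracks,
                     Map<String, List<String>> trackToUsers, long seed) {
        this.userToTracks = userToTracks;
        this.trackToUsers = trackToUsers;
        this.random = new Random(seed);
    }

    // Performs `walks` random walks of `steps` user->track->user hops each,
    // counting track visits, and returns the tracks ranked by visit count.
    public List<String> recommend(String user, int walks, int steps) {
        Map<String, Integer> visits = new HashMap<>();
        for (int w = 0; w < walks; w++) {
            String current = user;
            for (int s = 0; s < steps; s++) {
                List<String> tracks = userToTracks.get(current);
                if (tracks == null || tracks.isEmpty()) break;
                String track = tracks.get(random.nextInt(tracks.size()));
                visits.merge(track, 1, Integer::sum);
                List<String> users = trackToUsers.get(track);
                if (users == null || users.isEmpty()) break;
                current = users.get(random.nextInt(users.size()));
            }
        }
        List<String> ranked = new ArrayList<>(visits.keySet());
        ranked.sort((a, b) -> visits.get(b) - visits.get(a));
        return ranked;
    }
}
```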
To go back and forth, we need to be able to efficiently find the tracks a specific user listened to and the users that listened to a specific track. In Kafka Streams, we create two state stores, which act as the indices for the respective directions. Hence, each store maps a user or track id respectively to a list of all adjacent nodes.
As mentioned, each node in GraphJet contains all data. In contrast, in Kafka Streams, an application’s instances each process only a fraction of the data: instances that share an application id form a consumer group, and Kafka assigns each consumer in a group a distinct set of partitions. To circumvent this behaviour and provide every recommendation instance with the complete data, we concatenate a UUID to the application id, so that each instance forms its own consumer group and consumes all partitions.
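In configuration terms, this amounts to giving every instance a randomized application id. A minimal sketch (the base id "recommendations" is an assumption; serdes and further settings are omitted):

```java
import java.util.Properties;
import java.util.UUID;

// Each recommendation instance gets a unique application id so that it
// forms its own consumer group and therefore receives all partitions.
public class RecommendationConfig {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        // the key is equivalent to StreamsConfig.APPLICATION_ID_CONFIG
        props.put("application.id", "recommendations-" + UUID.randomUUID());
        // bootstrap servers, serdes, etc. would be configured here as well
        return props;
    }
}
```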
Until now, the user profile as well as the recommendations contain the ids of the artists, albums, and tracks respectively. For users, however, it is more interesting to see artist names or track titles. Inherently, this data is relatively static. Kafka Streams offers the GlobalKTable to represent this kind of data. We store the names in a GlobalKTable each and join the tables with the stream to add the names. This comes with the advantage that all data is available on every application node and no repartitioning is necessary.
The following code shows how to perform a join for the updated track’s counts as described above:
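The original snippet is not preserved here; a hedged reconstruction of such a join in the Kafka Streams DSL could look roughly like this (topic names and value types are illustrative, and serde configuration is omitted):

```java
// builder is a StreamsBuilder, trackCounts a KStream<Long, TrackCount>
// keyed by user id; TrackCount and NamedTrackCount are hypothetical types.
final GlobalKTable<Long, String> trackNames =
        builder.globalTable("track-names");

final KStream<Long, NamedTrackCount> withNames = trackCounts.join(
        trackNames,                                   // 1st argument: the GlobalKTable
        (userId, trackCount) -> trackCount.trackId,   // 2nd: which part of the pair to join on
        (trackCount, name) -> new NamedTrackCount(trackCount, name)  // 3rd: the join result
);
```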
First, we create a GlobalKTable that reads all track names from an input topic and stores them. Then, we perform the join. The first argument is the GlobalKTable. The second argument defines which part of the stream’s key-value pairs should be used for the join; we use the ids of the tracks we count. Lastly, the third argument defines the result of the join.
A unified REST API
So far, the service consists of two sub-applications. They expose different aspects of a user through their individual API. Next, we create a unified API with a combined user profile.
Furthermore, this unified API helps mitigate drawbacks of the individual applications. If, for example, the user profile application receives a request for a specific user, the application must look up the user’s partition before processing the request. This may require forwarding the request to another instance of the application that is responsible for the respective partition. In the unified API, we can preemptively calculate the partition and therefore send the request directly to the correct host.
The default calculation of an id’s partition can be found in Kafka’s DefaultPartitioner. The following code snippet shows our implementation.
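The original snippet is not reproduced here, but the scheme follows DefaultPartitioner: a murmur2 hash of the serialized key, masked to a positive value, modulo the number of partitions. A self-contained sketch, with the murmur2 function ported from the Kafka clients:

```java
import java.nio.charset.StandardCharsets;

// Computes a key's partition the same way Kafka's DefaultPartitioner does.
public class PartitionCalculator {

    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Port of the murmur2 hash used by the Kafka clients.
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Handle the last few bytes of the input array (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }
}
```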
We also require a mapping of partitions to their corresponding hosts. The method allMetadataForStore of the KafkaStreams class provides exactly this information. We expose it through an additional API. The unified REST service calls this API and updates its current mapping in order to forward each request to the right host.
This is not necessary for applications like the recommendation service. Here, every node can process all requests because it holds all the data. However, this introduces a load balancing problem: the unified REST service needs to know all currently available recommendation instances. As described above, we set a unique application id for each recommendation deployment. Consequently, the approach of calling an API that exposes the information returned by allMetadataForStore does not work. We use a Kubernetes LoadBalancer to solve this problem. The different instances of the recommendation application are part of the same replica set and therefore share a load balancer, which the unified API uses to forward requests.
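A corresponding Kubernetes Service of type LoadBalancer could look roughly like this (the names, labels, and ports are assumptions, not the project’s actual manifest):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: recommendation-service
spec:
  type: LoadBalancer
  selector:
    app: recommendation   # matches the pods of the recommendation replica set
  ports:
    - port: 80
      targetPort: 8080    # assumed container port of the REST API
```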
Altogether, querying our REST API returns something similar to the following extract:
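The original extract is not preserved here; an illustrative response, restricted to the values discussed in the text and with assumed field names, might look like this:

```json
{
  "userId": 1001,
  "playCount": 4000,
  "topArtists": ["Lindsey Stirling", "..."],
  "recommendedArtists": ["Frank Turner", "The Fratellis", "David Bowie"]
}
```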
Overall, we count 4000 events. In this timeframe, the queried user apparently greatly enjoyed Lindsey Stirling’s music. Apart from that, we can see a tendency towards British rock music. Our artist recommendations reflect this by suggesting Frank Turner, The Fratellis, and David Bowie.
In this blog post, we described an extendable user profile store based on Kafka Streams. We use several standalone applications to process the stream of user events and collect vastly different types of information, ranging from simple counts and aggregations to more advanced calculations like recommendations. An API combines them into a single user profile that is updated in real time. Kafka Streams’ queryable state stores are a great fit for such an application because of their lightweight and easily scalable nature.