Building a Real-Time Leaderboard with Kafka Connect and KSQL

Avinash Bhardwaj
Team Pratilipi
7 min read · Jul 22, 2020

Pratilipi is the largest Indian language storytelling platform. Pratilipi is currently (July 2020) home to 250,000+ writers and 21M+ monthly active readers across 12 languages. Our team handles 1.5B+ requests per day spread across 40+ microservices, with daily ingestion of 50 GB of data and 100k+ API requests per second. Operating at this scale while providing the best features to our users comes with a lot of difficult technical challenges. One such challenge is providing real-time statistics to our authors. In this blog, we will lay out the issues and solve the core technical problem.

Modeling Mutability with Immutable Events

Kafka by definition has immutable events: an event can be removed from Kafka, but its value can never be changed by any means. Therefore, Kafka can accurately model naturally immutable entities like click-streams, page-views, app-logs, etc., but modeling mutable entities like the followers of a user, likes on a post, comments on a post, ratings of an app, or the state of a user can be a bit tricky. We could use KSQL tables, which use compacted Kafka topics behind the scenes, to maintain the latest state of any entity. However, for many of the use-cases listed above, having only the final state does not fulfill the requirements. For example, if the requirement is to track a user's activity on a website, specifically which pages the user has visited, a KSQL table won't suffice, as it would only store the last page the user visited.

A part of this problem is solved by Kafka Connect, where connectors such as Debezium's MySQL source connector can be configured to send the before and after states of a row in a table, essentially providing us with the information of what changed. Such a source connector is one of many implementations of Change Data Capture (CDC). We can extend this idea and represent mutable entities with immutable events.

The basic premise of this argument is that even if an entity is mutable, each mutation itself is immutable. In other words, when the state of an entity changes, that change can be captured as an event, because the change itself is absolute. For example, if the attribute x of an entity is set to v1, then changes from v1 to v2, and then from v2 to v3, those mutations can be captured as three consecutive events, where each event reflects the net result of one mutation.

Events with mutations

Let's look at a real use-case at Pratilipi in order to get a better grasp of the idea. The Pratilipi app has a leaderboard feature for authors that ranks them based on statistics like total reads on the author's contents, total followers of the author, average reviews on their contents, etc. Let's solve one aspect of the problem: getting the average reviews on the contents of an author.

Getting the average rating of a content seems straightforward. The events contain the ratings, and we just need to aggregate those events to compute the average. If the review events received, in order, are 4, 5, 2, 5, 4, then the average rating is (4 + 5 + 2 + 5 + 4) / 5 = 4. However, this implementation doesn't account for the fact that a review on a content is a mutable entity. A user is capable of updating or removing their review. So if the user who gave a rating of 2 changes it to 4, then according to the current implementation there will simply be another event with rating 4, and the new average will be (20 + 4) / 6 = 4, when it should be (4 + 5 + 4 + 5 + 4) / 5 = 4.4.

Let’s look at how we tackled this problem.

Implementation

In the following sections, we will be building our system as described in the diagram.

Overview of the system

This solution uses KSQL v5.3.2. Since there have been newer versions, please refer to the docs for the latest syntax. That being said, the core concepts remain the same.

1. Kafka Connect and Debezium

First, we need to get raw CDC events from our database. We can use Debezium's MySQL source connector for that purpose. We can configure the connector to extract the before and after states of a row in a table. Getting both of those states is a key aspect of making this implementation work, as we discussed above.

For the sake of brevity, I won't go into detail about how to set up Kafka Connect and add the source connector for the database. I would encourage you to go through the documentation of the connector to get a better understanding of it. Here is a sample configuration for our purpose.
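The original configuration embed is not rendered here; as a sketch, a Debezium MySQL source connector configuration for this setup might look like the following (hostnames, credentials, and server ids are placeholders; property names follow the Debezium 1.x series current at the time of writing):

```json
{
  "name": "mysql-source-platform",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "<password>",
    "database.server.id": "184054",
    "database.server.name": "mysql.server",
    "database.whitelist": "platform",
    "table.whitelist": "platform.reviews",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.platform"
  }
}
```

The database.server.name of mysql.server is what yields topic names like mysql.server.platform.reviews.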

The connector will start producing CDC events into topics named after their respective tables. For instance, the reviews table's events will be pushed to the topic mysql.server.platform.reviews. A sample event is shown below.

a sample CDC event
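The embedded sample is not rendered here, but a Debezium change event for an update to a row of the reviews table has the following general shape (the column names are assumptions about our schema; op is 'c' for create, 'u' for update, and 'd' for delete):

```json
{
  "before": { "review_id": 4914, "content_id": 13782, "rating": 5 },
  "after":  { "review_id": 4914, "content_id": 13782, "rating": 2 },
  "source": { "db": "platform", "table": "reviews" },
  "op": "u",
  "ts_ms": 1595397600000
}
```

Note that both the before and the after state of the row are present, which is exactly what the delta computation below needs.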

2. Create Streams and Tables with KSQL

We need to transform the raw CDC topics to create streams and tables keyed by the proper identifiers. This can be done by running the following statements on the KSQL server, either via the CLI or from a script file.

CDC topic transformations
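The original statements are embedded in the post; a minimal sketch of them, assuming the CDC topic is in Avro and the column names shown earlier, could look like this:

```sql
-- Register the raw CDC topic as a stream; the schema is picked up
-- from the Avro schema registry.
CREATE STREAM REVIEWS_CDC WITH (
  KAFKA_TOPIC = 'mysql.server.platform.reviews',
  VALUE_FORMAT = 'AVRO');

-- Flatten the before/after structs and re-key the stream by review id,
-- so that all events for one review land on the same partition.
CREATE STREAM REVIEWS AS
  SELECT
    AFTER->REVIEW_ID  AS REVIEWID,
    AFTER->CONTENT_ID AS CONTENT_ID,
    BEFORE->RATING    AS BEFORE_RATING,
    AFTER->RATING     AS AFTER_RATING,
    OP
  FROM REVIEWS_CDC
  PARTITION BY REVIEWID;
```

One caveat: for hard deletes the after struct is null, so a real implementation would need to fall back to the before struct for the keys, or rely on soft deletes (see Possible Improvements).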

3. Create Delta Stream from the Review Stream

The following statement creates a delta stream out of the reviews stream.
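The embedded statement is not shown here; a sketch of it, assuming a transformed REVIEWS stream that carries the operation type plus the before and after ratings of each change (and noting that CASE expressions arrived in KSQL 5.3, the version used here), could be:

```sql
-- Each change event becomes two deltas: how much the content's
-- rating sum moves, and how much its review count moves.
CREATE STREAM REVIEW_DELTA AS
  SELECT
    REVIEWID,
    CONTENT_ID,
    CASE
      WHEN OP = 'c' THEN AFTER_RATING                  -- new review
      WHEN OP = 'u' THEN AFTER_RATING - BEFORE_RATING  -- updated review
      ELSE 0 - BEFORE_RATING                           -- deleted review
    END AS REVIEW_DELTA,
    CASE
      WHEN OP = 'c' THEN 1
      WHEN OP = 'u' THEN 0
      ELSE -1
    END AS COUNT_DELTA
  FROM REVIEWS;
```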

The above delta stream will produce events in the format below.

> SELECT REVIEWID, CONTENT_ID, REVIEW_DELTA, COUNT_DELTA FROM REVIEW_DELTA;
4914 | 13782 |  5 | 1
// When the review is updated from 5 to 2, the new event will be
4914 | 13782 | -3 | 0

4. Aggregating the review deltas to get the net average rating of a content

Now we need to create an aggregated table over the delta stream to get the net average review of contents.

create average review table
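The embedded statement is not rendered here; as a sketch, the aggregation over the delta stream could be:

```sql
-- Running sum of rating deltas and count deltas per content.
-- The average is simply their ratio, and it stays correct
-- across review updates and deletes.
CREATE TABLE AVG_REVIEW AS
  SELECT
    CONTENT_ID,
    SUM(REVIEW_DELTA) AS REVIEW_SUM,
    SUM(COUNT_DELTA)  AS REVIEW_COUNT,
    CAST(SUM(REVIEW_DELTA) AS DOUBLE) / SUM(COUNT_DELTA) AS AVG_RATING
  FROM REVIEW_DELTA
  GROUP BY CONTENT_ID;
```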

The table will produce the following events as they happen.

> SELECT CONTENT_ID, REVIEW_SUM, REVIEW_COUNT, AVG_RATING FROM AVG_REVIEW;
16006 | 420 | 97  | 4.329896907216495
6754  | 337 | 93  | 3.6236559139784945
26522 | 344 | 101 | 3.405940594059406
62078 | 377 | 84  | 4.488095238095238
35058 | 462 | 99  | 4.6666666666666665
51448 | 488 | 108 | 4.5185185185185186

5. (Optional) Enrich events with Content Meta-Data
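The statistics so far are keyed by content id only; to build an author leaderboard we also need to know the author behind each content. Assuming a CONTENTS KSQL table (built from the contents CDC topic, keyed by content id, with AUTHOR_ID and CONTENT_NAME columns), a sketch of the enrichment is a table-table join:

```sql
-- Join the aggregated review stats with content meta-data,
-- so downstream consumers know the author and title of each content.
CREATE TABLE AVG_REVIEW_WITH_CONTENT_META AS
  SELECT
    R.CONTENT_ID   AS CONTENT_ID,
    C.AUTHOR_ID    AS AUTHOR_ID,
    C.CONTENT_NAME AS CONTENT_NAME,
    R.REVIEW_COUNT AS REVIEW_COUNT,
    R.AVG_RATING   AS AVG_RATING
  FROM AVG_REVIEW R
  JOIN CONTENTS C ON R.CONTENT_ID = C.CONTENT_ID;
```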

The enriched table will produce the following events as they happen.

> SELECT CONTENT_ID, AUTHOR_ID, CONTENT_NAME, REVIEW_COUNT, AVG_RATING FROM AVG_REVIEW_WITH_CONTENT_META;
64285 | 8152 | A Christmas Carol     | 87  | 4.4597701149425286
98583 | 9227 | A Scandal in Bohemia  | 101 | 4.504950495049505
26522 | 2417 | Sense and Sensibility | 101 | 3.405940594059406
34487 | 6876 | Ode to the West Wind  | 109 | 4.4403669724770642
35058 | 3739 | The Blue Umbrella     | 99  | 4.6666666666666665
51448 | 7128 | What is Man?          | 108 | 4.5185185185185186
62078 | 8152 | A Tale of Two Cities  | 84  | 4.488095238095238
83168 | 9227 | The Valley of Fear    | 103 | 3.504854368932039

6. Getting the leaderboard of the authors

Now that we have the average reviews of the contents, we need to get the average of avg_rating for each authorId. We could do this aggregation inside KSQL itself, but that would add another level of complexity to the solution. It is better to offload the events onto a traditional database like MySQL or MongoDB, either via Kafka Connect's sink connectors or via a consumer reading directly from the AVG_REVIEW topic. We can then easily get the top authors by querying that database. We also end up with a more general system that can be re-used for other use-cases.
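As a sketch of the offloading route, a Confluent JDBC sink connector could upsert the aggregated topic into MySQL (connection details are placeholders):

```json
{
  "name": "avg-review-jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "AVG_REVIEW",
    "connection.url": "jdbc:mysql://mysql:3306/leaderboard",
    "connection.user": "sink",
    "connection.password": "<password>",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "CONTENT_ID",
    "auto.create": "true"
  }
}
```

The leaderboard then becomes an ordinary SQL query over the resulting table, e.g. averaging AVG_RATING per author and ordering by it.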

Both approaches are out of the scope of this post, but I would again encourage you to explore how you can use the aggregated table's topic to serve your use-cases.

Possible Improvements

  • We can add other parameters of the author to get a fairer picture of the leaderboard. The other parameters can be the total number of followers, the total number of views on their contents, the total number of comments on their contents, etc.
  • Add more states to each delta event. For example, if the user is able to delete their review, we would have a state field in our source database for each review telling us whether the review is ACTIVE or INACTIVE. The delta values would then accommodate this additional state.

Ending Notes

The method of modeling a mutable entity with Kafka’s immutable events opens a plethora of possibilities. We have seen how we can leverage the power of stateful aggregations in Ksql and generate real-time results. I would appreciate any feedback on this post and would love to hear your thoughts on this topic.

If you are excited about solving similar problems, Pratilipi is looking for people like you to build the platform for the next billion users. You can find opportunities here.

You can also reach out to me here.
