Building a Data-Driven application with Golang and Kafka — Personalization

Mohammad Hoseini Rad
14 min readJun 6, 2023

Nowadays, every application tries to personalize its feed based on your interests. You may have seen whenever you search for something on youtube, after a few minutes, your timeline is filled with videos related to your search. Let’s build one of them! In this article, we will create a simple replica of Twitter where every user has a timeline and, based on their interaction with tweets, we find their taste to show them more of them!

Building a Basic Twitter

First, we must build a simple Twitter. For the sake of this tutorial, I won’t be using any persistent database, and I’ll focus on Kafka and Redis instead. Thus, I’ll save tweets in a Redis instance instead of a persistent database.

type Redis[T models.Keyer] struct {
rdb *redis.Client
}

func NewRedis[T models.Keyer](rdb *redis.Client) Redis[T] {
r := Redis[T]{rdb: rdb}
return r
}

func (r Redis[T]) Save(ctx context.Context, k T) error {
b, _ := json.Marshal(k)
return r.rdb.Set(ctx, k.Key(), b, 0).Err()
}
func (r Redis[T]) Get(ctx context.Context, key string) (T, error) {
var t T
b, err := r.rdb.Get(ctx, key).Bytes()
if err != nil {
return t, err
}
json.Unmarshal(b, &t)
return t, nil
}

I made a simple Redis wrapper for easier serialization. It uses Golang’s generics. As you can see, I defined a new interface called Keyer.

type Keyer interface {
Key() string
}
type Tweet struct {
UID string `json:"UID"`
Author string `json:"author"`
Tweet string `json:"tweet"`
}

func (t Tweet) Key() string {
return "tweet:" + t.UID
}

I used a simple implementation and didn’t even implement authentication and authorization. We focus on Kafka and Redis in this article.

Now with a simple HTTP router, we can run the simplest tweeter replica!

func main() {

rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

tweetService := services.NewSaveTweet(repositories.NewRedis[models.Tweet](rdb))

e := echo.New()
e.POST("/tweet", func(c echo.Context) error {
content := c.Request().PostFormValue("tweet")
author := c.Request().PostFormValue("author")
tweet := models.Tweet{Tweet: content, Author: author}
tweet, err := tweetService.Save(c.Request().Context(), tweet)
if err != nil {
return c.String(500, err.Error())
}
return c.String(201, tweet.UID)
})

e.GET("/tweet/:uid", func(c echo.Context) error {
uid := c.Param("uid")
tweet, err := tweetService.Get(c.Request().Context(), uid)
if errors.Is(err, redis.Nil) {
return c.String(404, "tweet not found")
} else if err != nil {
return c.String(500, err.Error())
}
return c.String(200, tweet.Author+" : "+tweet.Tweet)

})
e.Logger.Fatal(e.Start(":1323"))
}

Let’s test it.

As you can see, it works properly. However, we’ve missed the most important aspect of Twitter! the timeline!

Say we have a follower service that saves the following relation between users. I made this simple mock:

type Follower struct {
followers map[string][]string
}

func NewFollower() Follower {
return Follower{
followers: map[string][]string{
"mohammad": []string{"john", "maria", "hanna"},
"john": []string{"hanna"},
},
}
}

func (f Follower) Followers(ctx context.Context, user string) ([]string, error) {
return f.followers[user], nil
}

As you can see, everything is hardcoded. we can save the following relations within any database. at the moment, our concern is the timeline.

For the timeline, we use a list in Redis. every user has a timeline:uid key in the Redis, a list of tweet ids.

type Timeline struct {
rdb *redis.Client
}

func NewTimeline(rdb *redis.Client) Timeline {
return Timeline{rdb: rdb}
}

func (t Timeline) Push(ctx context.Context, user string, tweet ...interface{}) error {
return t.rdb.RPush(ctx, "timeline:"+user, tweet...).Err()
}

func (t Timeline) Latest(ctx context.Context, user string, count int64) ([]string, error) {
return t.rdb.LRange(ctx, "timeline:"+user, -1*count, -1).Result()
}

Filling the timeline with a naive approach

Now, how can we fill the timeline? the most naive approach is pushing tweets into each follower’s timeline. Here is the code for the reference:


timelineService := services.NewTimeline(rdb)
followerService := services.NewFollower()
tweetService := services.NewSaveTweet(repositories.NewRedis[models.Tweet](rdb))
...
e.POST("/tweet", func(c echo.Context) error {
ctx := c.Request().Context()
content := c.Request().PostFormValue("tweet")
author := c.Request().PostFormValue("author")
tweet := models.Tweet{Tweet: content, Author: author}
tweet, err := tweetService.Save(ctx, tweet)
// pushing the tweet into the followers timeline.
followers, _ := followerService.Followers(ctx, author)
for _, follower := range followers {
if err := timelineService.Push(ctx, follower, tweet.UID); err != nil {
return c.String(500, err.Error())
}
}
if err != nil {
return c.String(500, err.Error())
}
return c.String(201, tweet.UID)
})

I made a simple route to retrieve the timeline:

e.GET("/timeline/:user", func(c echo.Context) error {
ctx := c.Request().Context()
user := c.Param("user")
tweetIDs, err := timelineService.Latest(ctx, user, 10)
if errors.Is(err, redis.Nil) {
return c.String(404, "timeline not found")
} else if err != nil {
return c.String(500, err.Error())
}
tweets, err := tweetService.MGet(ctx, tweetIDs...)
if err != nil {
return c.String(500, err.Error())
}
timeline := ""
for i := len(tweets) - 1; i >= 0; i-- {
tweet := tweets[i]
timeline += fmt.Sprintf("%s: %s\n________________\n", tweet.Author, tweet.Tweet)
}
return c.String(200, timeline)
})

As you can see, I’ve added MGet support to the Redis wrapper and the Tweet service.

func (r Redis[T]) MGet(ctx context.Context, key ...string) ([]T, error) {
bb, err := r.rdb.MGet(ctx, key...).Result()
if err != nil {
return nil, err
}
result := make([]T, len(key))
for i, b := range bb {
json.Unmarshal([]byte(b.(string)), &result[i])
}
return result, nil
}
func (st SaveTweet) MGet(ctx context.Context, uid ...string) ([]models.Tweet, error) {
ids := make([]string, len(uid))
for i, s := range uid {
ids[i] = "tweet:" + s
}
return st.r.MGet(ctx, ids...)
}

What is MGet? as you can see by using the latest method of timeline service we recieve the ids instead of the tweet content. with MGet we can retrieve all of those tweets with one request that significantly impact the preformance of the application.

Let’s test it.

It’s working fine for now! @Cristiano has 100M followers on Twitter. What would happen if he used our platform to tweet? well, it would crash! but let’s be deep on why and how it would crash.

  • Retrieving all 100M followers would affect the database significantly.
  • As you might have considered, a for loop on 100M items would interrupt the system for quite a while.
  • What would happen after we pushed the tweet into 53M timelines and the system crashed? How are we going to continue this process?

Let’s replicate the problem. I’ll increase the number of followers to see how it impacts the system:

  • For 3 followers took 2.9ms.
  • For 100 followers took 83.7ms.
  • For 1000 followers took 809.6603ms.
  • For 10,000 followers, it took 6.3s.

You can notice the trend. For 100 million followers, it would take around 22 hours. Honestly, the problem right now is mainly because we send individual requests to Redis instead of the pipeline. However, we cannot expect CR7 to wait 10 minutes so we can distribute the tweet to his followers! We cannot scale this system! The system has no resiliency! We have only one node, and that's a hazard for fault tolerance.

Let’s introduce Kafka to the application.

Right now, the structure of the application is something like this:

As you can see, everything within our system is interconnected. What would happen if we wanted to introduce a Machine Learning service that executes an algorithm to determine the user's taste?

Ok, it seems to be pretty simple and straightforward. But we introduce complications whenever we decide to scale the application. Now say we’ve gained five times more users and need to scale the application. What would happen?

I’ve used my bad painting skill and bold lines to exaggerate the complication. However, as you can see, it’s a mess! Now imagine what would happen If we needed to introduce another ML algorithm for any other reason.

Let’s use Kafka to decouple producers and consumers.

What is the purpose of the Go instance? authorization, authentication, some business logic, and validation. We can define a Kafka topic for new tweets and write another system that consumes it to save new tweets in a database.

As you can see, producers and consumers do not know each other and have no connection. You may think, for this specific example, it’s an overkill. So, let’s introduce some ML algorithms.

Now let’s introduce another ML instance that monitors tweets to check whether they follow the guidelines.

As you can see, We can easily scale every part of our system without introducing any complications. Let’s add the last ML algorithm and find the users’ tastes based on their clicks. We need to create a new topic for clicks.

I intentionally moved the Analytics services upward so that you can see how much easier it is to decouple services.

In the end, I should mention that each of these colored circles is a different consumer group that allows us to scale efficiently within Kafka.

Let’s create the NewTweets topic.

If you want to skip the configuration and jump to using Kafka, you can use Upstash’s free Kafka service.

to run Kafka locally, you can use this docker-compose file:

version: '3'

services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Then, connect to the Kafka instance:

docker exec -it kafka bash
root@da9985351043:/# cd /opt/kafka/bin/
root@da9985351043:/opt/kafka/bin# ls
connect-distributed.sh kafka-dump-log.sh kafka-storage.sh
connect-mirror-maker.sh kafka-features.sh kafka-streams-application-reset.sh
connect-standalone.sh kafka-leader-election.sh kafka-topics.sh
kafka-acls.sh kafka-log-dirs.sh kafka-verifiable-consumer.sh
kafka-broker-api-versions.sh kafka-metadata-shell.sh kafka-verifiable-producer.sh
kafka-cluster.sh kafka-mirror-maker.sh trogdor.sh
kafka-configs.sh kafka-preferred-replica-election.sh windows
kafka-console-consumer.sh kafka-producer-perf-test.sh zookeeper-security-migration.sh
kafka-console-producer.sh kafka-reassign-partitions.sh zookeeper-server-start.sh
kafka-consumer-groups.sh kafka-replica-verification.sh zookeeper-server-stop.sh
kafka-consumer-perf-test.sh kafka-run-class.sh zookeeper-shell.sh
kafka-delegation-tokens.sh kafka-server-start.sh
kafka-delete-records.sh kafka-server-stop.sh

Kafka provides helpful bash tools. However, right now, we need to create a topic.

kafka-topics.sh --zookeeper zookeeper:2181 --create --topic twitter.newTweets --replication-factor 1 --partitions 10

What is ZooKeeper? What does the replica factor mean? What are partitions? to be honest, they are basics of Kafka, and if I wanted to cover all of them, It would take the whole article. However, these are basic explanations:

  • ZooKeeper: an application that communicates directly with Kafka nodes, assigns Node leaders in a cluster, and ensures they are alive. In general, it manages Kafka nodes in a distributed environment.
  • Replication factor: To ensure resiliency, Kafka defines a node as the topic leader and copies its data among other Kafka nodes. With this option, we can define how many copies of this topic we need.
  • Partitions: Well, this one can be a little tricky. A partition is the smallest method of concurrency. Within a consumer group, no two consumers can read off a partition.

Kafka clients in Go are a mess! However, I found “github.com/segmentio/kafka-go” the easiest library to start. However, I’ve been using the confluent-kafka-go library in production, and it works just fine but utilizes CGo.

package mq
package mq

import (
"context"
"encoding/json"
"github.com/segmentio/kafka-go"
)

type Writer[T any] struct {
w *kafka.Writer
}

func NewWriter[T any](addr, topic string) (Writer[T], func() error) {
w := &kafka.Writer{
Addr: kafka.TCP(addr),
Topic: topic,
Balancer: &kafka.LeastBytes{},
}
return Writer[T]{w: w}, w.Close
}

func (w *Writer[T]) WriteBatch(ctx context.Context, items ...T) error {
messages := make([]kafka.Message, len(items))
for i, item := range items {
b, _ := json.Marshal(item) // using a naive approach for serialization
messages[i] = kafka.Message{
Value: b,
}
}
return w.w.WriteMessages(ctx, messages...)
}

First, I made this simple wrapper over kafka-go library. As you can see, I used a naive approach for serialization. Kafka doesn’t serialize the data, and it just accepts bytes. These bytes can be a Blob, a JSON, or ProtoBuf. That’s the job of the consumer and producer to understand the serialization method. In production, it’s better to decouple it from the producer and consumer wrappers.

e.POST("/tweet", func(c echo.Context) error {
ctx := c.Request().Context()
content := c.Request().PostFormValue("tweet")
author := c.Request().PostFormValue("author")
tweet := models.Tweet{UID: shortid.MustGenerate(), Tweet: content, Author: author}
err := writer.WriteBatch(ctx, tweet)
if err != nil {
return c.String(500, err.Error())
}
return c.String(201, tweet.UID)
})

As you can see, I’ve assigned the Tweet Id beforehand. Well, in a distributed system where millions of data flow around the application, the old-school sequential ids are incompatible. Twitter announced SnowFlake back in 2010 to overcome this problem. For the sake of this tutorial, I use a simple UID generator.

Let’s test it. You can consume from a topic with the kafka-console-consume.sh tool.

./kafka-console-consumer.sh - bootstrap-server 127.0.0.1:9092 - topic twitter.newTweets

Now if I post a new tweet, I will consume a message like:

{"UID":"oOjVldlVg","author":"mohammad","tweet":"Hey. This is John."}

Now we need a job that consumes off the newTweets topic and saves them.

package mq

import (
"context"
"encoding/json"
"github.com/segmentio/kafka-go"
)

type Reader[T any] struct {
r *kafka.Reader
onError func(item T)
}

func NewReader[T any](addr, topic, group string, onError func(T)) (Reader[T], func() error) {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{addr},
GroupID: group,
Topic: topic,
})
return Reader[T]{r: r, onError: onError}, r.Close
}

func (r Reader[T]) Read(handler func(items T) error) error {
for {
message, err := r.r.FetchMessage(context.TODO())
if err != nil {
return err
}
var t T
json.Unmarshal(message.Value, &t)
err = handler(t)
if err != nil {
r.onError(t)
}
r.r.CommitMessages(context.TODO(), message)
}
}

First, I made a simple reader. I used the FetchMessage to prevent auto-commits. As you can see, I defined callback support whenever the system faced an error while processing the message. We can create a new topic for error handling or other approaches. For the sake of this tutorial, I just resubmitted those messages into the newTweets topic.

func main() {

writer, closeWriter := mq.NewWriter[models.Tweet]("127.0.0.1:9092", "twitter.newTweets")
defer closeWriter()
reader, closeReader := mq.NewReader[models.Tweet]("127.0.0.1:9092", "twitter.newTweets", "saver", func(tweet models.Tweet) {
// retry process
fmt.Println("error, retrying ...")
writer.WriteBatch(context.TODO(), tweet)
})
defer closeReader()

go reader.Read(func(items models.Tweet) error {
fmt.Println("received a message: ", items.Tweet)
if rand.Intn(100) > 50 {
return errors.New("a random error")
}
return nil
})

exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGTERM)

<-exit
fmt.Println("Closing Kafka connections ...")

}

As you can see, I haven’t implemented the saving mechanism yet. We want to see how the error handling works. For half of the requests, I raise an error. The other thing I should mention is the graceful shutdown. In Kafka, consumers are important, and Kafka checks their heartbeats to see whether they’re working or not. If you don’t shut down gracefully, it will cause a significant delay within your topic whenever a consumer goes down.

received a message for  Hello World.
error, retrying ...
received a message for Hello World.
error, retrying ...
received a message for Hello World.

Ok, as you can see, after 2 retries, we got no error, and it worked properly. there are several approaches to handling Kafka consumer errors. But for the sake of this article, we leave it here.

go reader.Read(func(items models.Tweet) error {
return tweetSaverService.Save(context.TODO(), items)
})

Now by changing this line of the code, we can save the tweets.

Be aware of the LAG! Well, by using Kafka, you won’t face the errors immediatly. There is a lag between submiting the tweet and saving it in the database and updating cache. YOU MUST MONITOR THIS DELAY! I can’t emphesize enough on it. I’ve seen systems for some basic bad configuration had around 1 minutes lags that could’ve been less than 5 seconds with a better partitioning. To monitor this delay, you can add a timestamp to your messages and whenever a message is completed, add the delat time to a Prometheus metric.

Let’s analyze the tweets.

The second part of the system is tagging the tweets. (By tag, I don’t mean hashtags. By tagging tweets, we can inform the system of the content of the tweet, and It can be used to define the user’s taste)

We won’t get deep into machine learning. I’ll use ChatGPT for tagging the tweets. First, let’s generate 5 random tweets:

First, I asked ChatGPT to define the 50 most common tags:

I want to write a system that tags tweets and understands the users' tastes based on those tags, suggest me 50 tags in snake case format:


1. movie_lovers
2. sports_fans
3. music_lovers
4. fashion_lovers
5. foodies
6. gamers
7. wanderlust
8. tech_savvy
9. bookworms
10. drama_lovers
11. art_lovers
12. finance_fanatics
13. social_media_junkies
14. culture_vultures
15. pet_lovers
16. family_oriented
17. health_enthusiasts
18. history_lovers
19. DIY_crafts
20. photography_fanatics
21. outdoors_adventurers
22. car_lovers
23. green_living
24. nature_lovers
25. fashion_followers
26. culture_aficionados
27. travel_bugs
28. comic_book_buffs
29. eco-friendly
30. cycling_enthusiasts
31. vegan_advocates
32. music_aficionados
33. eco_warriors
34. entrepreneurs
35. movie_buffs
36. adventure_seekers
37. tech_geeks
38. TV_show_lovers
39. fitness_freaks
40. outdoor_enthusiasts
41. gamers_tribe
42. wine_connoisseurs
43. home_decor_lovers
44. sports_enthusiasts
45. social_media_geeks
46. comic_lovers
47. food_fanatics
48. plant_lovers
49. interior_design_lovers
50. nature_aficionados

Then, I used these tags to create this prompt:

Select 5 relative tags to the content of this tweet. Only use the provided tags.

Tags: movie_lovers | sports_fans | music_lovers | fashion_lovers | foodies | gamers | wanderlust | tech_savvy | bookworms | drama_lovers | art_lovers | finance_fanatics | social_media_junkies | culture_vultures | pet_lovers | family_oriented | health_enthusiasts | history_lovers | DIY_crafts | photography_fanatics | outdoors_adventurers | car_lovers | green_living | nature_lovers | fashion_followers | culture_aficionados | travel_bugs | comic_book_buffs | eco-friendly | cycling_enthusiasts | vegan_advocates | music_aficionados | eco_warriors | entrepreneurs | movie_buffs | adventure_seekers | tech_geeks | TV_show_lovers | fitness_freaks | outdoor_enthusiasts | gamers_tribe | wine_connoisseurs | home_decor_lovers | sports_enthusiasts | social_media_geeks | comic_lovers | food_fanatics | plant_lovers | interior_design_lovers | nature_aficionados

Tweet: Just finished a new novel. Can't wait to talk about it with my book club #nerdalert

Selected Tags:

The output was: bookworms | drama_lovers | culture_vultures | social_media_junkies | book_club which is good enough.

func main() {
openaiClient := openai.NewClient("")
reader, closeReader := mq.NewReader[models.Tweet]("127.0.0.1:9092", "twitter.newTweets", "analyzer", func(tweet models.Tweet) {
fmt.Println("error analyzing ", tweet.Tweet)
})
defer closeReader()

go reader.Read(func(items models.Tweet) error {
resp, err := openaiClient.CreateCompletion(
context.Background(),
openai.CompletionRequest{
Model: openai.GPT3TextDavinci003,
Prompt: fmt.Sprintf(`Select 5 relative tags to the content of this tweet. Only use the provided tags.

Tags: movie_lovers | sports_fans | music_lovers | fashion_lovers | foodies | gamers | wanderlust | tech_savvy | bookworms | drama_lovers | art_lovers | finance_fanatics | social_media_junkies | culture_vultures | pet_lovers | family_oriented | health_enthusiasts | history_lovers | DIY_crafts | photography_fanatics | outdoors_adventurers | car_lovers | green_living | nature_lovers | fashion_followers | culture_aficionados | travel_bugs | comic_book_buffs | eco-friendly | cycling_enthusiasts | vegan_advocates | music_aficionados | eco_warriors | entrepreneurs | movie_buffs | adventure_seekers | tech_geeks | TV_show_lovers | fitness_freaks | outdoor_enthusiasts | gamers_tribe | wine_connoisseurs | home_decor_lovers | sports_enthusiasts | social_media_geeks | comic_lovers | food_fanatics | plant_lovers | interior_design_lovers | nature_aficionados

Tweet: %s

Selected Tags:
`, items.Tweet),
},
)
if err != nil {
return err
}
fmt.Println(strings.TrimSpace(resp.Choices[0].Text))
return nil
})

exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGTERM)

<-exit
fmt.Println("Closing Kafka connections ...")

}

Made this new consumer, and as you can see, the consumer group differs from the last one because we don’t want the same offset for the saver and analyzer. The second thing you should notice is that the previous approach for handling errors is incompatible with multiple consumer groups. why? because if a tweet fails to be saved in the database, we resubmit it to the topic. However, it might have been analyzed property, and we don’t want to reanalyze it.

Let’s test it. for this tweet, “Just had an epic chill session with my squad listening to some of our favorite tunes #goodvibesonly” I got “music_lovers | social_media_junkies | goodvib” which is good enough.

What should we do next?

We can either save those tags in this job or create a new topic and publish them into it. The benefit of the latter is that if other teams or services need this stream of data can consume it.

Now we can continue this architecture and create a new topic for clicks, and by aggregating click and tweet data, create a personalized experience.

Conclusion

Despite all difficulties, using Kafka is fun. looking at problems with a new approach is fun. Reaching a point in your career where you see that old-school ways are not relevant anymore is fun! Don’t be scared of it.

Kafka is not the only option. In my experience, working with RabbitMQ can be straightforward, especially while implementing fan out and pub/sub, and in other cases, utilizing Kafka can save a significant amount of time and energy while scaling.

--

--

Mohammad Hoseini Rad

I've been working as a software engineer for 5 years. I love Go, Python, and building rebust and scalable apps with Kafka, Redis, and Kubernetes.