Complex Systems: Top N commenters for a post — Part 1

Rajat Kanti Bhattacharjee
csmadeeasy
Published in
17 min readJun 23, 2022
Something like but with User Profile Images of everyone doing this

Fare warning this will be a 3 part article. I will try as much as possible to make the use case non-standard and fairly ridiculous. Ridiculous in terms of features simply because they do not make sense for a user and definitely do not make sense as a computational cost.

Problem Statement

Today’s candidate is the Top N users writing Comment Indicator. Think of it as the message writing indicator in your personal chatbox, when the other person writes a text message. Only in this case do you see this indicator for every post in your feed. We would also like to see user profile photo bubbles indicating who are top commenters/engagers etc. You know because of reasons !!! We can perhaps start with a much simpler use case of only showing a typing indicator when a user visits the respective post URL. But this only simplifies the producer front of the problem. The consumer issues still remain. I will also use the constraints that I recently faced in my interviews which were about cross-continental usage of the application. So we are talking about large network delays and the necessity of partition tolerance in some way (data centres can go down left and write across countries stopping the feed, cloudflare anyone)

Requirements

Yes please add a few more requirements. why not !!!

Let’s start with pinning down feature requirements

  • When the user loads a post URL, irrespective of whether a user owns that post should be able to see if someone is writing a comment to it.
  • Be able to show user icons for top users commenting at a time.
  • Feedback should be as real-time as possible.
  • Extend the same to reflect for every post visible on the feed

Scale factors to consider

  • The backend should be able to handle close to 10M+ posts and 5M+ users. Should scale as the number grows

Problem breakdown

  • The problem requirement so far does not seems to require any sort of permanent store. This means we can probably get away with a queue and some cache layers at places. So let’s keep that in mind. No perma store is needed. Although we can assume that all posts and user information are legal information provided from the frontend application
  • Typing information is a keydown event that can be stored and processed on the producer front of the overall system.
  • Users viewing their feed or the post will be listening to the above-produced event for any respective post url. In other words a user is interested in the keydown event stream for a postId
  • We can assume for optimal performance of page serves and requests, each region of concern has their own dedicated servers in the nearest data centres. So this would mean the nearest place to listen for the event stream will be the data centre hosting a supposed replica of the main application.
  • The same can be assumed on the event produce side as well. This itself gives us two parts of the problem. Fan in event aggregation and Fan out consumption.
  • We also need to consider that both producer and consumer end of service is in a completely different country. It won’t be ideal to actually flood the network by sending all click events from every source to every sink. The source and sink must advertise where they want to listen to or send the event.
  • We also need to take into account that any of the services at any point can go down. So the system has to be fault-tolerant.
  • The user applications can either choose to poll or establish a constant HTTP alive connection. Which can then be used by the server to push down server-side events.

Architecture

Before I begin

Hi all Senior / Tech Lead / Architects etc

Pardon my inexperience with such systems and relentless over-engineering. The entire point of the post is to keep reasoning about the system to its every nook and corner. So pardon me

Humbly
A random engineer on the internet

Me trying to create large scale architecture with my limited experience

Producer End

So let’s nail down the basic information we can send in the event package

event =< key= postid , payload = { userId , timeStamp}>

The key is important here since we are going to distribute the workload of aggregating information on post to multiple different workers. The timestamp is important if we want to do any kind of aggregation on a window. The user will be relevant in the end-user front for provisioning some form of UX if needed. We are also keeping in mind that there can be future possibilities of creating aggregation or other metrics on top of users who are actively engaging with posts.

high level consumer diagram

Components

  • User: Signifies the user-facing components of the application. Since they are the ones processing filtering and finally sending the request to the queue.
  • Queue (Q): Can be native Kafka deployments or any other abstraction on top. That guarantees low latency deliveries but not necessarily message delivery reliability (missing key down/clicks in between is fine)
  • Worker(W): We need a message consumer that can aggregate the data and create entries and send messages somewhere else specific to post. The outbound messages can look like

post_event = <key = postId , data = eventType, interactingIds = [] , aggregatedMeta = {}>

The aggregatedMeta information is basically anything we would like to track for a time range. Some examples of

  • In a stream of comments were there any prominent interactors or maybe a spam bot. You don’t want spam bots.
  • We can sort top commenters to identify and only show relevant information for post viewers. Like you may want to see if any high-value user is engaging with a post. Might make the post worthwhile for you.
  • Metrics around users themselves, how frequently they commented or interacted etc.

Estimates on Load

Suppose at any time frame our system gets close 3M users active. The average word per minute for people on mobile devices is 38wpm. The average word length in English is 5. So which means we can estimate on average we may receive

3M * (38 wpm * 5) / 60 = 9.1 M events/s.

Now that’s a huge number. Although a single Kafka cluster should be able to handle this since load balancing for partition happens on the consumer front so requests are always sent directly to the concerned VMs entertaining a particular partition. A little bit on kafka internals for the same.

At this point, I want to emphasize two things

  • We are assuming the worst here. If we want to build a notifier for typing We can send the events on a rising edge of the start of a word. This itself will reduce to 3M * 38/60 = 1.9M events/s
  • The parallelization of which queue to send a request to can be done by any other queue software. But since we need recovery support as well Kafka looks like a good choice. Although open to the suggestion of using other software as well in comments 😃

So let’s start discussing the worker aggregator first and we will gradually move towards explaining the queue component

Component Breakdown: Worker :

So someone familiar with Kafka streams, would have already figured out the architecture and have already headed to the comment section to burn down the over-engineered post.

For the rest of the folks here, let’s avoid Kafka streams for now and keep things quite barebones.

Breakdown of the worker

A worker effectively will be acting as a map store. It receives entry and it writes changes to a map. The map operations can be following for every postId key

  • Post Typing Indicator boolean, nails the basic requirement
  • map of userId to count and should help keep track of top n commenters for a time frame. For every key postId key entry
  • Meta information like sum, min and max engagement metrics etc can be also be aggregated per post.

Now let’s talk about eviction. I mean we are aggregating the data for a continuous stream but we don’t really want that information from the start of the world time itself right? So we need a sliding window across time. Something to keep track of all the events happening between the current time and events that happened N minutes before. This will allow us to basically create an eviction policy that can reduce the count for certain entries.

Doubly Linked List Queue of Events: You can maintain a list of these events in memory and can evict when new events come in. Now, this strategy may also suffer from memory blowing out in case there is too much interaction going on with a post. But if the poll and processing are done in the same thread we can have some kind of backpressure management here since each worker is consuming as much as it can manage.

What an event handler might look like

Now I want to emphasize a few things here,

  • Is that the queue implementation itself can be modified to hold some guarantees of reliability on crash recovery. For example, you can use something like Reddis List or WAL implementation to manage a list in memory or on a volume disk. Writing a recovery for this will not be difficult since we are just replicating the internal queue data only.
  • You may already have figured that queue size may get really big. Well, one solution for that is you can bucketize the incoming values if the accuracy of data can be sacrificed, which let’s be honest in this case will probably have to be made. What we can do is create buckets of 10 seconds and compact and compute the data which needs to be evicted (subtracted from the count). From the figure you can get an understanding all the bucketing allows us to use the amount of data we store by having as keep smaller aggregate information but we also lose a bit on the accuracy front.
Hope this helps !!

Now before we jump on the discussion of having a queue or no queue

Quickly let’s pin down some numbers

  • Average Request received per pod : 1.9M / min(total_partitions,total_pods) per second = EPS
  • Average memory pool size per second
    Let’s assume we keep the raw data of every event. (postId(64bit) + userId(64bit)+timestamp(64bit) ) * EPS = 24byte * EPS = 43 MB ps /(min(partition , pod))
  • The numbers seem to be justifiable for each pod. Although we have not really kept account of the storage data needed for each pod. Given we always track associative operations data, we can assume this will be close to the total posts being interacted. This should be 3M post in a single instant. If the data aggregated for each post is close to (1000 commenters, count (64bit + 64bit) + sum aggregate (64 bit) + top N=10 users(64bit) ) = 0.015 MB * 3M/(pod_count) = 44 Gb p/s / (pod count).
  • Let’s also estimate how much in-memory data for the queue we will have to maintain. If we intend to maintain a window of supposedly 5 minutes. So this itself will lead to a queue size of 5min * 60 seconds * 43 MB p/s = 12 Gb store for 5 minutes / (pod count)

The estimates should give us a good clue on how many pods of what kind of configuration we may need. For example, if the queue is maintained on the Redis queue then this 12 Gb would be completely present in an external memory, and our pod count will solely be determined by the window size.

Do consider these numbers to be rough estimations and correspond to raw byte numbers. Using flatbuffers or protobuf should help a lot in reducing the amount of data that is pushed out of the system.

Tradeoff Redis vs WAL vs In-memory

  • Redis: Recovery should not be an issue. Whether we go the route of stateful pod or stateless. Issues might present on eviction latency. Remember the eviction is done in a loop on receiving an event. This can involve read of the tail and then committing. Which can lead to a significant wait time based on network conditions for each event processing. An implementation that keeps the oldest tail and latest heads in memory and syncs the rest in reddis should help better read and write latency by batching the operations. But that’s just speculations.
  • WAL: I personally am in favour of this solution, since the disk is cheap and SSDs are fast enough. It becomes a question of Network vs Disk I/O latency. Where I feel Disk guarantees might just be better.
  • In-memory: Fastest but at the cost of memory. Now let’s assume we 1000 pods. The in-memory solution no more looks costly to me. The data structure for aggregation is also well divided on average. However, there might be skewing issues we may find for certain posts which can cause problems if we use the fixed post to partition mapping. The in-memory solution, however, is not resilient and data if lost once is lost. However, we will discuss a strategy later in the post to try and compensate for that as well.

Caveats of Design which may backfire: One issue I see as a caveat which may backfire is that we are maintaining the data aggregation inside the worker which means irrespective of whether we use a queue or not this is a point of failure and is prone to losing data. As each worker is concerned with a subset of posts we may have to spawn a lot of workers to distribute the workload. Most of these workers might also require a disproportionate amount of memory. A good way to address this might be to turn the workers into a Map-Reduce pattern to avoid the aggregation in one single container and maybe reduce the memory pressure we may create. Since the aggregation compute itself may not be the highlight as it’s streaming and most operations like sum, average and min follow associativity. Now although one can say that we reduced the load factor for the individual consumers we just introduced more points of failure. Although the good news is the failure of a few mappers may not take down all the data for a post. You can still have partial data working but with incorrect information but I feel it’s over-engineered at this point. Not that the entire post is not over-engineered right !!

Sample of ho workers can be further broken down

Component Breakdown: Qeueu:

Design Without Queue

Implementation without queue
  • If you want to simplify the system down to just click processing with no care of handling the backpressure of events you are welcome to do so. This means you cannot really provide a guarantee on how quickly your workers are going to process a message. This means any events being thrown at it can lead to large congestion of pending events inside the worker.
  • If you want to handle the storage and recovery of messages to build the internal data for aggregation for each worker then this model should work. Although this would also mean recovery would require some form of disk volume attached to your worker pods (Kubernetes terms). Which can then be used with something like this to implement a write-ahead log for all event messages and build up the state from the failure point when the pod fails.
  • You can also face another issue here, the resilience of the disk volume. If you need really reliable delivery which may not be a big concern for a system like this. But if it is then implementing it manually will be another issue.

Design with Queue (event broker)

Now let’s talk about the design which uses a Qeueu or an Event Broker in this case. Since all the discussion and suggestions were about if you do not use a queue till now. We may just have overdone a few parts. So let’s start with the mental model of a Kafka cluster and its relation with consumers. We have already established that Kafka is chosen because of its fast disk write and reliable recovery. Although I am open to suggestions and believe we can get any other queue to somewhat behave like this. Although I don’t see it worth the effort.

mental model to think about partitioning here. G = Group coordinator which is partition assignor

Since we have decided postId will be our key the partitioning logic will naturally be

partition_no = hash(postId) % count_partitions

We are at a cross road now. We need to decide how would we like to set up parallelism in our application. We have two option

  1. Use Kafka’s topic and group name-based subscription:
    *
    This is a usual setup. You have ephemeral consumers that can go down any time and are brought up any time. They don’t have any permanent states.
    * The postId they consume is also not exactly predictable per pod since every rebalance can lead to a different partition being assigned.
    * No issues in increasing or decreasing
    * In order to avoid losing already created data when the pod crashes, we can use the Redis list or WAL implementation as discussed above to stash the entire list structure.
    * We would require a crash recovery operator. An operator here is basically a pod that reads Kubernetes to know stats about the infra and take corrective measures. In this case, spawn a recovery pod with the failed pod id.
    * The recovered entries are now pushed to one of the pods and look like the usual stream information to the pod and are merged with the same logic as discussed above
    Advantages :
    * Familiar setup. No manual offset management.
    * Changing Partition count does not lead to any issues around stale data
    * No data skew of comment aggregation information.
    Cons :
    * Each pod may have its own data for the same postId , due to their stateless nature.
    * This means we are not able to reduce data here and will end up sending more data over the internet in the end.
    * We also need to have a separate operator that runs recovery jobs for us. 😫
  2. Assign Partition manually:
    *
    Pods are no more deployments and are stateful in nature.
    * Each pod have a unique assigned id and subscribes to a particular partition number.
    * Pods can now start committing the offset to Redis. Specifically offsets that correspond to the current tail entry of the list. In case of failure at any point whenever a pod needs to recover it can just get the committed offset from Redis and have the events replayed to build the original state again. Which if you now compare it to other solutions makes recovery a breeze
    * Since pods are tagged wth id manually and will receive only postId event from those posts means data will be aggregated properly and in the end the amount of data and requests in motion across the internet will be much lesser.
    Cons:
    * Partition increase and decrease has to be a very mindful operation. Increasing count would mean sending some of the postIds to a new stateful pod.
    * The older pod that held those postId events will now have stale data. Which will probably have to periodically checked and cleaned of by a single cleaner thread.
    * A big caveat here is that if pods crash we are looking at a complete loss of event streams for a set of postIds until the duration the pod is restarted and recovered.
    * Another potential concern is Spike due to Skewed commenting. For example, if a celebrity posts a feed. Then you will get a large skew to the pod receiving messages from that post.

Alternative Recovery mechanism: On subscription, every Consumer has information on what particular partitions it has been assigned. If we are using an operator pattern instead of having to read the WAL managed manually we can use the offset commit strategy which also commits the partition. In this way, the recovery job can subscribe to a partition with the specified offset and retransmit the messages to any of the pods for a full recovery of the state of the lost pod.

I am personally of the opinion is we can start from the second option and move to the first when we see that reliability guarantees are impacted That is if we say that user can tolerate a lack of feedback for a minute and our recovery time for pods is more than that in such case we may need to go with a solution which can utilise the recovery operator strategy. The recovery operator strategy can allow us to quickly republish the logs in some other machine to have it handle the recreation of the state and have it published to the concerned APIs.

Now Let’s also take a look at the Kafka configs as well.

Partitions: How many are too many? Well, you can sort of keep increasing the count as much as you want but it’s not a linear graph. You can read more here. Ideally, you would like to reduce the average consumption time. Keep increasing the partition until it is stable across a time range.

Brokers: Depends on broker VM size and number of partitions. Ideally, you should have the same number of partitions as a broker since your partition will then be distributed across all physical machines. Giving a nice distribution across the physical machine for the consumers.

Acknowledgement: Now you can take a call on reliability. So given these are click streams and I want some form of reliability but don’t really care about having committed them in all my replicas and can trade-off for SPEED !!! I will set the ack mode to 1, which means they are good to be consumed as soon as they are committed to the leader of the partitions. Replication is needed but assured replication is not so much.

Compaction & Retention: Compaction Yes and Yes. Ideally what we require here is for each post the data is retained for perhaps 24 hours. Although make sure to not add a small retention number here. Since Kafka uses active files to keep track of current event data and some stale segments for older ones which are purged based on criteria. If the max size specified for this file is small you will have tons of old segments. Which actually have active file handles with Kafka. That itself can cause issues within a broker.

Sending Data out to the world

We have not included this model here till now. But ideally, we would prefer a deployment of a server application which can expose an endpoint for us to push down all the aggregated data. This cluster of services can take care of sending these aggregated events to the concerned server out there. We are going to take a jab at this service when we finally start talking about the Subscription service.

The final architecture for the producer would look something like this based on our calls and decisions. We can use volumes or offset cache to use for recovery. You would need an operator if you are not doing stateful replicas and manual partition assignments. But either way, this is how things will look like. Remember there is still the ominous global subscription system that we have not discussed yet. Which soon will be included.

Phew, that was long !!!!

I hope you have cringed and criticized the post till now and have usable and valuable feedback which you can rant down in the comments or share with me personally 😃

And if not glad you stick till this point. Thanks for reading and hoping I was able to help you discover new ideas or learn something new.

Now let’s head to Part 2. Since we have not consumed the data yet.

--

--

Rajat Kanti Bhattacharjee
csmadeeasy

Your everyday programmer. Talking about everyday engineering. Love natural science, physics buff and definitely not good at no-scope 360.