Complex Systems: Top N users writing comment indicator for post — Part 2

Rajat Kanti Bhattacharjee
csmadeeasy
Published in
19 min readJun 27, 2022

Welcome back to another episode of me describing an over-engineered system because why not !!!! Last time we finished the stream aggregator i.e our producer. This time we are going to nail down the consumer. In case you missed the previous one here is the link: Part1

Ya, I know. You asked for it :) now stick to it.

Consumer end

Let’s start by nailing down a very high-level overview of the components we would like to have and then start drilling down into them one by one.

The consumer issues come from the idea of having to fan out the messages. How many? Well to be precise Total Active Users * Active Post. If you remember we gave a number of 5M users. Let’s have them all active and stare at 10 posts on their feed while writing a comment on one of them. So we have now potentially an expected number of 50 Million messages flowing across our consumer system. You may wonder wait isn’t it a larger value than our incoming stream. Yes, you are correct. But 10 unique user’s can still be subscribed to the same posts right, which means all 5M can be looking at the same 10 or maybe overlapping 10 posts at a time. Either way that’s a lot of data flowing. Although by the time we reach the end of the funnel which is close to the edge servers, which push the data this will, however, be reduced.

Why because you can batch and send the data of all posts over a single Server Side Event connection. A fancy way of saying we will keep a streaming HTTP connection open and push all events batched together onto that.

Also, note that the estimation above for 50 Million messages flowing at one point might be completely wrong since we are trying to understand how much data will be flowing. However, the actual count should be much lesser if we are able to batch most of the data properly at the right endpoints.

So how do you handle 5M users on a single machine? The answer is you don’t. If you do then.

we all don’t have the money for the 64 Gigs machine. :)

This means ain’t nobody got time for multiplexing 5 Million active connections on a single machine. You can try but why do you want to? 👀

The other end of the problem is an N: M connection problem. Where some pods need to listen to specific post events and then have it transmit it back to the interested User servers. Remember till now we have put all our efforts into reducing the data into a single compressed blob that we can pass across the ocean. It’s now a question of how the data is redistributed to users.

Before we begin let’s pin down the problems we would like to address by the consumer

Problems to Address

  • Aggregate Post related event data from across the world and reduce them in the local cluster. Expect a peak of 1.9 million events per second. Check the previous post for the same.
  • Send the reduced information for a window of T minutes for the client
  • There will be n number of regions we will be receiving data from and this needs to be mapped to all the subscribers for that post events.

Let’s also look into the structure of the message we can expect in the stream. So few important data that we aggregated from each region and would be interested in are

  • N userIds with the corresponding comment or word count.
  • Total post comment counts (rising engagement metric)
  • Timestamp?
  • Since engagement data is generated for each word typed we can be sure someone is engaging as long as there is some event.

So let’s start with a very high-level view of what can ideally address the problems

The final design for the fan-out service

Frontend: User handling service that acts as an edge server that can respond back to user requests to subscribe and listen to post streams. Its primary job is to fan out the post request for its own users.

Downtime Handler: Uses operator pattern to listen on deployed environment events to detect worker/subscriber pods going down to let know the frontend service that it should try and re-subscribe

Subscriber / Worker (S): The consumer for events. These pods take care of polling, aggregating and retransmitting the data to the concerned frontend services. They also listen for subscription requests using which builds the internal data structure used for identifying event recipients. The data is retransmitted through a webhook exposed by the Front-end services.

Events Receiver (R): This one is optional. Either events can be directly pushed down to the cluster or we can have a proxy that takes care of retransmission and message recovery in case the cluster decides to act up. Trust me my mere months at GO-JEK have taught me that machines can act up at times and you don’t know why of them.

Message Broker (MB): In previous posts, we ended up calling it a queue because we were treating the cluster as a Qeueu with replayability. Not that it’s any different but this time it’s acting as a service-to-service communication so it makes sense to call it a broker for once. It takes the event from the entire world about posts and as well as subscription messages albeit under different topics.

Probably you after looking at the solution

But before you begin, I can already see some eyes rolling on the floor. 👀 Why can we not just use one global storage for advertising intended post to frontend mapping and let the producer across the globe read of it and push the messages. Oh….. well !!!!

Drum roll, please …..

I have been waiting to use one of these for a while 😝

Let’s start with the least effort design and identify issues and gradually move forward

Architecture Investigation

So before we begin with architectural iterations let’s quickly establish something. Subscription Service or the Global Subscription Service is so far now assumed to be given. We can assume that it does take some kind of information about which postIds to add for a region and then make sure traffic is reaching the consumer.

Iteration Uno

Client subscribes and let frontend servers know of intent to listen

As we have already described we would like to use SSE from the frontend servers and let the Client know of new events. Client at a time can subscribe to multiple posts at the same time and will let know of it. One thing we must consider here is these frontend servers are fan-out servers. Which means they are going to hold a lot of connections. Possibly close to connection_per_server = 3M / (server_count) in each machine. Naturally, they will need plenty of RAM for the same.

Let’s nail down the approximate ram requirements.

  • Probable hash table size for user to post mapping for an approximate 10 posts per user, userId (64bit) * 3M (keys)+ 3M * 10 postIds(64bit) (values) = 252 MB which is acceptable. If you distribute clients across multiple pods
  • Let’s nail connection size. Now know this connection size has to do with a lot around bookkeeping (socket ids and locks for same) but the majority of it comes from its buffer size and transmission control-related window sizes. Which you can read more about here. Let’s take the memory requirement for one connection as 10MB which would mean the total ram across pods will be 29,296 GB. That is not a small value and we definitely would need 10000s of pods. Only then perhaps we may be able to bring each connection resource down to 1 GB
  • Remember however that these are peak load calculations. You know if Queen Elizabeth posts pictures of her new dog. 😆 (we often forget what these scale issues are really serving at the end of the day)
  • We have not talked about post-data aggregation yet. Although now the memory requirement for the individual posts might be a bit lesser than the producer front. But we still are looking at aggregates from at least 10 regions. If we assume each major land mass on this planet has one data centre. If not then at least 50 regions. So let’s assume all total 5M users are active. So your event stream might just upshot to a close number of 3M events per second distributed across all your frontend servers. But are they distributed well and equally ??
  • Aggregating information would require a windowing strategy. Doing windowing on individual machines where the same partition connection is replicated is not a wise choice. Why let’s nail down the aggregation memory requirements. If at any point any pod receives all the bulk of messages i.e 3M events ps and each message size is, given we are only looking at the top 10 commenters and their related aggregates the message size might be close to (10 commenters, count (64bit + 64bit) + sum aggregate (64 bit)) = 0.00016MB 👀. But when these messages are held in the queue with the same strategy as discussed in the previous post. The total storage requirement will be 3M events ps * T(5 minutes) * 0.00016 = 140GB . A big part of the problem here is this memory requirement will be heavily replicated across multiple frontend services, because they probably have the same postId subscribed. Since we are not aggregating in worker pods we are doomed to waste CPU and Memory in doing the same thing multiple times.

Now imagine you have 10000s of services making requests to the subscription service independently?. Every producer out there must know the addresses of all the Front-end services to push the data to every other region.

For example. If there are 10 producer regions and 10 consumer regions. The mapping of content by Subscription Service is no more done on 10:10:PostIds, it’s done on 10:10000:PostIds Unique addresses. Yes, you can put a single service up front acting as a proxy for a region but then we include another problem here.

You see services die all the time. We are not worried about clients losing out on data since what the client did not see did not happen but might have still happened 👀. But what the server never saw can never happen unless we put some method by which to keep all this data intact in some store. Aggregation is necessary otherwise you will just overwrite the data of some regions with the other for the same post. All in all not a great solution but we do have some back-of-the-hand calculated data on our scale factors. Which are good guide marks for us.

This is where the message broker starts to come into view.

Adding the Queue/Broker

So let’s start with adding a broker or Qeueu with replayability 👀 . Now as soon as we do that you may point ahh well why not let the frontend services subscribe to the respective partitions. We can have the partition key as the post id. Done deal !!

Well, this works. We now have recovery. We also have now a single point where all messages can be funnelled in. But now let’s look into the other aspect of this problem. Aggregation !!! We still have not solved that issue. But before we even jump to that let’s take a look at the problems that we may potentially have with the architecture. For memory and CPU requirements, u can throw money 💸 at but for architectural complexity you need to find a developer who is nuts enough to understand and maintain it. 👀

Just like the previous post we are choosing to use Kafka because of replayability and replication. First thing, you cannot simply have multiple readers for the same partition. You can but you will have to assign these to a different group. Groups are basically a logical collection of consumers where no two consumers can consume on the same partition. In this case, no two frontend services can consume the same post id. That will not work for us. This is fine but now you are just adding extra legwork for the broker. Which is again acceptable. This is a usual setup. You may want some form of Group coordination though. This means we start off with “group1” and if that one has already a subscriber for some partition it returns a group Id where it can be subscribed. So we need a Group Coordinator. However, the Group Coordinator somehow will also need to ensure that message offset information is the same across pod restarts. You can do it with Stateful pods. But the pods themselves really need not be stateful. 😕 You can also adopt the operator pattern here and just listen to the Frontends failures and when spawning up a new frontend provides the initial state of the one that died.

This is the Group Coordinator in this Architecture. Too much going on all the time

Yes, I would say let’s stop here and call it a day !! But I have two specific problems here.

  • The count of subscribers: Managing 1000s of offsets for the same shared topic because multiple frontend share interest in the same topic, feels like extra leg work. This is an added workload that Kafka has to do. Although a broker would be a usual VM machine being able to handle 10k+ connections without breaking a sweat when connections go down and up left and right. The rebalances are not going to be particularly great. Rebalances are stopping the world events and will cause an unintended spike. We also did not account for the storage space and architecture of the group coordinator itself. We can use the Reddis Set Data structure to store podIds and the postIds subscribed by the pods , but with that comes pod-aware cleanup i.e we need an operator perhaps the Group Coordinator to be the operator that cleans up the Group Assignment data on the failure of pods.
  • Coupling of consumer and dispatcher: This may be a bit of a stretch. But think about it if you have close to 10k+ frontend services. They are all subscribed to different topics. At any point, if any partition (post events) which are locale in any sub-region gains interest we may end up with a huge skew. Basically, frontend will have too many varied posts being subscribed and tons of messages are going to end up being processed on the same machine. This also leads to a lot of wasted time on the frontend machine as well since the frontend may get events for other posts which belong to the same partition. Now, this is subjective to this use case but if you do require any kind of truncation or frontend-related data addition this may not be the best place to achieve this. Since we already have plenty of network connections to deal with, we would prefer a high count of lower CPU time with high memory pods allocated which may not be appropriate for processing events. Here processing can mean truncating data or adding any other metadata on top and aggregation as well.
  • Reducing Events data: As mentioned above this solution does not respect the CPU or memory wastes that happens when we collectively look at the architecture. I do not like that. I feel this can be an issue in the long run. Especially under a high load situation, we will be burning a lot of money.

If these problems do not bother you we can perhaps stop at this point and go with the group coordinator/operator + risk of skew and wasted events being sent to a machine, all the while wasting memory and CPU solution😕

Aggregator Workers Solution

Close to Final Solution

So let’s pin down some behaviour here for all components. From here onward, you can assume most of these deployments are done under the same localised cluster. Like the India Google data centre hosting all of this under multiple VMs constituting a GKE Cluster which is hosting all of the different deployments.

  • We now have added a new cluster only dedicated to consuming topics. It consumes, reduces (creates a common aggregated from multiple different regions) and identifies where to send posts and then calls on to the webhook provided by that receiver.
  • We now have a Downtime Handler which is an operator. But it has a simple purpose of communicating pods being down and asking pods to resubscribe to an event stream from the consumer.

So let’s take a plunge into the worker internals

Worker Architecture

Very similar to the ones shown in the previous post. As above we have already calculated the required RAM requirements we can be assured that a large cluster perhaps 100+ pods would ensure that work is equally distributed and RAM requirements for individual workers should be at acceptable levels (1.4 GB each). Although it is possible that we may still face some skew at which point I feel the earlier Map-Reduce strategy discussed in the previous post should apt. But the architecture can now support this and that’s the important bit here.

The next piece of the puzzle is to create a subscription mechanism. Let’s pin down what we are really aiming for in something like this

  • Frontend services should be able to advertise their intended post to listen on along with its container id (domain inside the network can be identified by it)
  • The message should reach the worker which is concerned with that postId’s event and be able to call on to the webhook exposed by Frontend services for passing down the events.
  • If the worker dies it should be able to notify the Downtime Handler to ask the concerned frontend service to resubscribe.
  • If a frontend server dies, 500 or 400 errors from the webhook should allow a worker to trigger clean-up in Downtime Handler

Before we proceed, just to clarify we are going the webhook route because we don’t want any more active connections being present on the frontend server. Holding an active connection with workers serves no purpose. Since these are internal servers and will always have fixed reachable DNS entries and worker can be told where to pass the message post processing.

Let’s pick one problem at a time

  • Subscription Intent: There are two potential solutions for this. One is we use an external cache to advertise the intent of the subscription. Particularly we can use a set for each postId to hold the server names <postId , SET(F1, F2…)>. The workers can always poll in the set data and update it to local. The other approach is the more event-driven one where we send the data for subscription and let the message find its way to the worker. The worker then updates its internal subscription table. Clean-up in this one is much easier since all the frontend service does is send another message for cleanup for a post and done !! The poll-based approach would not have this flexibility
  • Recovery and Failure handling (Frontend): Failures on the frontend service end are easier to detect. Calls to webhook if fails for a certain amount of time on the workers, the worker can choose to let know of this to the Downtime Handler , which will basically do a cleanup. A resubscription is potentially already done since the new pod that restarted must have already made a subscription call.
Standard
  • Recovery and Failure handling (worker): Failure on the worker’s end means the downtime detector would be triggered. It can now fetch all the frontend server ids for the downed pod and let them know to resubscribe while cleaning entries in its own cache. Missing messages can be recovered when spawning the new worker pod. Just like in the previous post we can use a separate cache for storing offsets for partitions. This would make recovery fairly simple.
The new setup for the Worker Recovery + Resubscribe mechanism

The recovery assumes that we are using an event-driven approach. I am more convinced of the event-driven approach since it makes clean-up a lot easier and no polling on cache at any point is necessary. So let’s go with that.

Message for Subscription : <key = postId , data = frontendPodId >

This key strategy ensures that we are always reaching the correct worker for the subscription. Let’s estimate how big the internal table for subscriptions needs to be built for maintaining this subscription list information.

Let’s assume we have 1000 active frontend servers. Each server perhaps might have 1000 unique post subscription requirements that are made equally by all these servers. That means when distributed across multiple workers. On average the workers might just end up building a table of <postId , [F1, F2 , F3]>. We may be looking at 1000* 64 bit + 1000 * 1000 * 64 bit = 7MB of storage. Which optimistically may be really small. So it’s completely fine to keep these in the local state.

Guess we have all information covered for our workers.

Here’s to you for sticking till now

Downtime Handler

Now let’s discuss the new component in the scene. The Downtime Handler.
What are the things it will handle

  • /store : Adding entries for <WorkerId , FrontendId>
  • /cleanup : Clean up entry for FrontendId given a workerId.
  • Internally it would listen to the Kubernetes control plane events to detect pods misbehaving or can also listen for pods sending error signals to it. Acting as a command centre for killing the pods and letting all Fontend servers for it to resubscribe

Remember the storage requirement for this service is not much as we have already evaluated above. This begs the question of should we use a disk-based Database or memory cached Database (you can configure an extremely high flush time) or resort to a good old key-value cache like Redis. This decision should be taken based on the following consideration

  • Do you want transaction support i.e add multiple changes at once and have them behave atomically
  • Are you expecting high contention on writes to change the same row field? I don’t see it happening since there is decent isolation since each pod will interact with its own data only. The API calls are also serial. You cannot do clean up and store at the same instant-on the same row. 👀
  • Is durability a necessity? Well if you are expecting things to go down left and right why not this. But I don’t see the writes being really frequent and if it’s managed Redis service you are using, you really don’t even need to bother with this. Added to it the amount of storage required is peanuts. Should be close to 1000 (frontendIds) * 64 bit + 100 (worker podIds) * 64 bit = 8kb. 😆
  • Do you need Query Support? Actually, we might 👀

So in my view, any in-memory store solution should do the job. I would still prefer a disk backup. So Redis is what I will prefer since I am aware we can have configured to flush to disk. But if we do not intend to use the Reddis Set data structure and prefer a usual query language a MySQL database to store all this information with a high memory flush rate also does the job equally well. I am leaving it up to audience to make the call.

Till now we have not discussed the timing of events received. We took for granted that each network request will be as near-instant as possible. But an aggregator for events across the globe is not going to receive messages as near-instant as we are assuming.

Timing Problems

Time slice diagram

So for ease of visualization, I have shown a stream of events as time slices. Now if you observe closely we are sending the respective aggregates at T4. Remember this is just a snapshot. Such event dispatches are happening every next time slice instant or whenever the internal data structure updates. If we pay attention here the dispatch received for consumers is delayed for different regions. This raises the question of how can we then do real-time aggregation?

  • Well, one way is to increase the aggregation window. You are still aggregating for 5 mins’ worth of data. Just that you now account for the least and largest request time latencies as well. The strategy may not work in all cases and real-time data can be inaccurate most of the time.
  • This is also worth noting though that Ordering is no more guaranteed. Simply because of the network latency of each message, we may end up with events from t-1 from china being consumed after t-2 from India, since the consumer was located in India (low latency)
  • We also have not considered Clock skews. On the producer end, the machine was geolocated nearby to each other. Clock skew even if present would be to a minimum. But here not so much, skews can lead to early eviction of messages. Remember the queues eviction strategy, we never specified the time standard used, but even if we use UTC and if the consumer machine has UTC+5second skew then messages received at the correct time will be evicted 5 seconds earlier from the perspective of the source.

So the question is should we really worry about these issues then? I feel these are all standard known issues in distributed processing scenarios and although we can try and compensate for these, I don’t feel that they are worth the time. Since most aggregation, if real-time is accepted to be delayed and has some margin of error. Also if you consider the problem of locality, for a post created by Indians how many Chinese or European subscribers are we really expecting? This is a point where we should measure and set up tracing across the system to keep track of the End 2 End latencies and understand if the issues around request latency are really impacting user experience. Remember from the beginning it has been our goal to reduce and aggregate the information as much as possible to avoid transmitting a large amount of data across the globe. This is optimistically done, in hope that we do not add large transfer time to the already unpredictable latencies.

😄 I am open to suggestions and inputs on this front. It’s a subject around which I am yet to investigate the existing solutions.

Finalised Architecture

New addition of recovery cache

So we have gone through all the edge cases, memory requirements and especially the recovery mechanism. In most cases, every service can subscribe and unsubscribe on its own but the clean-up is the nasty business here. Do consider this you don’t need Kubernetes or Kubernetes Operators always. In our previous design for producers as well we tried and avoided the same. If you remember it was achieved through a rather simple model of stateful workers with local volumes attached and Write-Ahead Logs being used for maintenance and recovery of state, which is an equally plausible way of achieving it. We emphasized the Downtime Detector since it will play role in the unsubscription from the global Subscription Service as well in case of Frontend Service dies.

In the next and the final part of this series, we will nail down the architecture of the global Subscription and event delivery service. Which you may find oddly similar to the Consumer subscription strategy.

Hope you found something of value here and if not do let me know what I can improve on. If you did even then do let me know. This ain’t perfect nothing is !! It’s all just a mess of compromises you take on the road. 😄

Ciao or you can head to Part 3 now

--

--

Rajat Kanti Bhattacharjee
csmadeeasy

Your everyday programmer. Talking about everyday engineering. Love natural science, physics buff and definitely not good at no-scope 360.