Complex Systems: Top N users writing comment indicator for post — Part 3

Rajat Kanti Bhattacharjee
Published in csmadeeasy · 16 min read · Jul 3, 2022

For anyone who has zero context about this series, please visit Part 1 & Part 2.

For those who have, glad you are here to see the end of it as well. So what’s on our plate today then?

Global Subscription System

In both previous posts we took the Global Subscription System for granted. Not anymore. It’s time to decide some details of how we can build a good overall aggregation system and reduce the bytes flowing through the network by putting a proper subscription system in place. Before we begin let me give you some data to back up my over-engineered opinions.

Check this link. Original source. Numbers are in milliseconds.

Adding this because I don’t know when that link may die

Also, I won’t lie, I have not really validated the numbers. That said, the numbers for Mumbai to Iowa match what I have seen while working with VMs for my own GPU workloads, so they are probably about right.

From here we can conclude that one problem the subscription system needs to solve is making sure the Producer Infrastructure in Mumbai is not calling the entire world with 1.9M messages per second and holding on to all those requests. Yes, we are using TCP and we would like to have some reliability for our message deliveries. So that’s a lot of connections to hold while a request is in transit. A simple back-of-the-envelope calculation can tell you that if the average latency of a request is 200ms then the connections to hold during that time will be 1.9M/s * 0.2s = 380,000 connections. That’s a huge number to hold. We will need a lot of RAM, spread across a large cluster of machines.
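The back-of-the-envelope estimate above is an application of Little’s law (average requests in flight = arrival rate × time each request stays open). A minimal sketch using the figures from the text; the function name is made up for illustration:

```python
def in_flight_connections(requests_per_second: int, latency_ms: int) -> float:
    """Little's law: average in-flight requests = arrival rate * time in system."""
    return requests_per_second * latency_ms / 1000


# Figures from the text: 1.9M messages per second, 200 ms average latency.
connections = in_flight_connections(1_900_000, 200)
print(f"{connections:,.0f} connections held open on average")
```

Halving the latency or the outbound message rate halves the number of connections to hold, which is why the rest of the design tries to avoid sending messages to regions that never asked for them.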

Problem to Address

  • A large number of requests to handle subscription & unsubscription data
  • Resiliency. This is the single source of truth for all services. It cannot go down, or at least it must be easily recoverable
  • Resiliency in outbound messages from Global Subscriber. If sending the subscription intent fails retry a few times based on SLA guarantees and then inform the client frontend application
  • Data redundancy is fine as long as we guarantee accurate data being received. That is if 10 regions have publishers we must view data from 10 publishers irrespective of delayed publication.
  • Resiliency in the central store. If for some reason a region goes down, the subscription system should be able to detect it, correct its own data and be ready to provide recovery information if needed.
  • Reduce the total outbound region-to-region traffic for the subscription itself. No point in holding 1M post-view subscription intent requests at each frontend service.

Let’s take a look at the high-level plan and start drilling into each detail

Architecture

High level overview

Let’s quickly go through all the zones. Yes, I know you are probably squinting your eyes to see the image in full detail. We have gone through some sections previously and will be going through the new one here.

Red Section: Multiple instances, as many as there are regions. These are the consumers for the clickstream and producers of aggregate data for each region.
Blue Section: Multiple instances, as many as there are regions. These are the consumers for all the aggregate streams across the world, which means each may be taking in the event stream from all 50 regions at once. Our estimates have always focused on the worst case, and the previous article pinned down the numbers for it.
Green Section: This is where global communication is established. This is the central infra and service that keeps tabs on which region requires data for which post. Its primary job is to keep tabs on subscriptions, both per region and globally. This avoids sending needless requests, which makes the process of subscription itself pretty fast (not holding on to requests). It also acts as a global backup for any region in case it does go down. (In case Mr Robot decides to take down Google instead of ECorp 👀)

Let’s start with each of the sections where we made new modifications and then finally move to the central repo. Before we begin let’s establish some realistic constraints. There are 500+ commodity data centres all across the world. By commodity, I mean the ones that could host servers and other user applications. Even then there is no benefit in using such a large fleet of data centres at once. If your regions are covered according to user density then you are good. So let’s assume we have 50 regions in total. I feel that is realistic. This means there are 50 deployments of both the producer and the consumer. The subscription service, however, will probably either have an edge-local deployment that talks to a distant database, or a single deployment that is not edge-local to most regions.

Producer

Producer Transmission front

There are some new additions here. Mainly

Transmission Server (T): They take messages from the workers all load-balanced across a large cluster and send them across the ocean to some other regions that are interested in that topic.

Publish Listener (PL): Pardon the name, naming is a big problem in software after all. This service updates a local cache to keep tabs on which post ids should be received by which regions. It can then republish this information across the transmission application cluster, allowing for easy filtering of messages as they are received.

So what exactly does each of these Publish Listeners store? The data we are interested in is which region has listeners for which post. We can store a postId to regionId mapping, but I feel it can lead to redundant storage. Assuming all the posts on our platform are viewed at the same time in every region, that is (10M + 10M * 50 regions) * 64 bit ≈ 3,891 MiB, roughly 3.8 GiB, which is still a small number if we look at the scale of storage. Alternatively, we can store (50 * 10M) * 64 bit ≈ 3,815 MiB by storing (RegionId, HASHSET(postIds)), but that increases the query cost, since finding the regions for a post means checking every region’s set.
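To make the trade-off between the two layouts concrete, here is a rough sketch. It counts raw 64-bit payload only (hash-table overhead is ignored), and the lookup helpers are hypothetical names, not part of any design in the series:

```python
POSTS = 10_000_000   # worst case from the text: every post is being viewed
REGIONS = 50
BYTES_PER_ID = 8     # 64-bit ids
MIB = 1024 ** 2

# Layout A: postId -> set(regionId). One key per post plus up to 50 region ids each.
layout_a_bytes = (POSTS + POSTS * REGIONS) * BYTES_PER_ID
# Layout B: regionId -> set(postId). The 50 keys are negligible and ignored here.
layout_b_bytes = (REGIONS * POSTS) * BYTES_PER_ID

print(f"post -> regions: {layout_a_bytes / MIB:,.0f} MiB")
print(f"region -> posts: {layout_b_bytes / MIB:,.0f} MiB")


def regions_for_post_a(by_post: dict[int, set[int]], post_id: int) -> set[int]:
    # Layout A answers "who wants this post?" with a single lookup.
    return by_post.get(post_id, set())


def regions_for_post_b(by_region: dict[int, set[int]], post_id: int) -> set[int]:
    # Layout B has to probe every region's set for the same answer.
    return {region for region, posts in by_region.items() if post_id in posts}


# The same tiny subscription state in both layouts.
by_post = {101: {1, 2}, 102: {2}}
by_region = {1: {101}, 2: {101, 102}}
print(regions_for_post_a(by_post, 101), regions_for_post_b(by_region, 101))
```

The memory difference is about 2%, so the choice mostly comes down to which query the transmission servers run more often.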

Till now we have also not accounted for the resiliency of the data that is aggregated. It was not discussed in the original post. So from now on, we can assume that all aggregated data is going to be posted to a different Queue with the following structure

<partition_key = postId , data = aggregated_data>

But this queue can be helpful in one more way. Remember the strategy we used in the consumer in the previous post? We can have the Publish Listener post all the subscribing regions for a post to this queue, and the consumer can build its internal state for retransmission. The recovery mechanism can also be similar to the one discussed before. This ensures that the above memory requirement is shared across the cluster. But we now have two new problems on our hands.

  • How many Consumers? We are already aware that data communicated outside the region will take significant time to complete an HTTP request, i.e. we may end up holding 1000s of connections at any second. But with a Kafka cluster, I don’t see a point in having that many partitions. Keeping a replication factor higher than 1 is only going to make things worse. So what should be done?
  • Ordered or not Ordered? As soon as the messages arrive at the consumer (T-service) we would like to transmit them. But do we stop the transmission until we have received acknowledgement of previous messages, or do we go ahead and compromise the ordered nature of messages?

Before we answer how many consumers, we need to address the problem of ordering. This is a popular and well-known problem in distributed systems. Kafka, or any message queue with an ordering guarantee, generally keeps it by acknowledging commits and keeping an internal queue of messages to commit. This gives the client awareness and control over whether to try and send more messages. The ordering is also specific to a particular partition, which in our case is maintained until we send the messages out. One may say, well, just wait till you receive the acknowledgement and then send the next message. The problem is that when no acknowledgement arrives, you don’t know whether the message was actually received or not. Take a look at these 3 cases

The three cases of network unreliability

As you can see, a message that was never delivered and an acknowledgement that was lost look the same to the sender. So should we time out and move on to the next message, or retry the same message again? We really can’t retry at this point because our aggregation is not idempotent. We never addressed idempotency anywhere and assumed messages will be delivered only once, which might be a fair assumption within our localised regions. Don’t roll your eyes, we already discussed ack 1 mode in the previous post, given we are fine with failures and with ordering only at the partition level. Failure to send clickstream events can be ignored, a few typed words are fine to miss out on. Remember we are aggregating on engagement and not word count 🚶 …

But here if we end up sending the same data twice the globally aggregated data will show up more exaggerated than it is. Here is a point where we must compromise on our accuracy of global aggregation.

We ignore stale data for a region in the consumer of a region

What this means is that as we keep accumulating data per region, we also keep the latest timestamp from each region in our aggregation data structure. There would be at most 50 entries per post, one per region. This will increase the overall storage requirement by 50 times per postId, which is acceptable at the pod level since the aggregated data size is extremely small. Check the previous post for the numbers. Using this we can now throw away stale older data. This solves our problem of idempotency, because now we can avoid aggregating the same timestamp data from the same region twice and even ignore older data.
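A minimal sketch of that per-region bookkeeping. It assumes each message carries the producing region’s full current aggregate for the post (a snapshot rather than a delta) and a producer-side timestamp that only increases per region; the class and field names are made up for illustration:

```python
from dataclasses import dataclass, field


@dataclass
class RegionSnapshot:
    timestamp: int  # producer-side timestamp of this aggregate (assumed increasing per region)
    count: int      # engagement aggregate reported by that region (assumed to be a snapshot)


@dataclass
class PostAggregate:
    # At most one entry per producing region, so at most 50 entries per post.
    by_region: dict[str, RegionSnapshot] = field(default_factory=dict)

    def apply(self, region_id: str, timestamp: int, count: int) -> bool:
        """Accept the update only if it is newer than what we hold for that region."""
        current = self.by_region.get(region_id)
        if current is not None and timestamp <= current.timestamp:
            return False  # duplicate or stale: safe to drop
        self.by_region[region_id] = RegionSnapshot(timestamp, count)
        return True

    def total(self) -> int:
        return sum(snapshot.count for snapshot in self.by_region.values())


agg = PostAggregate()
agg.apply("mumbai", timestamp=10, count=5)
agg.apply("mumbai", timestamp=10, count=5)  # retransmitted duplicate, ignored
agg.apply("mumbai", timestamp=9, count=3)   # late, older message, ignored
agg.apply("iowa", timestamp=4, count=2)
print(agg.total())
```

Because a repeated or out-of-order message leaves the state unchanged, the sender is free to retry or to send without waiting for earlier acknowledgements.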

So is ordering necessary? Not anymore !! So it’s open season, send data as soon as it is available. Well, ideally we would still prefer to have an acknowledgement, so a mechanism that waits on the acknowledgement but with a timeout is favourable. We can use exponential backoff-based wait times so that we quickly reach a stable, acceptable wait time. If that wait is breached, we update the wait time again and send the next message anyway.
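One way to read the “wait with a timeout, back off, then move on” idea is sketched below. The `send` callable, the starting wait and the ceiling are all assumptions for illustration; a real transmission server would plug in its HTTP client here:

```python
from typing import Callable, Iterable


class AckWait:
    """Tracks how long to wait for an acknowledgement, in milliseconds."""

    def __init__(self, initial_ms: int = 50, factor: int = 2, ceiling_ms: int = 2000):
        self.current_ms = initial_ms
        self.factor = factor
        self.ceiling_ms = ceiling_ms

    def on_timeout(self) -> None:
        # The wait was breached: back off exponentially, up to a ceiling.
        self.current_ms = min(self.current_ms * self.factor, self.ceiling_ms)


def transmit_all(
    messages: Iterable[str],
    send: Callable[[str, int], bool],
    wait: AckWait,
) -> int:
    """Send every message without stalling on stragglers; return the unacknowledged count."""
    unacked = 0
    for message in messages:
        acked = send(message, wait.current_ms)  # True if acked within the wait
        if not acked:
            unacked += 1
            wait.on_timeout()  # widen the wait, then move on to the next message
    return unacked


# Stand-in for a real network call: this fake link acknowledges only when
# the sender is willing to wait at least 200 ms.
def fake_send(message: str, timeout_ms: int) -> bool:
    return timeout_ms >= 200


wait = AckWait(initial_ms=50)
missed = transmit_all(["m1", "m2", "m3", "m4"], fake_send, wait)
print(missed, wait.current_ms)
```

The sketch only ever widens the wait. Shrinking it again after a run of fast acknowledgements would be a natural extension, but the post does not specify one.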

Now to answer how many consumers? Don’t know. But let’s dig deep into the Transmission Service requirements to identify how we can architect it for better auto-scaling. 😃

  • Persistent Connection vs Transient Connection? i.e. WebSocket or good old HTTP: Why are we concerned about this? Well, if you remember the original calculation of request delay, you would know how much memory the HTTP requests are going to take. So do we want to create a new connection per request or do we keep a connection alive? My argument against a persistent connection is that we are going to pile up a bunch of held connections without knowing whether we can drop any of them at a given instant. With HTTP that’s not the case. So with HTTP, your memory usage can dynamically go up or down based on total acknowledged requests. This also fits well with our load balancer, because we don’t know which pod is going to receive a message, so I’d prefer HTTP here.
  • Region-based Deployment:
The insides of transmission service

This one is simple. We have a bunch of consumers, equal to the total partitions we assign. There is a cache, since we would like the consumers to store the last read offset for a partition, so that when a rebalance occurs we can use the offsets. Finally there are a bunch of deployments with Horizontal Pod Autoscaling, which is the Kubernetes way of saying we have deployments that will scale on demand based on load. This means that if there are a lot of pending requests holding memory, Kubernetes will detect it and spawn new pods, and requests will be load balanced among them.

Remember though that the Consumers may or may not be stateless, depending on how you want to store data in the cache. You can store it under a stateful consumer id or with the partition id as the key. Either way, it doesn’t require a complex recovery strategy. We can even poll the data from the Publisher Service if needed.
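A minimal sketch of the partition-keyed variant of that cache. A plain dictionary stands in for whatever shared cache is actually deployed, and the class and method names are hypothetical:

```python
class OffsetCache:
    """In-memory stand-in for the shared cache, keyed by partition id."""

    def __init__(self) -> None:
        self._last_read: dict[int, int] = {}

    def record(self, partition: int, offset: int) -> None:
        # Never move backwards, even if an older write arrives late.
        previous = self._last_read.get(partition, -1)
        self._last_read[partition] = max(previous, offset)

    def resume_from(self, partition: int) -> int:
        # Next offset to read; 0 when the partition has never been read.
        return self._last_read.get(partition, -1) + 1


cache = OffsetCache()
cache.record(partition=3, offset=41)
cache.record(partition=3, offset=40)  # late write from the previous owner, ignored

# After a rebalance, whichever consumer now owns partition 3 picks up here.
print(cache.resume_from(3), cache.resume_from(7))
```

Keying by partition id means any consumer can take over a partition after a rebalance without knowing who owned it before, which is what keeps the consumers effectively stateless.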

Phew !! done with 1/3 …..

This is me right now. Don’t know about you.

Consumer

The consumer side of things is really not that complicated. You have two new services that need attention.

  • The Retransmission Service is the end point we expose to the world for a region. Every producer will be sending its messages here. This can be a load balancer endpoint receiving all requests backed by an auto-scaled deployment.
  • Edge Subscriber: Its responsibility is to hold the request ids of pending subscriptions for new postIds, plus a map of postIds to the count of subscribers. Its work is simple. If it encounters a new postId it will let the central service know, otherwise it will just respond with a success to the frontend service that tried to subscribe. In the case of a new subscription, if the subscription fails it will let the frontend server know, so that clients can be notified and errors handled gracefully. It also helps in clearing stale connections.
For lack of better name labelled as Global Subscriber

Let’s quickly estimate how much data is the Edge Subscriber going to hold. (hashmap(10M postId + 10M count) + hashmap(requestId + userId) * 3M) * 64bit = 198 MB . And these are the worst-case values. I feel this is acceptable. So let’s nail down our outgoing subscription flow and we can be done with the consumer.
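The same estimate written out, counting raw 64-bit fields only and ignoring hash-table overhead. The constant names are just labels for the numbers in the text:

```python
MIB = 1024 ** 2
BYTES_PER_FIELD = 8            # every id and counter is treated as 64 bit
POSTS = 10_000_000             # worst case: every post has a local subscriber
PENDING_REQUESTS = 3_000_000   # in-flight subscription requests

post_counts_bytes = (POSTS + POSTS) * BYTES_PER_FIELD       # postId + count per post
pending_bytes = (PENDING_REQUESTS * 2) * BYTES_PER_FIELD    # requestId + userId per request

edge_total_mib = (post_counts_bytes + pending_bytes) / MIB
print(f"{edge_total_mib:.0f} MiB")
```

Real hash maps carry per-entry overhead, so the actual footprint will be some multiple of this, but it stays comfortably within a single pod’s memory.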

  • Frontend Service receives a client subscription request for (postId , userId)
  • Frontend Service forwards a subscription request to the Edge Subscription Service with (postId , requestId , userId). You can ditch the userId if you want to persist the userId data in the frontend service. While this is happening, the client already has an active connection with the Frontend Service.
  • Edge Subscriber will now either forward the request with (regionId , postId) if it’s a new postId or respond immediately to the frontend service with a success.
  • In case of a new request i.e new postId , edge subscribers might receive a failed to subscribe message from the global service which can again be communicated to the frontend service to have it clean up the stale client connection. The client can choose to retry or not 🙇
Hoping this makes it easy to visualize

You may wonder why the hops. Why not update this directly on the central subscription service? Well, user activity is noisy i.e users come in and go at a whim. Do you want such activity to have network requests to every producer or even to that one central service? Doing it this way makes sure we reduce network travel time and short circuit requests.
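The short circuit is easier to see in code. This is a simplified, synchronous sketch: `notify_global` is a hypothetical callable standing in for the request to the central service, and unsubscription is left out:

```python
from typing import Callable


class EdgeSubscriber:
    def __init__(self, region_id: str, notify_global: Callable[[str, int], bool]):
        self.region_id = region_id
        self.notify_global = notify_global           # returns True when the central service accepts
        self.subscriber_counts: dict[int, int] = {}  # postId -> local subscriber count
        self.pending: dict[str, int] = {}            # requestId -> userId, while a request is in flight

    def subscribe(self, post_id: int, request_id: str, user_id: int) -> bool:
        if post_id in self.subscriber_counts:
            # Known post: answer locally, no cross-region call.
            self.subscriber_counts[post_id] += 1
            return True
        # New post for this region: tell the central service once.
        self.pending[request_id] = user_id
        accepted = self.notify_global(self.region_id, post_id)
        del self.pending[request_id]
        if accepted:
            self.subscriber_counts[post_id] = 1
        return accepted  # False lets the frontend clean up the client connection


global_calls = []


def fake_global(region_id: str, post_id: int) -> bool:
    global_calls.append((region_id, post_id))
    return True


edge = EdgeSubscriber("mumbai", fake_global)
edge.subscribe(post_id=42, request_id="r1", user_id=1)
edge.subscribe(post_id=42, request_id="r2", user_id=2)  # short-circuited
print(len(global_calls), edge.subscriber_counts[42])
```

However many users in a region open the same post, the central service hears about it once.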

I hope you have not lost the context till now but I guess this is a good point to start summarising the system. The rest of the discussion will largely be just around the recovery and reliability of these subscription messages.

  • Consumer infrastructure will subscribe by posting postIds it is interested in.
  • Each region’s postIds will be captured and broadcast to all the other producer regions.
  • The producers will receive the request and will start broadcasting the aggregates to the interested regions.
  • Interested regions will consume data from multiple producer regions and then finally send the data to the clients

Why broadcast to all Regions?

Well, we can start publishing which region is producing data for which post to the central subscription service. There whenever we receive a new subscription request we can map out the producer region ids and only send the request to them. Yes, that can be done but I feel we are ignoring the potential problems here

  • How do you solve the case where a subscription request came in for post1 at T=0 but the Region1 Producer only announced itself as a producer for it at T=1? Basically, how do we handle false negatives (a missing producer region) due to time delays?
  • How do you clean up the data posted by producers? A user can stop typing at any instant and start typing again, so what’s the ideal way to deal with it? How do you avoid false positives in the (producer_region , postId) mapping?

You can probably see some problems here, which mostly originate from the unordered messages and the potential delays. Solving them would require more brainstorming around the subscription service and more infra, all to save a few network calls and some memory.

Even an Over Engineering fanatic 😝 like me would draw the line here. But we can also do that. 😄

Hopefully you

I hope you are in this state because we are about to wrap this pretty quick now !!

Global Subscription Service

Let’s pin down the why of it. What you are looking at is a much-simplified version of the architecture of this service. In the real world, you will probably have 1 server deployment per 5 regions and 10 deployments in total, all sharing the same DB. Each will be reading from its own replica and writing to the master. I would prefer something like MongoDB here since I don’t see any row write contention or any need for transactions. Based on our previous discussion we also have a clear understanding of the kind of data structure we will require. This is basically the Edge Subscription service, but storing (RegionId , SET(postIds)). My preference for MongoDB comes down to it being primarily a database, and I would prefer disk persistence here. This will act as the central source of truth for which region is interested in listening to which post. You may ask why we need disk persistence. It is for recovery and for keeping records for reliable delivery.

If you notice, in the architecture there is a Queue named Dead Letter Queue with an (R), which stands for the Recovery service. Its purpose is to retry all subscription messages that we failed to get an acknowledgement for. Based on how you want to define your SLAs for the client, you can either retry infinitely or send a fail message for a requestId back to the central server. This will make it send a failed subscription message to the region the requestId belongs to, and we go back to the cleanup workflow we discussed before.

The central service can also listen for heartbeats from each region to be aware in case any region is down for maintenance and avoid sending messages to it and have them queued in the dead letter queue immediately.
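A rough sketch of how the dead letter queue, the Recovery service and the heartbeats could fit together. Everything here is an assumption for illustration: the `send` callable, the heartbeat timeout, the bounded-retry policy and all names. An in-process deque stands in for the real queue:

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable


@dataclass
class Intent:
    request_id: str
    target_region: str
    post_id: int
    attempts: int = 0


class SubscriptionDispatcher:
    def __init__(self, send: Callable[[Intent], bool],
                 heartbeat_timeout: float = 10.0, max_attempts: int = 3):
        self.send = send  # returns True when the target region acknowledges
        self.heartbeat_timeout = heartbeat_timeout
        self.max_attempts = max_attempts
        self.last_heartbeat: dict[str, float] = {}
        self.dead_letters: deque[Intent] = deque()
        self.failed: list[str] = []  # requestIds to report back as failed

    def heartbeat(self, region: str, now: float) -> None:
        self.last_heartbeat[region] = now

    def is_alive(self, region: str, now: float) -> bool:
        seen = self.last_heartbeat.get(region)
        return seen is not None and now - seen <= self.heartbeat_timeout

    def dispatch(self, intent: Intent, now: float) -> None:
        if not self.is_alive(intent.target_region, now):
            # Region looks down: park the message without spending an attempt.
            self.dead_letters.append(intent)
            return
        intent.attempts += 1
        if not self.send(intent):
            self.dead_letters.append(intent)

    def run_recovery(self, now: float) -> None:
        """One pass of the Recovery service over the current dead letters."""
        for _ in range(len(self.dead_letters)):
            intent = self.dead_letters.popleft()
            if intent.attempts >= self.max_attempts:
                self.failed.append(intent.request_id)  # give up, trigger the cleanup workflow
            else:
                self.dispatch(intent, now)


attempted = []


def never_acks(intent: Intent) -> bool:
    attempted.append(intent.request_id)
    return False


dispatcher = SubscriptionDispatcher(never_acks, max_attempts=2)
dispatcher.heartbeat("iowa", now=0.0)
dispatcher.dispatch(Intent("r1", "iowa", post_id=42), now=1.0)
dispatcher.run_recovery(now=2.0)  # second and last attempt
dispatcher.run_recovery(now=3.0)  # attempts exhausted, reported as failed
print(attempted, dispatcher.failed)
```

In this sketch a message parked for a silent region is retried on every recovery pass until the region’s heartbeat returns, which corresponds to the “retry infinitely” option. Bounding that as well is a matter of what the SLA promises.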

Overall the role of this central service is to keep the central record of subscriptions and do reliable delivery and retries of subscription intent.

And we are done here!

Ya, let’s hear it ….

I know the experienced principal engineers probably have this expression now

See, every system has issues and bottlenecks. We have discussed potentially every problem that I could see. However, there are a few aspects that I might have been blind to or did not consider important. So let’s go through some issues that I see and believe can be addressed in time, so that we are at least well aware of them.

Bottlenecks

  • We are aggregating too much and not filtering a lot on the producer: Which is correct. But I feel implementing a filter would not be much of an issue. Since we already have the subscription system nailed down, we could also push the postIds from the Publish Listener into the Kafka queue to build a data structure in each pod to use for filtering. My biggest issue with this is that we might potentially lose some minutes of data. Just because no one is observing a post doesn’t mean no one is engaging with it. It might just be a new post.
  • Real-world post recommendations are more random, with more chances of skew: Correct. That is something I have not thought through from the very beginning. Although every region will have the same infra and be horizontally scalable, I have not thought through the effects of skew that can be created by the random recommendation of posts to users. You know, like getting recommended a random Twitter user’s post, which then drives you into a vortex of weird profiles. Each such behaviour means a region is probably seeing subscriptions to posts whose primary engagement might be coming from Mexico, whereas you are sitting in India. What happens when such cases are not uniform and are extremely asymmetrical? How can we distribute resources and utilise them better? I feel we can explore in more depth how the existing system behaves under asymmetrical distributions.
  • Can we simplify the subscription? Yes and NO!! It can be simpler, yes. Would I do it? No. I have already laid out my reasoning. If you see any issues with it I would welcome them. Also, as I pointed out, we can probably get rid of the global subscription service and just be happy with the edge one. But that means I need to replicate the recovery process in more places, which is doable. So I feel it’s debating “Potato” or “Potaato” 👀, where one costs more. Open for debate I guess. 😆
me :)

So ya this is the end of it. Thank you to anyone who has stuck till now. Hope you have learnt something. Got some ideas perhaps for your next interview. 😄

Also for reference, this was the original inspiration for this post. It was for just simple realtime messaging but I wanted a little bit more.
