Building for 50M concurrent socket connections | Powering the Social Feed
The Social and Gaming Team at Hotstar builds interactive experiences alongside video content. In IPL 2019, we launched the Social Feed that appears on the Hotstar Watch Page in our mobile apps.
[Figure: The Social Feed showing chat messages, the WatchNPlay quiz, and emojis]
As the content in the feed relates to what’s currently happening in the live video, it needs to be delivered in real time. The content also cannot be prefetched, as it depends on real-life events in the match. This means the timing is unpredictable, since we don’t know exactly how long each ball will take. And all of this has to happen without draining the clients’ batteries.
Given these requirements, it was clear that we needed a real-time publish-subscribe (pubsub) messaging infrastructure to deliver the Social Feed content.
And as a Hotstar service, it had to be massively scalable. This article walks through our journey of building a highly scalable pubsub infrastructure that can broadcast content to 50 Million simultaneously connected clients.
Picking the Protocol
The real-time requirement meant that we needed to push data to clients, rather than rely on the standard pull model of HTTP requests. To receive pushed data, clients need to listen for events over a persistent connection.
Many protocols already serve this purpose, and we didn’t want to reinvent the wheel. These are the other candidates we considered:
- HTTP long polling
After building prototypes and running benchmarks, we decided to go with MQTT.
MQTT, like any pubsub implementation, has the concept of a topic (also called a channel). A message is always published to a specific topic, and every client subscribed to that topic receives it.
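To make the topic abstraction concrete, here is a minimal in-memory sketch of topic-based pubsub. It is not MQTT and not how EMQX or VerneMQ are implemented: there is no networking, QoS, wildcards or persistence. The class name `ToyBroker` and the topic `match/feed` are hypothetical.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

# A subscriber is modelled as a callback that receives the message payload.
Handler = Callable[[bytes], None]


class ToyBroker:
    """Hypothetical in-memory broker illustrating topic-based pubsub (not MQTT)."""

    def __init__(self) -> None:
        # topic name -> callbacks of every client subscribed to it
        self._subs: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subs[topic].append(handler)

    def publish(self, topic: str, payload: bytes) -> int:
        """Deliver payload to every subscriber of topic; return how many got it."""
        handlers = self._subs.get(topic, [])
        for handler in handlers:
            handler(payload)
        return len(handlers)


# Usage: two clients subscribe to the same topic, and one publish reaches both.
broker = ToyBroker()
inbox_a: List[bytes] = []
inbox_b: List[bytes] = []
broker.subscribe("match/feed", inbox_a.append)
broker.subscribe("match/feed", inbox_b.append)
delivered = broker.publish("match/feed", b"Wicket!")
```

The publisher never addresses clients directly; it only names a topic, and the broker decides who receives the message.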
We started by evaluating the existing MQTT brokers. We restricted ourselves to open-source brokers so that we could see what was happening under the hood and optimise further. There were two primary candidates:
- VerneMQ
- EMQX
Both brokers are easy to set up, so we had functional MQTT brokers running within a day. The main evaluation criterion was performance, and to gauge performance we needed a load-testing infrastructure.
The vmq_mzbench tool, created by the VerneMQ team, was good enough for our initial low-scale tests.
Defining the first test
Let’s look at the kinds of functionality we needed from the MQTT brokers.
Most of the features in the feed are broadcast, or fan-out, in nature, i.e. the same content goes to all users. In pubsub terms, this means all clients subscribe to a single, fixed topic, so there is a single producer and many consumers.
As the experience is interactive, every action a user takes has to be published to a topic, and our backend services subscribe to that topic to process those actions. This kind of communication is fan-in: there are many producers but a single consumer.
Then there are features like messages from private contacts, where every user gets different content. This requires a topic per user. As each such topic has only one subscription, a published message reaches a single user, but the number of publishes grows in proportion to the number of users. Thus, there are as many producers as there are consumers. We refer to this pattern as unicast.
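A back-of-the-envelope sketch of the three patterns for N users, assuming each producer publishes exactly one message per round. The `Shape` type and the function names are illustrative, not part of any broker API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Shape:
    producers: int   # clients or services that publish
    consumers: int   # subscriptions that receive messages
    topics: int      # distinct topics involved
    deliveries: int  # messages delivered per round (one publish per producer)


def fan_out(users: int) -> Shape:
    # One producer, one shared topic, every user subscribed to it.
    return Shape(producers=1, consumers=users, topics=1, deliveries=users)


def fan_in(users: int) -> Shape:
    # Every user publishes to one shared topic; one backend consumer reads it.
    return Shape(producers=users, consumers=1, topics=1, deliveries=users)


def unicast(users: int) -> Shape:
    # One topic per user with a single subscriber each; one publish per topic.
    return Shape(producers=users, consumers=users, topics=users, deliveries=users)


for pattern in (fan_out, fan_in, unicast):
    print(pattern.__name__, pattern(1_000_000))
```

The delivery count is the same in all three cases. What differs is where the work lands: a single publish amplified by the broker (fan-out), many publishes converging on one consumer (fan-in), or many independent per-user topics (unicast).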
Of these three patterns, fan-out had the highest priority, followed by unicast and then fan-in. Fan-in had the lowest priority because we could always expose an HTTP endpoint instead of publishing to MQTT.
Thus, our initial tests focused on fan-out: one publisher publishes 1KB messages to a topic at a rate of 1 per second, and N subscribers subscribe to that same topic.
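Every published message is copied to every subscriber, so the outbound traffic in this test grows linearly with N. The following rough sketch assumes 1KB means 1024 bytes of payload and ignores MQTT, TCP and TLS overhead, so real traffic is higher. The function name is hypothetical.

```python
def fanout_egress_bytes_per_sec(subscribers: int,
                                msg_bytes: int = 1024,
                                msgs_per_sec: float = 1.0) -> float:
    """Payload bytes per second the broker must send out in the fan-out test."""
    # Each message published to the topic is sent once to every subscriber.
    return subscribers * msg_bytes * msgs_per_sec


for subs in (50_000, 250_000, 500_000):
    rate = fanout_egress_bytes_per_sec(subs)
    print(f"{subs:>7,} subscribers: {rate / 1e6:7.1f} MB/s, {rate * 8 / 1e9:5.2f} Gbit/s")
```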
We defined the following success criteria:
- Connections reach exactly SUBS + 1 (the extra connection is the publisher)
- Mean CONNACK latency is < 100k
- Pub-to-sub latency stays almost flat over time, rather than increasing
- Mean pub-to-sub latency is ideally ~500k and definitely less than 1M (MZBench’s latency computation is sometimes off and reports high latency, hence the generous 1M upper bound)
- The consumers graph reaches SUBS (consumers = number of subscriptions)
- Messages consumed = messages published * SUBS
Latency measurements are in microseconds, so 500k = 500ms and 1M = 1s.
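These criteria are mechanical enough to check in code. The sketch below assumes the run’s metrics have already been exported into a hypothetical `RunStats` record; MZBench’s real export format is not shown here. The "almost flat" check uses an arbitrary 20% tolerance that the original criteria do not specify.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass
class RunStats:
    """Hypothetical summary of one load-test run (names are illustrative)."""
    subs: int                       # SUBS: number of subscriber clients
    connections: int                # peak number of connections
    consumers: int                  # peak number of subscriptions
    mean_connack_us: float          # mean CONNACK latency, microseconds
    pub_to_sub_us: Sequence[float]  # mean pub-to-sub latency per time window, microseconds
    published: int                  # messages published
    consumed: int                   # messages consumed across all subscribers


def failed_criteria(run: RunStats, flat_tolerance: float = 0.2) -> List[str]:
    """Return the criteria a run fails; an empty list means it passed."""
    failures = []
    if run.connections != run.subs + 1:  # +1 for the publisher
        failures.append("connections != SUBS + 1")
    if run.mean_connack_us >= 100_000:
        failures.append("mean CONNACK latency >= 100k")
    windows = list(run.pub_to_sub_us)
    if sum(windows) / len(windows) >= 1_000_000:
        failures.append("mean pub-to-sub latency >= 1M")
    # Assumed flatness test: the last window may be at most flat_tolerance
    # (20% by default) above the first one.
    if windows[-1] > windows[0] * (1 + flat_tolerance):
        failures.append("pub-to-sub latency is increasing")
    if run.consumers != run.subs:
        failures.append("consumers != SUBS")
    if run.consumed != run.published * run.subs:
        failures.append("consumed != published * SUBS")
    return failures


good = RunStats(subs=1_000, connections=1_001, consumers=1_000, mean_connack_us=20_000,
                pub_to_sub_us=[480_000, 500_000, 490_000], published=60, consumed=60_000)
late = RunStats(subs=1_000, connections=1_001, consumers=990, mean_connack_us=20_000,
                pub_to_sub_us=[400_000, 700_000, 900_000], published=60, consumed=59_400)
print(failed_criteria(good))
print(failed_criteria(late))
```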
1. Single Node Tests
In these tests, we spawned a single instance each of VerneMQ and EMQX to find their performance limits. The goal was to find the maximum number of SUBS for which all of the above criteria pass.
We set up VerneMQ and EMQX on a c5.4xlarge machine and applied the system tunings recommended in their documentation. To generate the load, we spawned 10 m4.xlarge workers and configured their IPs in MZBench using the static plugin.
With VerneMQ, at 50k SUBS, we had these observations:
- Connections reached 50k successfully
- Consumers (subscriptions) also reached 50k, but ~20 seconds late. Hence, when publishing started, not all clients received the messages
The delay in the consumers graph suggested that adding a subscription to a topic is not O(1): the rate at which consumers grew dropped as the number of consumers increased. In other words, the cost of adding a subscription appeared to be proportional to the number of existing subscriptions.
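The following toy model shows why a per-subscription cost that grows with the number of existing subscriptions flattens the consumers curve. It is an assumption for illustration only and says nothing about VerneMQ’s actual data structures. If adding the k-th subscription costs c·k seconds, reaching n subscriptions takes c·n(n+1)/2 seconds, so the number reached after time t grows only like √(2t/c). The cost value below is hypothetical.

```python
import math


def time_to_reach(n: int, c: float) -> float:
    """Seconds to add n subscriptions if adding the k-th one costs c * k seconds."""
    # c * (1 + 2 + ... + n) = c * n * (n + 1) / 2
    return c * n * (n + 1) / 2


def subs_reached(t: float, c: float) -> int:
    """Largest n with time_to_reach(n, c) <= t."""
    # Solve n^2 + n - 2t/c <= 0 for n, then correct any floating-point rounding.
    n = int((math.sqrt(1 + 8 * t / c) - 1) / 2)
    while time_to_reach(n + 1, c) <= t:
        n += 1
    while n > 0 and time_to_reach(n, c) > t:
        n -= 1
    return n


# Hypothetical cost: 20 microseconds per existing subscription.
c = 2e-5
reached = [subs_reached(t, c) for t in (10, 20, 30, 40)]
added_per_window = [b - a for a, b in zip([0] + reached, reached)]
print(reached, added_per_window)
```

Each 10-second window adds fewer subscriptions than the one before, which matches the shape described above. With O(1) subscription cost, every window would add the same number.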
After discussions with the VerneMQ team, we tried some optimisations, but we couldn’t resolve the issue quickly.
With EMQX (v2.3.11), we tested up to 500K SUBS. Up to 250K, latency stayed within our acceptable bounds (mean < 1M). Impressively, even at 500K SUBS all messages were delivered; only the latency increased.
A single EMQX instance performed great, so we decided to go ahead with EMQX.
There was one caveat, though. After a load test, EMQX would get stuck. In this state, it accepted new connections and subscriptions but didn’t route messages to these new subscriptions. Clients that had subscribed before the load test stopped still received those messages. This suggested that some heavy computation during disconnections was preventing routing for new subscriptions, so we had to restart EMQX before every load test. We eventually learned Erlang and fixed this bug, but the details are beyond the scope of this article. More info here.
2. Cluster Tests
Having reached 250k connections on a single machine, our next goal was to add more machines and scale the number of connections proportionally, without any performance degradation.
In clustered mode, messages can be published to any node, and EMQX routes them so that they are delivered to subscribers on other nodes.
We set up EMQX on 5 c5.4xlarge machines, joined them into a cluster, and spawned 50 m4.xlarge MZBench worker nodes.
Latency increased linearly as we added nodes. This makes sense, as the inter-node forwarding needed to deliver each message grows with every node.
Separating Publishers and Subscribers
Say there are 5 nodes with 250k subscribers on each. If a message is published to node 1, node 1 has to deliver it to its own 250k subscribers and forward it to the 4 other nodes. Each of those nodes can deliver the message to its own subscribers only after receiving it from node 1, so this inter-node delay gets added to the latency of every subscriber on nodes 2–5.
To keep this inter-node delay small, we dedicated one node solely to handling publishes. This way, it only needs to forward each message to the 4 other nodes, which it can do very quickly.
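The following toy model captures the trade-off above. It assumes each node sends one copy per local subscriber plus one forward per peer node, and ignores how EMQX actually batches or parallelises these sends. The names are hypothetical.

```python
from typing import NamedTuple


class PublishCost(NamedTuple):
    entry_node_sends: int    # copies sent by the node that receives the publish
    subscribers_served: int  # total subscribers across the cluster


def publish_cost(nodes: int, subs_per_node: int, dedicated_publisher: bool) -> PublishCost:
    """Toy model: one send per local subscriber plus one forward per peer node."""
    if dedicated_publisher:
        # The publish node has no subscribers of its own; it only forwards one
        # copy to each of the other nodes, which then fan out locally.
        return PublishCost(nodes - 1, (nodes - 1) * subs_per_node)
    # Every node holds subscribers, so the entry node serves its own
    # subscribers and also forwards to every other node.
    return PublishCost(subs_per_node + (nodes - 1), nodes * subs_per_node)


print(publish_cost(5, 250_000, dedicated_publisher=False))
print(publish_cost(5, 250_000, dedicated_publisher=True))
```

In this model, the dedicated publish node gives up one node’s worth of subscriber capacity. In exchange, its work per message drops from 250,004 sends to 4.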
By separating publishers from subscribers, we achieved constant latency irrespective of the number of nodes. Our load test passed for 2 Million SUBS 🚀.
However, this worked well only up to 8 nodes. EMQX internally uses Mnesia, the distributed database that ships with Erlang/OTP. As this book suggests, Mnesia is designed for a small number of nodes (< 10), which could explain this limit.
3. Multi-cluster EMQX
Because cluster size is limited, we explored running multiple clusters. But we needed a way to distribute messages between clusters, and this is where EMQX bridges came in. Once a bridge is created, an EMQX node blindly forwards all messages to the destination EMQX node.
So, we deployed multiple EMQX clusters, each containing 1 publish node and 8 subscribe nodes. Additionally, we deployed a master publish node, which forwards all messages to each cluster’s publish node using bridges.
Aiming for 10 Million, we deployed 5 EMQX clusters with 8 c5.4xlarge subscribe nodes each. The publish nodes and the master publish node were also c5.4xlarge, although this can be optimised.
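The sizing arithmetic behind this topology can be sketched as follows. It assumes each subscribe node holds the ~250k connections that passed the single-node tests, which is an assumption because the per-node figure inside a cluster may differ. The 50M extrapolation is pure arithmetic, not a description of how the final deployment was sized.

```python
import math

CONNS_PER_SUB_NODE = 250_000  # assumed per-node figure from the single-node EMQX tests


def capacity(clusters: int, sub_nodes_per_cluster: int = 8,
             conns_per_node: int = CONNS_PER_SUB_NODE) -> int:
    # Only subscribe nodes hold client connections in this topology.
    return clusters * sub_nodes_per_cluster * conns_per_node


def total_nodes(clusters: int, sub_nodes_per_cluster: int = 8) -> int:
    # Subscribe nodes + one publish node per cluster + one master publish node.
    return clusters * (sub_nodes_per_cluster + 1) + 1


def clusters_needed(target: int, sub_nodes_per_cluster: int = 8,
                    conns_per_node: int = CONNS_PER_SUB_NODE) -> int:
    return math.ceil(target / (sub_nodes_per_cluster * conns_per_node))


print(capacity(5), total_nodes(5))  # the 5-cluster setup described above
print(clusters_needed(50_000_000))
```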
Earlier, when we ran the 2M test across 100 m4.xlarge workers, MZBench was already close to its limit; beyond 100 workers, it started crashing. So, our load-generation infra needed some tuning. As the bottleneck was CPU, we got a quick boost to 60k connections per worker simply by increasing the worker size to c5.2xlarge.
At 60k, we were reaching the TCP socket limit for a single source IP address. We explored adding multiple private IPs to each worker and tweaking vmq_mzbench to use different source IP addresses when creating MQTT connections. By doing this, we were able to get 120k connections out of a c5.4xlarge box. So, overall, we had the ability to generate a load of 12M connections.
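The exact change made to vmq_mzbench is not shown here. The Python sketch below only illustrates the underlying technique: binding each outgoing socket to a chosen source IP before calling `connect`, so the ephemeral-port budget applies per IP rather than per machine. The helper names and the 60k-per-IP default are assumptions taken from the figures above. An MQTT client would then run its protocol over these sockets.

```python
import socket
from typing import List, Sequence, Tuple

Address = Tuple[str, int]


def connect_from(src_ip: str, dst: Address, timeout: float = 5.0) -> socket.socket:
    """Open a TCP connection whose source address is pinned to src_ip."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.settimeout(timeout)
        # Port 0 asks the kernel for any free ephemeral port on src_ip, so each
        # extra source IP brings its own pool of ports.
        sock.bind((src_ip, 0))
        sock.connect(dst)
    except OSError:
        sock.close()
        raise
    return sock


def open_connections(src_ips: Sequence[str], dst: Address, count: int) -> List[socket.socket]:
    # Round-robin over the worker's IPs so that no single IP runs out of ports.
    return [connect_from(src_ips[i % len(src_ips)], dst) for i in range(count)]


def max_connections(workers: int, ips_per_worker: int, ports_per_ip: int = 60_000) -> int:
    # Rough ceiling when source ports are the limit (assumed ~60k per IP).
    return workers * ips_per_worker * ports_per_ip
```

With 2 IPs per worker, `max_connections(100, 2)` gives the 12M ceiling mentioned above.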
With MZBench all geared up, we ran our tests, adding clusters one by one. However, we found that latency increased as we added more clusters. This was surprising, as the clusters were supposed to be independent.
Erlang Hidden Nodes
After understanding how Erlang distribution works, we found that our clusters weren’t really independent. Whenever an Erlang node connects to another node, the two share their existing connections with each other, forming a single group.
So, when the master publisher created a bridge to the first child publisher, it also got connected to that cluster’s 8 subscribe nodes, making it a group of 10. When it created a bridge to the second child publisher, the group grew to 10 + 9 = 19. Thus, every node was aware of all the nodes in the other clusters too. By making the master publisher a hidden node, we stopped it from broadcasting its previously connected peers, which kept the clusters truly independent.
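To show how the group sizes above arise, here is a simplified Python model of this behaviour, not Erlang’s actual implementation. Visible nodes that connect also get connected to each other’s visible peers, while a hidden node’s connections are never propagated. Node names like `pub0` and `sub0-3` are hypothetical. In real Erlang, a node is made hidden by starting it with the `-hidden` flag.

```python
from collections import defaultdict
from typing import DefaultDict, Set


class ToyDistribution:
    """Simplified model of Erlang's transitive node connections."""

    def __init__(self) -> None:
        self.peers: DefaultDict[str, Set[str]] = defaultdict(set)
        self.hidden: Set[str] = set()

    def _link(self, a: str, b: str) -> None:
        self.peers[a].add(b)
        self.peers[b].add(a)

    def connect(self, a: str, b: str) -> None:
        self._link(a, b)
        if a in self.hidden or b in self.hidden:
            return  # hidden connections are never shared with other nodes
        # Both groups are already full meshes; merge them into one.
        group = {a, b} | {p for p in self.peers[a] | self.peers[b] if p not in self.hidden}
        for x in group:
            for y in group:
                if x != y:
                    self._link(x, y)

    def group_size(self, node: str) -> int:
        """The node itself plus every node it is connected to."""
        return 1 + len(self.peers[node])


def build(clusters: int, subs_per_cluster: int, master_hidden: bool) -> ToyDistribution:
    d = ToyDistribution()
    if master_hidden:
        d.hidden.add("master")
    for k in range(clusters):
        pub = f"pub{k}"
        for j in range(subs_per_cluster):
            d.connect(pub, f"sub{k}-{j}")  # forms this cluster's own mesh
        d.connect("master", pub)          # bridge from the master publisher
    return d


print(build(1, 8, master_hidden=False).group_size("master"))  # group of 10
print(build(2, 8, master_hidden=False).group_size("sub0-0"))  # group of 19
print(build(5, 8, master_hidden=True).group_size("sub0-0"))   # each cluster stays at 9
```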
With this, we crossed the 10 Million connections mark 👏
Of course, there were still many open questions at this point:
- In our load tests, we were connecting explicitly to the IPs of each broker. We would need a load balancer so that clients have a single endpoint to connect to
- How to scale the deployment based on traffic? Should we add more subscribe nodes OR add a cluster?
- How to make the infra resilient against crashes?
4. Load Balancers
As we run on AWS, we started out with classic ELBs. Our initial tests failed dramatically: even at a low scale of ~500k, connections and messages got dropped randomly. The performance was terrible.
We had to get the ELBs pre-warmed to a very high capacity. With pre-warmed ELBs, we could reach roughly 4M connections on a single ELB. Beyond 4M, the only way was to have multiple ELBs.
We also explored various other options:
- AWS NLB
- Software load balancing: HAProxy/Nginx
- Exposing EMQ nodes publicly and performing client-side load balancing
Through this exercise, we figured out that load-balancing persistent TCP connections is really hard! We even discovered some bugs in AWS NLB. Ultimately, we decided to go ahead with classic ELBs.
5. Kubernetes for resiliency
Cost, and back to EC2
Dynamic Scaling
DNS Gotchas -> Client side load balancing