PushMan: The Koinex standard for realtime experience

Notes on scaling realtime messaging to a million messages per second at Koinex

This article is the development story of PushMan, our in-house realtime Publish-Subscribe (PubSub) system that delivers messages published from the polyglot Koinex platform (an ecosystem of microservices written in Node.js, Ruby, Java and other programming languages) to users of the Koinex Web and Mobile apps. It discusses the architecture of Pushman and the technical challenges faced during its development. Solutions to these challenges are presented in a way that highlights ideas and optimization techniques that can be applied generally to build highly scalable and concurrent systems.

1. Introduction

Real-time communication on Web and Mobile apps has become the de-facto standard of delivering information to the user. Gone are the days where users were happy to get responses over email or on the reload of the page. Instant Messaging was the first powerful use-case of realtime communication and without a doubt it paved the way for popular social media platforms like Facebook and Twitter to deliver content to users within a blink of the eye. More and more apps are now using the realtime web to solve complex problems like Collaborative editing, Multiplayer gaming, Live streaming updates etc. Problems which once were termed impossible to solve are now getting solved easily by the new type of user interactions powered by the realtime web.

At Koinex, we are dedicated to providing the best trading experience to our users. Considering the volatility of cryptocurrencies, delivering an updated Ticker, Order Books, Open Orders, Settlement Details etc. is crucial to ensure seamless trading in a fast-paced market where milliseconds matter. Depending on the upward or downward movements of markets, traders book positions by creating, modifying or deleting their orders. If there is a lag in providing the realtime market data points, the user might miss an opportunity to make a profit or, worse, end up with a loss. So, we have to ensure that we deliver the most up-to-date data with the minimum possible latency.

Achieving this sort of realtime delivery becomes a challenge at the scale we have! Where is the fun in easy challenges anyway? Let’s first talk about scale. We have a very large user base and have seen more trade volumes than Indian e-commerce or online ticketing giants. To put that in perspective, we have delivered more than 20 Billion realtime messages to our users in a single day. Keeping the lean startup approach in mind, we deferred development of in-house services and relied on third party cloud services for such functionality so that we could focus on the development of the core trading platform.

By using third party services we traded development time for a lot of money. Relying on third party services was a temporary solution, and we soon realized that this integration model would be unsustainable considering our long term goals and continuously increasing scale. So, we started development of an in-house realtime Publish-Subscribe (PubSub) system and named it PushMan.

2.1. A PubSub system under the hood

There are basically two types of messaging patterns used in microservice ecosystems. The first is the Push model, in which one service synchronously calls another service and waits for the response. The other is the Pull model, in which each service is connected to a central bus (generally a message queue) and direct inter-service communication is largely avoided. The Pull model scales better than the Push model. A PubSub system serves as the backbone of Pull-based systems; Apache Kafka is one such example. A PubSub system exposes two interfaces:

i) Publish (Pub): Publish lets producers publish messages on a particular channel. Channels (also called topics) are the segmentation layer through which subscribers can listen to only the messages they are interested in.

ii) Subscribe (Sub): Subscribe lets subscribers listen to messages according to their interests. They do this by subscribing only to the channels they are interested in.

A PubSub system where the subscriber is subscribed over two channels
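The two interfaces can be sketched in a few lines of Go. This is a minimal in-memory illustration, not Pushman's code: the Broker type, its method names and the inbox buffer size are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical in-memory PubSub hub (not Pushman's actual code).
type Broker struct {
	mu   sync.RWMutex
	subs map[string][]chan string // channel name -> subscriber inboxes
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers interest in a channel and returns a buffered inbox.
func (b *Broker) Subscribe(channel string) <-chan string {
	inbox := make(chan string, 16) // buffer size is an arbitrary choice
	b.mu.Lock()
	b.subs[channel] = append(b.subs[channel], inbox)
	b.mu.Unlock()
	return inbox
}

// Publish delivers msg to every subscriber of channel and returns how many
// inboxes accepted it. A full inbox is skipped rather than blocking.
func (b *Broker) Publish(channel, msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, inbox := range b.subs[channel] {
		select {
		case inbox <- msg:
			delivered++
		default: // slow subscriber: drop instead of stalling the publisher
		}
	}
	return delivered
}

func main() {
	b := NewBroker()
	// One subscriber listening on two channels, as in the figure above.
	ticker := b.Subscribe("ticker_INR")
	orders := b.Subscribe("my_open_orders")
	b.Publish("ticker_INR", "price update")
	b.Publish("my_open_orders", "order update")
	fmt.Println(<-ticker)
	fmt.Println(<-orders)
}
```

A message published on a channel nobody subscribed to is simply discarded in this sketch, since nothing is queued for absent subscribers.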

2.2. Pushman vs traditional PubSub systems?

Most PubSub systems maintain a queue which is used for message buffering and persistence in case there are no active subscribers. The subscribers execute different workloads depending on the messages they receive on channels over time. Pushman is not designed for that type of message; it is optimized only for delivering the realtime value of an attribute with minimum latency to a large pool of online subscribers. Take the price of Bitcoin, for example: there is no point in delivering a 5-minute-old price to a subscriber that dropped off the website. Pushman was developed keeping the Single Responsibility Principle (SRP) in mind.

2.3. Pushman high level architecture

There are overall two components of the Pushman system:

a) pushman-pub

b) pushman-sub

Pushman high level architecture

Publishers send messages to the pushman-pub component over HTTP, which in turn pushes them to a low-latency store. We use Redis as a transient store for published messages. The pushman-sub component retrieves the messages from Redis over a TCP connection, over which multiple Redis PubSub subscriptions are multiplexed, and subsequently sends them to subscribers over the WebSocket protocol.

The message format somewhat looks like this:

{
"channel" : "ticker_INR",
"data" : { "BTC_INR": "500000", "ETH_INR": "35000" },
"event" : "ticket_update"
}

The data field contains the message to be delivered. The channel field contains the channel on which the message is sent and event is a meta field.
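As a rough illustration, the envelope could be decoded in Go as below. The Message type is an assumption, and keeping data as raw JSON is a design choice of this sketch: the delivery layer only needs channel to route a message, so the payload can stay opaque.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors the three fields shown above. The Go type is hypothetical.
type Message struct {
	Channel string          `json:"channel"`
	Data    json.RawMessage `json:"data"` // payload kept opaque, forwarded as-is
	Event   string          `json:"event"`
}

// decodeMessage parses one published message.
func decodeMessage(raw []byte) (Message, error) {
	var m Message
	err := json.Unmarshal(raw, &m)
	return m, err
}

func main() {
	raw := []byte(`{"channel":"ticker_INR","data":{"BTC_INR":"500000","ETH_INR":"35000"},"event":"ticket_update"}`)
	m, err := decodeMessage(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Channel, m.Event, string(m.Data))
}
```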

Let’s see a basic sequence in which a message is delivered.

2.3.1

A subscriber establishes a Websocket connection to the pushman-sub server.

2.3.2

The subscriber sends a subscribe command to subscribe to messages from a particular channel.

{
"method" : "subscribe",
"channel" : "ticker_INR"
}

Here the subscriber is interested in receiving messages from the ticker_INR channel. After that, the subscriber starts receiving messages published on ticker_INR. A subscriber can subscribe to other channels as well, e.g. my_open_orders, my_notifications etc.

{
"channel" : "ticker_INR",
"data" : { "BTC_INR": "500000", "ETH_INR": "35000" },
"event" : "ticket_update"
}
{
"channel" : "ticker_INR",
"data" : { "BTC_INR": "510000", "ETH_INR": "35100" },
"event" : "ticket_update"
}
{
"channel" : "ticker_INR",
"data" : { "BTC_INR": "520000", "ETH_INR": "35250" },
"event" : "ticket_update"
}

2.3.3

Subscribers unsubscribe from ticker_INR to stop receiving messages.

{
"method" : "unsubscribe",
"channel" : "ticker_INR"
}
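The subscribe and unsubscribe commands above could be handled on the server side roughly as follows. The Command and Session types and the error handling are assumptions for illustration; only the method and channel field names come from the payloads shown.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Command matches the subscribe/unsubscribe payloads shown above.
type Command struct {
	Method  string `json:"method"`
	Channel string `json:"channel"`
}

// Session is a hypothetical per-connection record of subscribed channels.
type Session struct {
	channels map[string]struct{}
}

func NewSession() *Session {
	return &Session{channels: make(map[string]struct{})}
}

// Handle parses one raw command and updates the session state.
func (s *Session) Handle(raw []byte) error {
	var cmd Command
	if err := json.Unmarshal(raw, &cmd); err != nil {
		return err
	}
	if cmd.Channel == "" {
		return errors.New("missing channel")
	}
	switch cmd.Method {
	case "subscribe":
		s.channels[cmd.Channel] = struct{}{} // record the subscription
	case "unsubscribe":
		delete(s.channels, cmd.Channel) // forget the subscription
	default:
		return fmt.Errorf("unknown method %q", cmd.Method)
	}
	return nil
}

// Subscribed reports whether the session currently listens on channel.
func (s *Session) Subscribed(channel string) bool {
	_, ok := s.channels[channel]
	return ok
}

func main() {
	s := NewSession()
	_ = s.Handle([]byte(`{"method":"subscribe","channel":"ticker_INR"}`))
	fmt.Println("after subscribe:", s.Subscribed("ticker_INR"))
	_ = s.Handle([]byte(`{"method":"unsubscribe","channel":"ticker_INR"}`))
	fmt.Println("after unsubscribe:", s.Subscribed("ticker_INR"))
}
```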

3.1 Design Constraints

3.1.1 High Throughput

Every second, hundreds of thousands of users may open the Koinex website or mobile app. This leads to a high number of incoming connections and subscriptions (subscribe commands) to the pushman-sub server. At the same time, a similar number of users will be dropping off, which leads to a high number of unsubscriptions (unsubscribe commands). A subscription means state allocation inside pushman-sub and an unsubscription means state deallocation.

Talking about messages, let’s continue with the ticker_INR channel. If 10 orders are matched per second in the BITCOIN/INR market while 100K users are online, we have to send the updated price of BITCOIN 1M (10 × 100K) times per second, with each user getting the updated price 10 times in a second, provided the price changes on every match.

So, the implementation needed to maximise the following:

Subscriptions/Second
Unsubscriptions/Second
Messages/Second

3.1.2 Low Latency in message delivery

We cannot incur delay in delivery of messages to subscribers. There is no point in delivering already changed BITCOIN price to the user 5 seconds late. No matter how many times the price changes in a single second, we have to deliver with bare minimum latency. Latency depends on a number of factors:

Latency ∝ Number of Active Connections
Latency ∝ Message Frequency

Message Frequency of a channel is defined as the number of distinct messages to be sent on the channel.

3.1.3 Fault Tolerance and availability

Pushman is a core service in the Koinex ecosystem and is used by multiple Koinex services. So we cannot afford even the slightest downtime in Pushman; it needs to be available all the time. By design, it should be able to recover from a crash by restarting.

3.2 Design Principles

3.2.1 Concurrency and Parallelism

To achieve high throughput we decided to write our code in a concurrent way so that it can be parallelised across multiple CPU cores. If you are not familiar with Concurrency and Parallelism, you can listen to this insightful talk by Rob Pike, co-designer and developer of the Go programming language.

3.2.2 Choosing the right programming language

At any given time, every online user contributes a connection to the connection pool of the pushman-sub server, so the pool is always large. For each connection in this pool, Pushman has to:

a) Listen for commands over the connection (subscribe, unsubscribe etc.)

b) Check on the heartbeat of the connection

c) Send messages over the connection

Each connection will require 2 threads in total, one for reading and one for writing on the connection. Comparison of the initial thread stack sizes for different programming languages:

Programming languages min thread stack size comparison

Python and Node.js were not feasible options as they don’t support parallelism over multiple CPU cores out of the box. Suppose we have to support 1M online users and we go with Java/C++: we end up using 2000–8000 GB of memory if each connection uses 2 threads, without counting the extra data structures allocated in each thread. So, Golang comes out as the real winner here. With the help of goroutines, Golang can create 1M concurrent execution units with just 4–8 GB of memory. Unsurprisingly, we are running 1.5M goroutines on an AWS m4.2xlarge production instance. Writing concurrent code in Golang is real fun due to its simple design, and it saved development time as well. And yes, the Golang runtime automatically schedules goroutines over all available CPU cores, unlike Python and Node.js.
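The arithmetic behind these figures can be reproduced with a small calculation. The per-unit stack sizes used here are assumptions chosen to match the totals quoted above: 1 to 4 MB per OS thread, 2 to 4 KB per goroutine, and two execution units per connection in both cases.

```go
package main

import "fmt"

// stackMemoryGB returns the memory needed for initial stacks alone, in
// decimal gigabytes, given the number of connections, the execution units
// per connection and the per-unit stack size in kilobytes.
func stackMemoryGB(connections, unitsPerConn, stackKB int) float64 {
	return float64(connections) * float64(unitsPerConn) * float64(stackKB) / 1e6
}

func main() {
	const conns = 1000000
	// Assumed initial stack sizes: 1-4 MB per OS thread, 2-4 KB per goroutine.
	fmt.Println("threads (GB):", stackMemoryGB(conns, 2, 1000), "to", stackMemoryGB(conns, 2, 4000))
	fmt.Println("goroutines (GB):", stackMemoryGB(conns, 2, 2), "to", stackMemoryGB(conns, 2, 4))
}
```

Under these assumptions the thread-based design needs 2000 to 8000 GB for stacks alone, while the goroutine-based design needs 4 to 8 GB, a difference of roughly three orders of magnitude.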

3.2.3 The C1M problem

Let’s take the case of reading commands from a websocket connection. The receive function below reads from the wsSub.wsConn (websocket subscription connection) object.
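A minimal sketch of such a per-connection reader is given here. It is an illustration under assumptions, not the original function: wsConn is a plain net.Conn carrying newline-delimited commands instead of a real websocket, so that the example runs with the standard library only, and the handle callback is hypothetical.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// wsSub stands in for a websocket subscription. wsConn is a plain net.Conn
// here (an assumption) so the sketch needs no third-party websocket library.
type wsSub struct {
	wsConn net.Conn
}

// receive blocks on the connection and passes each newline-delimited command
// to handle until the peer closes the connection.
func (s *wsSub) receive(handle func(cmd string)) error {
	scanner := bufio.NewScanner(s.wsConn)
	for scanner.Scan() { // the goroutine is parked here while the connection is idle
		handle(scanner.Text())
	}
	return scanner.Err()
}

// demo feeds two commands through an in-memory pipe and returns what receive saw.
func demo() []string {
	client, server := net.Pipe()
	sub := &wsSub{wsConn: server}
	var got []string
	done := make(chan struct{})
	go func() { // one reader goroutine per connection
		defer close(done)
		_ = sub.receive(func(cmd string) { got = append(got, cmd) })
	}()
	fmt.Fprintln(client, `{"method":"subscribe","channel":"ticker_INR"}`)
	fmt.Fprintln(client, `{"method":"unsubscribe","channel":"ticker_INR"}`)
	client.Close()
	<-done
	return got
}

func main() {
	for _, cmd := range demo() {
		fmt.Println("received:", cmd)
	}
}
```

The important property is that the reader goroutine spends almost all of its life blocked inside the read call, holding its stack while doing no work.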

Let’s assume we have 1M open websocket connections and for each such connection we have a reader goroutine with wsSub.receive() called. The consumed RAM is 2–4 GB, provided we don’t do any further allocations. The real question here is, would you always be receiving commands from the subscribers? Not really: most of the time you will be sending messages to a subscriber once it has subscribed to a channel. On a server with N cores, only N goroutines can execute in parallel at once. Our 1M goroutines are waiting for a packet to arrive on their connections, so that they can be woken up and executed. Goroutine wake-up is handled internally by the Golang runtime, but with such a large number of connections to monitor it’s not optimal to delegate that responsibility to the runtime. This is known as the C1M problem, or the 1 Million Connections problem.

How do you efficiently find out which file descriptors (i.e. connections) are ready to read in a pool of 1M connections?

The trivial way would be to run a for loop and monitor them individually. Obviously, it does not scale at all. Can we do better? Yes.

Enter EPOLL. EPOLL is a kernel notification mechanism through which we can ask the Linux kernel to monitor a large number of open file descriptors and notify us about the ones that are ready to read, without our code having to scan the whole set. Once we get the readable file descriptors we can start our goroutines. This way, we save the memory used up by idle reader goroutines and also speed up execution by calling receive() only on the ready subset of the complete file descriptor set. We made use of the excellent netpoll library to achieve this. Here is an example demonstrating the use of netpoll:

netpoll example
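The underlying mechanism can also be sketched with the raw epoll system calls from Go's standard syscall package (Linux only). This is not the netpoll library's API; the readableFDs and demo helpers are hypothetical, and Unix socket pairs stand in for client connections.

```go
package main

import (
	"fmt"
	"syscall"
)

// readableFDs registers fds with a fresh epoll instance and returns the ones
// the kernel reports as readable within timeoutMs. Linux only.
func readableFDs(fds []int, timeoutMs int) ([]int, error) {
	epfd, err := syscall.EpollCreate1(0)
	if err != nil {
		return nil, err
	}
	defer syscall.Close(epfd)
	for _, fd := range fds {
		ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fd)}
		if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &ev); err != nil {
			return nil, err
		}
	}
	events := make([]syscall.EpollEvent, len(fds))
	var n int
	for {
		n, err = syscall.EpollWait(epfd, events, timeoutMs)
		if err != syscall.EINTR { // retry if a signal interrupted the wait
			break
		}
	}
	if err != nil {
		return nil, err
	}
	ready := make([]int, 0, n)
	for _, ev := range events[:n] {
		ready = append(ready, int(ev.Fd))
	}
	return ready, nil
}

// demo opens one idle and one busy socket pair and returns the descriptors
// epoll reports as readable, plus the descriptor that is expected to be ready.
func demo() ([]int, int, error) {
	idle, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		return nil, 0, err
	}
	busy, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		return nil, 0, err
	}
	defer func() {
		for _, fd := range []int{idle[0], idle[1], busy[0], busy[1]} {
			syscall.Close(fd)
		}
	}()
	if _, err := syscall.Write(busy[1], []byte("ping")); err != nil {
		return nil, 0, err
	}
	ready, err := readableFDs([]int{idle[0], busy[0]}, 1000)
	return ready, busy[0], err
}

func main() {
	ready, want, err := demo()
	fmt.Println("ready:", ready, "expected:", want, "err:", err)
}
```

Only the descriptor with pending data is reported, so a reader goroutine needs to be started for that connection alone while the idle one costs no goroutine at all.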

3.2.4 Going In-Memory

Pushman is required to be a highly concurrent system to fit within the design constraints. It was clear that we could not use any disk-persisted database for state management because of the high number of allocations/deallocations required per second. So, we decided to take a fully in-memory approach and store everything in RAM. This choice is also supported by the realtime nature of the messages Pushman delivers. For example, if the message containing the price of BITCOIN at the 1.3th second gets lost due to a server crash, Pushman can always deliver the message containing the BITCOIN price at the 1.8th second. The user will still see the updated price. In-memory systems come with their own problems: one has to be very diligent in the implementation to avoid segmentation faults. Considering the concurrent nature of Pushman, the possibility of incorrect memory access increases manyfold. We will discuss later how we implemented some locking techniques to solve the memory access issues.

3.2.5 Optimising operations using BST

Consider the case of public channels, e.g. ticker_INR, to which all subscribers subscribe to get the live prices of all cryptocurrencies in the INR market. The following operations had to be considered while choosing the data structure for storing subscriber information on a public channel:

a) Add a new subscriber

b) Remove an old subscriber

c) Send a message to all the subscribers

Let’s analyse the complexity of above operations when we use a LinkedList for storing subscribers on a channel:

a) Add a new subscriber - O(1)
b) Remove an old subscriber - O(n)
c) Send a message to all the subscribers - O(n)

Using a BST (Binary Search Tree) reduces the complexities to an extent, provided the tree is kept balanced:

a) Add a new subscriber - O(log(n))
b) Remove an old subscriber - O(log(n))
c) Send a message to all the subscribers - O(n)

So, we went for a map with channel names as keys and BST as the values for managing channels vs subscribers state.

3.2.6 Sharding data structures to decrease contention in locking

Data protection is the most important concern while dealing with in-memory systems. Golang has a beautiful API that allows you to define your mutexes along with your struct definition:

import "sync"

// BST structure (Node is the tree node type, definition omitted)
type BST struct {
    root *Node
    size int
    sync.RWMutex // embedded mutex guarding root and size
}

// channel vs subscribers state map as discussed earlier
var channelSubscriberStateMap map[string]interface{}

Now, consider two concurrent operations running in parallel:

a) We are traversing the ticker_INR channel BST to find all the subscribers to send a message

b) We are adding a new subscriber to ticker_INR channel

While we are carrying out operation b we need to hold a write lock on the BST structure. The problem with the map is that Go’s built-in map is not safe for concurrent writes, so whenever you update the value at a key you need to acquire a global write lock, which also prevents concurrent modification of any other key in the map. Pushman being a high throughput system, the write locks were getting hit very frequently and message delivery latency was increasing. We then decided to shard Pushman’s internal data structures to decrease the lock contention. We developed the safemap package to shard on the map key itself and the shardedMap package to shard on an externally provided key. Here is the basic implementation of a safemap:

safemap.go
By using this approach, the lock contention can easily be decreased by increasing the SHARD_COUNT.
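The idea of sharding on the map key can be sketched as below. This is not the actual safemap package: the type names, the FNV hash and the shard count of 32 are assumptions made for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const shardCount = 32 // plays the role of SHARD_COUNT; the value is an assumption

// shard is one slice of the key space with its own lock.
type shard struct {
	sync.RWMutex
	items map[string]interface{}
}

// SafeMap is a hypothetical sharded map: each key hashes to exactly one
// shard, so writers to different shards never contend on the same lock.
type SafeMap [shardCount]*shard

func NewSafeMap() *SafeMap {
	var m SafeMap
	for i := range m {
		m[i] = &shard{items: make(map[string]interface{})}
	}
	return &m
}

// shardFor picks the shard responsible for key.
func (m *SafeMap) shardFor(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key))
	return m[h.Sum32()%shardCount]
}

// Set takes the write lock of a single shard only.
func (m *SafeMap) Set(key string, value interface{}) {
	s := m.shardFor(key)
	s.Lock()
	s.items[key] = value
	s.Unlock()
}

// Get takes the read lock of a single shard only.
func (m *SafeMap) Get(key string) (interface{}, bool) {
	s := m.shardFor(key)
	s.RLock()
	defer s.RUnlock()
	v, ok := s.items[key]
	return v, ok
}

// fill writes n distinct keys from n goroutines and waits for them to finish.
func fill(m *SafeMap, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Set(fmt.Sprintf("channel_%d", i), i)
		}(i)
	}
	wg.Wait()
}

func main() {
	m := NewSafeMap()
	fill(m, 1000)
	v, ok := m.Get("channel_42")
	fmt.Println(v, ok)
}
```

With a well-distributed hash, two concurrent writers collide on the same lock only about once in shardCount attempts, which is why raising the shard count lowers contention.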

Let’s see a comparison between non-sharding vs sharding approaches when a new message arrives on ticker_INR channel and is required to be sent to all its subscribers.

Sending message over a public channel

With sharding, the code can be written in a more concurrent way and runs over all CPU cores. Hence, the latency of message sending operation was reduced dramatically.
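A sharded fan-out could look roughly like the following sketch. It assumes the subscribers of one channel are split across shards and that each subscriber is represented by a buffered Go channel; both are simplifications for illustration, and the type and function names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// subscriberShard holds part of one channel's subscribers behind its own lock.
type subscriberShard struct {
	sync.RWMutex
	inboxes []chan string
}

// newShards builds shardCount shards with perShard buffered inboxes each.
func newShards(shardCount, perShard int) []*subscriberShard {
	shards := make([]*subscriberShard, shardCount)
	for i := range shards {
		s := &subscriberShard{}
		for j := 0; j < perShard; j++ {
			s.inboxes = append(s.inboxes, make(chan string, 1))
		}
		shards[i] = s
	}
	return shards
}

// broadcast fans msg out with one goroutine per shard, so shards are
// processed in parallel and each goroutine holds only its own read lock.
// It returns the number of inboxes that accepted the message.
func broadcast(shards []*subscriberShard, msg string) int64 {
	var sent int64
	var wg sync.WaitGroup
	for _, s := range shards {
		wg.Add(1)
		go func(s *subscriberShard) {
			defer wg.Done()
			s.RLock()
			defer s.RUnlock()
			for _, inbox := range s.inboxes {
				select {
				case inbox <- msg:
					atomic.AddInt64(&sent, 1)
				default: // drop for a subscriber whose buffer is full
				}
			}
		}(s)
	}
	wg.Wait()
	return sent
}

func main() {
	shards := newShards(4, 250)
	fmt.Println("delivered:", broadcast(shards, "price update"))
}
```

A new subscriber being added to one shard blocks only the goroutine sending on that shard, while the other shards keep delivering.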

3.2.7 Avoiding aggressive locking

The ideal locking strategy is to avoid using any locks at all! But often locks are unavoidable. The best practice is to use only read locks (RLock on sync.RWMutex in Golang) on a data structure when you are not modifying anything, and write locks (Lock in Golang) only when you are actually modifying something. As mentioned, we used Redis as a transient message store. Since there is a hard cap of 65K on the number of TCP connections to Redis and the number of channels can be far more than 65K, we had to multiplex multiple Redis PubSub subscriptions over one TCP connection. We used a hash distribution function to achieve this multiplexing.
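One simple way to distribute channels over a fixed pool of connections is to hash the channel name and take it modulo the pool size. The sketch below assumes an FNV hash and a hypothetical pool of 8 connections; the article does not say which hash function Pushman uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// connIndex maps a channel name onto one of poolSize shared connections.
// The same channel always lands on the same connection.
func connIndex(channel string, poolSize int) int {
	h := fnv.New32a()
	h.Write([]byte(channel))
	return int(h.Sum32() % uint32(poolSize))
}

func main() {
	const poolSize = 8 // hypothetical number of Redis TCP connections
	for _, ch := range []string{"ticker_INR", "my_open_orders", "my_notifications"} {
		fmt.Println(ch, "-> connection", connIndex(ch, poolSize))
	}
}
```

Because the mapping is deterministic, every subscription to a given channel is multiplexed over the same TCP connection, and the number of connections stays fixed no matter how many channels exist.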

Consider the case when many users send subscription requests to the ticker_INR channel at the same time and no underlying TCP connection exists yet for that channel. Each goroutine will need to acquire a write lock, so that only one TCP connection to Redis gets created. Let’s compare two locking strategies here:

Comparison of locking strategies

Instead of always taking a write lock, we can use a strategy called double-checked locking (Read-Lock-Read) to decrease lock contention to a large extent and improve performance.
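Double-checked locking can be sketched in Go as follows. The conn and connRegistry types are placeholders for illustration (no real Redis connection is opened), and the created counter exists only to show that the expensive path runs once.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a placeholder for a Redis TCP connection.
type conn struct{ channel string }

// connRegistry is a hypothetical channel -> connection table.
type connRegistry struct {
	mu      sync.RWMutex
	conns   map[string]*conn
	created int // how many times the expensive path ran; guarded by mu
}

func newRegistry() *connRegistry {
	return &connRegistry{conns: make(map[string]*conn)}
}

// get returns the connection for channel, creating it at most once.
func (r *connRegistry) get(channel string) *conn {
	// First check under the cheap read lock: the common case once the
	// connection exists, and many goroutines can pass through in parallel.
	r.mu.RLock()
	c, ok := r.conns[channel]
	r.mu.RUnlock()
	if ok {
		return c
	}
	// Miss: take the write lock and check again, because another goroutine
	// may have created the connection between RUnlock and Lock.
	r.mu.Lock()
	defer r.mu.Unlock()
	if c, ok := r.conns[channel]; ok {
		return c
	}
	c = &conn{channel: channel}
	r.conns[channel] = c
	r.created++
	return c
}

// stampede calls get for the same channel from n goroutines at once.
func stampede(r *connRegistry, channel string, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.get(channel)
		}()
	}
	wg.Wait()
}

func main() {
	r := newRegistry()
	stampede(r, "ticker_INR", 1000)
	fmt.Println("connections created:", r.created)
}
```

Only the goroutines that arrive before the connection exists ever queue on the write lock; every later caller is served under the read lock.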

3.2.8 Misc optimisations

Using the techniques above, we increased our throughput substantially and significantly reduced the latency. There are some other important optimisations that I would like to highlight here, without discussing them in much detail.

a) Resource Pooling : Pool system resources such as TCP connections and goroutines because initialisation requires extra overhead, consuming both CPU and memory and thus increasing the latency. Apart from resource pooling, use resource limiting options like limiting connection pool size and goroutines pool size to avoid starvation and process crash.

b) Allocate wisely : Goroutines are cheap but if you aggressively allocate new structures inside a goroutine, the memory consumed will increase in proportion to the number of active goroutines, which will always be high in a concurrent system. So use strategies such as singleton instances and pass by pointers.

c) Use I/O buffers : Disk and Network I/Os are costly, so always try to use read/write buffers wherever possible.

d) Horizontal Scaling : An application deployed over a single server will always have limited CPU and memory to use. For example, assume a single CPU core can send roughly 1M messages per second. This implies that a server instance with 8 cores can send at most 8M messages/sec. With horizontal scaling, you can deploy your application as a distributed application comprising N server instances, increasing your message throughput by up to N times. For example, in the current deployment, Pushman has a message throughput of 32M (4 × 8M) with 4 production servers. A distributed system also brings fault tolerance and availability, since it allows a multi-active-zone deployment topology.
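Point a) above, pooling and limiting goroutines, can be illustrated with a fixed-size worker pool. This is a generic sketch rather than Pushman's code; the runPool function and its parameters are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool processes jobs with a fixed number of worker goroutines instead of
// spawning one goroutine per job, which caps concurrency and reuses workers.
func runPool(workers int, jobs []int, work func(int) int) []int {
	results := make([]int, len(jobs))
	idx := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range idx {
				results[i] = work(jobs[i]) // each index is written by exactly one worker
			}
		}()
	}
	for i := range jobs {
		idx <- i // blocks while all workers are busy, which bounds the load
	}
	close(idx)
	wg.Wait()
	return results
}

func main() {
	squares := runPool(4, []int{1, 2, 3, 4, 5}, func(x int) int { return x * x })
	fmt.Println(squares)
}
```

The pool size becomes the single knob for limiting resource usage: a burst of jobs queues up behind the workers instead of creating an unbounded number of goroutines.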

4. Conclusion

Building Pushman was fun, and we learned a lot of optimisation techniques on the way. Recently, we integrated Pushman into Loop as well; check out Loop if you haven’t yet. Loop is a peer-to-peer trading platform to transact in digital assets using fiat currency. By building Pushman we were able to reduce third-party integrations in our platform and managed to cut our costs by 20x!

Thanks for the read. Please share your experiences with similar problems in the comments, and feel free to reach out to me on Twitter or write to me at ankush@koinex.in in case you have any interesting problem to discuss.

5. References

i) http://highscalability.com/blog/2013/5/13/the-secret-to-10-million-concurrent-connections-the-kernel-i.html
ii) https://www.youtube.com/watch?v=cN_DpYBzKso

Don’t forget to show some love by hitting the claps and subscribe for such interesting articles from the Koinex Engineering Team in future. If you are interested in working with us, you can introduce yourself at careers@koinex.in.
