Want a super-fast messaging system?
The ultimate guide to designing and building fast systems.
There are many technologies today that connect digital systems, services, and devices. HTTP and Messaging systems like NATS, gRPC or Kafka may come to mind. Speed and performance of these systems are critical for efficiency and scale. Speed and efficiency can also lead to predictable and deterministic behavior of a system under load, which I believe is even more desirable. Good performance is Good, predictable performance is Great!
So how would you go about architecting and building a system for Performance and Scalability?
Message systems are designed to pass messages across different network transports (e.g. TCP/IP, UDP, WebSockets, Bluetooth, LoRa). They can have a broker or server component, or clients can talk directly to one another. Most of my history has been involved with server-based systems, where clients talk to a server, and the server distributes the message to one or more recipients. Although some may prefer for systems to talk directly from one client to another, I feel the server/broker approach works better, given that the servers are easy to run and operate, cluster well so they can run anywhere, and are resilient and perform well. I will cover these subjects with reference to the NATS project, which I created to serve as the control plane for CloudFoundry, a project I created at VMWare.
Messages per IO call.
This is the first and most important to get right. It’s expensive moving between user and kernel space. If you call into the kernel for every message sent or received, this will kill your performance. I always measure how many messages per IO call I get for ingress and egress. For ingress, this is a bit easier since you can preallocate buffers and do what I call blind reads. Reading as much as the kernel has for you in one call. In general, this does not affect latency for inbound messages. One note of caution is to have these buffers dynamically grow and shrink in size to match the traffic, otherwise, it will waste memory when you are scaling to a large number of connections.
For egress, or sending the messages back out it becomes a balancing game of throughput vs latency. Getting messages back out of your message server is very important. I usually approach this problem by having a separate thread or coroutine actually doing the IO calls into the kernel. This allows userspace to coalesce multiple messages together, again trying to maximize the messages per IO call. I don’t want this internal buffering to go unchecked, and so the user space can reach a limit where it will take ownership of the IO call on purpose, to actually slow things down potentially upstream and avoid massive memory consumption due to internal buffering. If someone cannot keep up, fail fast and cut them off, or you will risk having one bad apple deteriorate the service for others. The signaling and switching costs to the IO thread or coroutine should be measured as well to achieve a good balance between throughput and latency. You should also measure how coalescing buffers vs using Scatter and Gather semantics (writev if supported) affect your performance.
There is quite a bit of interesting work being done also to eliminate the jump from user to kernel space with things like dpdk and other userspace only networks. What I am even more interested in is the ability to push processing down into the kernel via mechanisms like eBPF or even onto network elements like NIC cards.
In a messaging system that supports multiple message patterns like pub/sub or 1:N, request/reply, and load balanced dynamic queues, the routing of messages becomes very important from a performance perspective. In a system where subjects and/or topics are feature rich and involve multiple levels and wildcard support, mapping an inbound subject to a group of recipients can become non-trivial. Also if the change rate of the subject space is high, this will adversely affect the distribution performance without the right architecture. I have probably spent the most amount of time in my career on messaging on this one problem, and I am certain I still do not have it totally correct. In NATS, subjects are made up of tokens that are separated by ‘.’ or dot. For publishers, all subjects must be literal, however, subscriptions can have wildcards to group different publish subjects together into a single subscriber callback/processing flow. In NATS we have a token or partial wildcard that is ‘*’, and a terminal full wildcard “>”. E.g I could subscribe to “foo.bar.*” and that would be delivered for messages to “foo.bar.baz” or “foo.bar.22” but not “foo.bar”. If I subscribed to “foo.>” in NATS, then any publish to “foo.1”, “foo.bar.baz”, etc would work. For more information on what’s possible with wildcards, see our NATS.io documentation on Subject-Based Addressing.
The backing store for the subject distributor in NATS is a modified Patricia Tree or Radix tree. From Wikipedia, Donald R. Morrison first described what he called “Patricia trees” in 1968; the name comes from the acronym PATRICIA, which stands for “Practical Algorithm To Retrieve Information Coded In Alphanumeric”. This structure works well at conserving space when a subject hierarchy has many leaf nodes. Early work at TIBCO with Rendezvous (and EMS) and the financial space (think stock quote distribution) showed a general tree structure could be well suited. For instance, with a subject space like “foo.bar.baz.[1–1,000,000]” only the 1M leaf nodes would be distinct and would share common nodes for foo and bar. In modern computer architectures where predictable data access and cache friendliness are more important, tree structures with links are not as popular, but still extremely effective. The structure I used in NATS uses links and hashes for levels, so is a combination. You give it a subject and it returns a result list of normal subscribers and queue subscribers. For NATS they are handled differently so the result set separates them. Although effective, the performance of NATS would not be near as good as it is in this department if that was the only data structure we used. Many threads/coroutines could be accessing this structure. Using Read/Write locks helps, as well as atomics in some areas, and I do have a sort of L2 cache built into the data structure that fronts the Patricia Tree. A former colleague felt that we should redo the structure to be lock-free and use atomics and CAS. On paper, this always looks attractive, but rarely yields the results you would expect. Instead what I tried to achieve is similar to how processors and caches work, and in the best case, each ingress processor has a lock-free and un-contended data structure that processes most of the message distribution. Of course, cache invalidation is important here, both for the L2 shared cache and the L1 independent caches for the ingress processing. For L2 we try to be more intelligent and use smaller cache lines for invalidation. These are done under traditional locking schemes. For the independent L1 I do an atomic check of the generational ID for the subject distributor. If it has been changed, instead of trying to be too smart I simply blow away the whole L1 cache and let it repopulate from the shared L2 or the backing tree itself. For a single inbound stream a NATS server on modern hardware can process about 20M messages per second, not too bad. And that is just one stream! But again, this starts to get affected if the change rate of the distributor increases and causes more cache invalidations. If I get some free time I may go back to the drawing table on this one, but for now am reasonably happy with throughput for a modestly changing subject space.
This is my #3, but still quite important. NATS is a text-based protocol, and many would argue that only binary protocols can perform well. I have designed and implemented both. The text-based protocol does not suffer from performance in my opinion, but the protocol parser needs to be efficient. The original NATS server was written very quickly for CloudFoundry to use as the control plane for the system. I wrote the original version in Ruby, although today’s version is written in Go. While I still love C, I doubt I will use it much in future work. Go has been my language of choice for network infrastructure since its early beginnings.
Although NATS is fairly complex these days and I have no plans to rewrite the server in another language like I did when I moved it from Ruby to Go, Rust has caught my eye and I am watching it like many others. Bryan Cantrill’s post is a good read. What I am most excited about though is WebAssembly, but that is a whole other post.
So back to the protocol parser, the original one in Ruby was written using Regular Expressions. It was not because I was lazy, it was because RegEx in Ruby was very fast compared to walking the input stream using Ruby itself. Of course when we moved to a compiled language like Go, that performance advantage was no longer evident. The NATS server’s protocol parser is a near zero allocation hand-rolled parser that can deal with split buffers, large messages, text to integer conversions, and it performs very well. In the early days of Go the text conversion utilities were not super performant, so you will still see a lot of our own hand-rolled routines to move text to numbers and back, etc. To get a sense of some of the early performance challenges with Go in the very early days, feel free to watch a presentation I gave at the first GopherCon in Denver. It details many of the key performance areas presented in this post. Golang has done wonders in terms of performance around base data structures like maps and garbage collection over the years. It has an incredible team behind it. It was an excellent choice for us.
Last but not least is scalability, or performance at scale. It is important to understand if you have a performance problem or a scalability problem. You have a performance problem if your system is slow for a single user, or in this case a single client connection. Your system has a scalability problem if your system is fast for a single client connection, but slows down for lots of connections. We already saw a hint of this when we were discussing message routing, and the multiple cache layers in the subject distributor to avoid contention at scale. A NATS server on modern hardware with many high message rate connections can process almost 100M msgs/sec. If you think about separate message streams, where there is no crossover, we should scale as close to linear as we can until we hit the CPU/number of cores. In reality, this is difficult, as we begin to thrash caches and locking structures force synchronization. In our example above, sharing a single subject space and data structure to route messages would have severely compromised our scalability with even modest update rates to subscriptions. Areas I concentrate on to make sure they can run concurrently and in parallel if the hardware has the resources are as follows.
- Connection ingress processing. This is the protocol parser. They should be independent in terms of processing for each connection.
- Message routing. This has been discussed, but so much effort has been put into this piece of the architecture to make it perform and scale.
- Message Egress. This should also be able to run at scale, even when multiple ingress sources are coalescing into a single egress stream.
I did a talk back in 2011 on Scalable and Available, Patterns for Success. It’s worth a walkthrough if you have the time and a great addition to this post.
If you have made it this far, congrats! I have been at this for almost 30 years now, but I still get great pleasure in designing and building systems that are easy and FAST! I hope some of the discussion points above can help you design and build a fast and efficient network-based service of your own.
If you liked this, click the💚 below so other people will see this here on Medium.