Proof Engineering: The Message Bus

A latency-sensitive message bus for the cloud

Prerak Sanghvi
Proof Reading
11 min read · Jun 17, 2021


This is a technical deep-dive post describing the custom messaging middleware that powers Proof’s Algorithmic Trading Platform in the cloud. For a high-level overview of our system, please refer to this post.

Background

As described in the overview post, our system uses the “sequenced stream” architecture.

Here’s a quick description of how this works: Every input into the system is assigned a globally unique monotonic sequence number and timestamp by a central component known as a sequencer. This sequenced stream of events is disseminated to all nodes/applications in the system, which only operate on these sequenced inputs, and never on any other external inputs that have not been sequenced. Any outputs from the applications must also first be sequenced before they can be consumed by other applications or the external world. Since all nodes in the distributed system are presented with the exact same sequence of events, it is relatively straightforward for them to arrive at the same logical state after each event, without incurring any overhead or issues related to inter-node communication.

The Sequenced Stream conceptual diagram
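To make the idea concrete, here is a minimal sketch of the sequencing step, in Java, with illustrative names that are not Proof’s actual classes: every inbound message gets the next sequence number and a timestamp before it is appended to the stream.

```java
// Minimal sketch of the sequencing step (names are illustrative, not Proof's actual classes).
import java.util.concurrent.atomic.AtomicLong;

final class SequencedEvent {
    final long sequence;        // globally unique, monotonically increasing
    final long timestampNanos;  // stamped by the sequencer
    final byte[] payload;       // the original, unsequenced input

    SequencedEvent(long sequence, long timestampNanos, byte[] payload) {
        this.sequence = sequence;
        this.timestampNanos = timestampNanos;
        this.payload = payload;
    }
}

final class Sequencer {
    private final AtomicLong nextSequence = new AtomicLong(1);

    /** Stamp an inbound message and return the event to be appended to the sequenced stream. */
    SequencedEvent sequence(byte[] unsequencedInput) {
        return new SequencedEvent(
                nextSequence.getAndIncrement(),
                System.nanoTime(),
                unsequencedInput);
    }
}
```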

We mention “disseminated to all nodes” casually, as if it were an easy feat. It is in fact a huge challenge, sometimes known as the “fanout problem”: the sequencer, already busy with the critical task of sequencing, cannot also be burdened with knowing all of its clients and actively transmitting sequenced messages to each of them.

Often, this problem is solved by using UDP multicast, where the producer produces the messages once and all interested consumers subscribe to a known multicast group to consume the messages. In this solution, the network switches do the “fanout” of data to multiple consumers, sometimes using a hierarchical set of switches. This works really well, though it is not without its pitfalls, given that UDP is unreliable and a good multicast setup is hard. We can’t use multicast in the cloud (no, the fake multicast overlay is not effective), so we solved this problem using a custom messaging layer, inspired by Aeron.

We didn’t use Aeron directly because even though it calls itself a message transport, it comes with a whole ecosystem of modules and services. We didn’t want to adopt all of the different framework pieces, as that would limit our architectural choices. Plus, we’re just not that familiar with Aeron and don’t know all of the gotchas. Instead, we decided to build our own message bus, taking inspiration from the best and the most relevant parts of Aeron.

Wait, you built your own message bus?

Yeah, yeah, you’re right — it is a little bit crazy to try to create your own messaging middleware in this day and age. I mean, talk about reinventing the wheel. But it really does make sense, when what you’re trying to do is not necessarily create a better wheel with more features, but rather a minimal specialized wheel that only does very specific things (and does them fast and reliably). We are not aware of any cloud-enabled middleware with a throughput of millions of messages per second at consistent sub-millisecond latency (except perhaps this, which we haven’t tested).

An artist’s rendering of The Message Bus, not to scale

The Requirements

Let’s recall the problem we are trying to solve: we have a stream of messages that we need to send from a producer to one or more remote consumers, in the fastest way possible.

  • We do not want to assume anything about an individual consumer: it may be processing data contemporaneously with the producer, or it may be slow and hours behind.
  • We do not want consumers to miss messages that were produced while they were down (reliable delivery).
  • We do not want the consumers to affect the producer — for example, if we have a slow consumer, we do not want it to somehow “push back” on the producer.
  • We do not want the number of consumers to affect the producer or the other consumers — for example, if we were serially transmitting messages to a large number of consumers, the latency for the “last” consumer would increase as the number of consumers grows, and overall throughput would decrease.

In addition, while we wanted to support consumers with varying performance characteristics, we wanted to have an optimized fast path for the common case: an up-to-date consumer that is able to keep up and is processing data in real-time as fast as it can receive it.

The solution we came up with is to create a disk-backed queue that is replicated between servers using threads pinned to dedicated CPU cores. Read on to see how this meets all of the above needs.

The Replicated Store

The Replicated Store is a general construct we use to facilitate communication between a producer and one or many consumers. The producer app writes its stream of messages to a store, which is implemented as a memory-mapped file. The use of a memory-mapped file allows the recent contents of the store file to remain in the machine’s virtual memory. One or more threads of a replication server run on the same host, having mapped this same store file. There is one replication server thread per consumer server; each busy-spins on a dedicated core, checking in a tight loop whether the last offset (length) of the store file has changed. Once a change is detected, the new records are read immediately and transmitted over TCP to a replication client, which writes them to a memory-mapped file on the remote server. This replicated copy of the store is then read by the consumer apps, again potentially in a tight spin on a dedicated core, though that’s not mandatory and depends on each consumer app’s individual needs.
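To illustrate the mechanics, here is a simplified sketch of one replication-server thread. It assumes a layout in which the first 8 bytes of the store file hold the committed length; a real implementation would need memory-ordered reads (e.g. via Agrona’s UnsafeBuffer), remapping as the file grows, and record framing, none of which are shown here.

```java
// Simplified sketch of a replication-server thread tailing a memory-mapped store file.
// Assumption: the first 8 bytes of the file hold the committed length written by the producer.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ReplicationServerThread implements Runnable {
    private static final int HEADER_BYTES = 8;
    private final Path storeFile;
    private final InetSocketAddress consumerServer;

    ReplicationServerThread(Path storeFile, InetSocketAddress consumerServer) {
        this.storeFile = storeFile;
        this.consumerServer = consumerServer;
    }

    @Override
    public void run() {
        try (FileChannel channel = FileChannel.open(storeFile, StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(consumerServer)) {
            socket.setOption(StandardSocketOptions.TCP_NODELAY, true);
            MappedByteBuffer store = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            long sentUpTo = HEADER_BYTES;
            while (!Thread.currentThread().isInterrupted()) {
                long committed = store.getLong(0);           // last offset written by the producer
                if (committed > sentUpTo) {                  // new records have appeared
                    ByteBuffer slice = store.duplicate();
                    slice.position((int) sentUpTo).limit((int) committed);
                    while (slice.hasRemaining()) {
                        socket.write(slice);                 // stream the new bytes to the consumer server
                    }
                    sentUpTo = committed;
                } else {
                    Thread.onSpinWait();                     // busy-spin on a dedicated core
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```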

The end result of this design is that if the consumer is reasonably current with the producer, the replication process happens entirely in memory, directly from the producer’s memory to the consumer’s memory, with no disk access involved. As soon as the producer writes a record, the size/offset of the producer store file is incremented in memory, and the new offset and records are available as an atomic update to the replication server thread(s) within a microsecond or two. The records can be transmitted over TCP to the consumer servers in ~50μs (kernel/network latency), and then to the consumer apps in a few more microseconds. (At some point, the operating system will flush the mapped portions of the store files to disk, and this can cause latency, but there are ways to tune when and how the OS does this.)

The beauty of this very simple solution is that the producer is completely decoupled from a slow or a far-behind consumer. Even if the consumer apps were started in the middle of the day, the replication server would be able to service them by reading the store file from the first message (this would involve disk access, and that’s ok because performance is not the primary consideration for this use case). Another related effect is that the individual consumers are independent — the relative speed of one consumer does not affect that of another consumer. Also, the consumers are not being sent the messages in a serial fashion, so there is no “first” or “last” consumer that is advantaged or disadvantaged in terms of latency.

We’ll admit that the design is expensive in that each consumer server requires a dedicated core on the producer’s server (see the section titled “A Note on Hardware Efficiency” below). If we needed this to scale to more consumer servers than we have cores, we would use a hierarchical distribution pattern. For now, we haven’t needed to do this.

The Replicated Store

The IODaemon

The system is composed together using a number of these replicated stores. The last piece of the puzzle is the IODaemon, which, as the name implies, is responsible for performing I/O on a server. The idea is that if there are multiple apps running on a server, they can all get their sequenced stream through a single transmission from the sequencer — the IODaemon acts as the replication client. The IODaemon is also responsible for transmitting application output messages (aka “unsequenced” messages) to the sequencer for sequencing, over UDP unicast.

The IODaemon
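The upstream path is a plain UDP unicast send. A minimal sketch (the class name, address, and wire format are illustrative) might look like this:

```java
// Sketch of the IODaemon's upstream path: sending an unsequenced message to the sequencer
// over UDP unicast (names and message format are illustrative).
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

final class UnsequencedSender implements AutoCloseable {
    private final DatagramChannel channel;
    private final InetSocketAddress sequencerAddress;

    UnsequencedSender(InetSocketAddress sequencerAddress) throws IOException {
        this.channel = DatagramChannel.open();
        this.sequencerAddress = sequencerAddress;
    }

    /** Fire-and-forget: delivery is confirmed later by seeing the message on the sequenced stream. */
    void send(ByteBuffer message) throws IOException {
        channel.send(message, sequencerAddress);
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```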

TCP vs UDP

We won’t go into any deep technical details of TCP vs UDP, but it is a topic worth touching upon. When would you select one over the other?

TCP is a connection-oriented protocol, and its primary benefit is that all bytes sent over a TCP connection will reliably make their way from the sender to the receiver and be delivered in the right order. In addition, TCP has flow-control and congestion-control mechanisms that push back on the sender if the receiver or the network is unable to keep up with the data being sent. All of these features can be useful, but they do make the protocol a bit heavy in terms of processing. In addition, you have to deal with establishing and maintaining a connection, and recovering from a broken connection.

If you have a situation where you aren’t too bothered by the lossy nature of UDP, or where you have an easy way to recover from dropped packets, you may decide to forgo these features in favor of the lighter UDP protocol. UDP, like TCP, sits on top of IP, but simply “flings” packets (datagrams) at the network, hoping the routers will know where to direct them to get them to the receiver. There is no connection to establish and there is no guarantee that packets will reach their destination. In practice, it works reasonably well, and data loss is minimal. Another distinct benefit of UDP for the receiver is that the OS can multiplex the streams coming from multiple senders onto a single socket, as opposed to having to manually combine data coming over multiple TCP connections (where fairness and starvation issues would need to be dealt with).
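For what it’s worth, that receive-side multiplexing boils down to something like the sketch below: a single UDP socket accepts datagrams from any number of senders, with the OS interleaving them for us (the port number is made up).

```java
// Sketch of the receive side: one UDP socket takes datagrams from any number of senders,
// with the OS interleaving them (the port number is illustrative).
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

final class SequencerReceiveLoop {
    public static void main(String[] args) throws IOException {
        try (DatagramChannel channel = DatagramChannel.open()
                .bind(new InetSocketAddress(9000))) {
            ByteBuffer buffer = ByteBuffer.allocateDirect(64 * 1024);
            while (true) {
                buffer.clear();
                SocketAddress sender = channel.receive(buffer); // blocks until a datagram arrives
                buffer.flip();
                // ... hand the datagram (and its sender's identity) to the sequencer ...
            }
        }
    }
}
```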

In our case, we use UDP unicast to transmit messages from the IODaemon to the Sequencer, and we do in fact care about message loss. This doesn’t bother us too much because we have a relatively easy way to detect packet drops. An IODaemon that is sending unsequenced messages to the sequencer is also reading the sequenced stream. The sequenced stream should eventually contain all of the messages being sent by the IODaemon. If a given set of messages is not seen on the sequenced stream within a predefined amount of time (e.g. 500ms), the IODaemon can assume the messages were lost and can resend them. If the messages were in fact lost, the resend may succeed and things go back to normal; if the messages were not lost but simply delayed, the resend will cause duplicate messages to be read by the sequencer, which is smart enough to discard the duplicates. The end result is that we’ve added reliable delivery on top of a lossy UDP transmission.
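A sketch of that loss-detection loop is below. The per-message id scheme and class names are illustrative, and it reuses the hypothetical UnsequencedSender from the earlier sketch; the idea is simply to remember what was sent, forget it when it shows up on the sequenced stream, and resend anything older than the timeout.

```java
// Sketch of the loss-detection scheme: keep every sent message until it is observed on the
// sequenced stream, and resend anything older than a timeout (names and id scheme illustrative).
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PendingMessageTracker {
    private static final long RESEND_TIMEOUT_NANOS = 500_000_000L; // 500ms

    private record Pending(ByteBuffer message, long sentAtNanos) {}

    private final Map<Long, Pending> pendingById = new ConcurrentHashMap<>();

    /** Called when a message is sent to the sequencer over UDP. */
    void onSent(long messageId, ByteBuffer message) {
        pendingById.put(messageId, new Pending(message, System.nanoTime()));
    }

    /** Called when the same message is observed on the sequenced stream. */
    void onSequenced(long messageId) {
        pendingById.remove(messageId);
    }

    /** Called periodically; resends anything that has not shown up within the timeout. */
    void resendStale(UnsequencedSender sender) throws IOException {
        long now = System.nanoTime();
        for (Map.Entry<Long, Pending> entry : pendingById.entrySet()) {
            if (now - entry.getValue().sentAtNanos() > RESEND_TIMEOUT_NANOS) {
                // Safe to resend: the sequencer discards duplicates it has already sequenced.
                sender.send(entry.getValue().message().duplicate());
            }
        }
    }
}
```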

For disseminating the sequenced stream, though, we use TCP, because there is no natural way to recover missed UDP packets from the sequencer. As described above, consumers come with varying performance characteristics, and TCP fanout via the replicated store handles all of those situations well.

Market Data on the Sequenced Stream

One way to create a trading system using the sequenced stream architecture is to truly sequence all of the inputs for the system onto a single sequenced stream. However, for various reasons, this can get unwieldy, especially when the relative volumes of the different types of inputs are wildly lopsided. For example, we may have a thousand order-related messages in the system, but a billion market data events. While it is perfectly reasonable to sequence them together onto a single sequenced stream, and capacity-wise the system would handle them just fine, there is just something icky about having to chew through a billion-plus messages in order to process the thousand order-related messages.

In order to streamline this usage, we sequence market data onto a separate sequenced stream from the rest of the messages. This helps our Order Management System (OMS) significantly, since it does not need to deal with market data and is able to work with a much lighter stream. However, this creates an issue for our Algo Engine, which does need both the order messages and the market data. For this reason, we create a composite stream from the two component streams and feed it into the algo engine. This composite stream is recorded to disk and is also available to be streamed to a remote server if necessary. The result looks something like the diagram below.

The Algo Engine consumes a merged stream containing orders as well as market data
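We won’t go into exactly how the two streams are interleaved; purely as an illustration, here is one way to merge two already-ordered streams into a composite stream, ordering by the sequencer-assigned timestamps from the earlier SequencedEvent sketch.

```java
// Illustrative only: merge the order stream and the market-data stream into one composite
// stream by sequencer timestamp. The actual merge criteria may differ.
import java.util.Comparator;
import java.util.Iterator;
import java.util.function.Consumer;

final class StreamMerger {
    /** Merge two already-ordered streams of sequenced events into a single ordered stream. */
    static void merge(Iterator<SequencedEvent> orders,
                      Iterator<SequencedEvent> marketData,
                      Consumer<SequencedEvent> compositeStream) {
        Comparator<SequencedEvent> byTime = Comparator.comparingLong(e -> e.timestampNanos);
        SequencedEvent o = orders.hasNext() ? orders.next() : null;
        SequencedEvent m = marketData.hasNext() ? marketData.next() : null;
        while (o != null || m != null) {
            boolean takeOrder = (m == null) || (o != null && byTime.compare(o, m) <= 0);
            if (takeOrder) {
                compositeStream.accept(o);
                o = orders.hasNext() ? orders.next() : null;
            } else {
                compositeStream.accept(m);
                m = marketData.hasNext() ? marketData.next() : null;
            }
        }
    }
}
```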

Binary Encoding (Our Choice: SBE)

A word about the content of these stream messages. From our prior experience, we knew we wanted to use a binary encoding for our internal communication, and preferably one with fields at fixed positions (for ease of random access). After looking at a few options, we landed on Simple Binary Encoding (SBE), which is in fact a FIX standard.

SBE is blazing fast, supports direct field access, supports versioning, and comes with a code generator that produces Java encoder/decoder classes from a message schema. We didn’t really do a full bake-off, and given that SBE is a FIX standard, we were kinda drawn towards it. After using it for a while, I’ll say that the performance is stellar, but I’m underwhelmed by the feature set, in particular its support for an evolving schema. We’ve already extended SBE to do what we need it to do, but if I had to do this over, I’d look at Cap’n Proto or Flatbuffers, which we may yet do in the future.
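For a flavor of what working with SBE looks like, here is a rough round-trip sketch. NewOrderEncoder/NewOrderDecoder and their fields are hypothetical classes that SbeTool would generate from a hypothetical schema; MessageHeaderEncoder/MessageHeaderDecoder and UnsafeBuffer come from SBE and Agrona.

```java
// Rough sketch of an SBE encode/decode round trip. NewOrderEncoder/NewOrderDecoder and their
// fields are hypothetical generated classes; the header and buffer types are from SBE/Agrona.
import java.nio.ByteBuffer;
import org.agrona.concurrent.UnsafeBuffer;

final class SbeExample {
    void roundTrip(MessageHeaderEncoder headerEncoder,
                   MessageHeaderDecoder headerDecoder,
                   NewOrderEncoder encoder,
                   NewOrderDecoder decoder) {
        UnsafeBuffer buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(512));

        // Encode: fields land at fixed offsets, so there is no per-field parsing on the way out.
        encoder.wrapAndApplyHeader(buffer, 0, headerEncoder)
               .clOrdId(42L)
               .priceMantissa(1_234_500L)
               .quantity(100L);

        // Decode: read the header, then access any field directly by its fixed offset.
        headerDecoder.wrap(buffer, 0);
        decoder.wrap(buffer, headerDecoder.encodedLength(),
                     headerDecoder.blockLength(), headerDecoder.version());
        long quantity = decoder.quantity();
    }
}
```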

A Note on Hardware Efficiency

Some of you “normal” engineers may be bewildered at the use of dedicated cores all over the place. First off, we never said this needed to be efficient in terms of hardware usage! Second, pinning threads to dedicated cores is a common, and often the only, way of achieving low latency.

These threads run in a tight loop and do not yield to the OS even when there is nothing to process, which is rather impolite in normal software architecture but totally accepted in the world of low latency! The OS has ways around this — for example, if the thread is making a system call every once in a while (e.g. recvmsg to receive a message from the network), the kernel will take the opportunity to intercept the thread during those calls, if necessary. To get around that, low-latency developers will use kernel-bypass to cut out the kernel from these processes completely (e.g. DPDK, VMA, OpenOnload). And the rabbit hole goes deeper with FPGAs, real-time kernels, and other such contraptions.
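For the curious, a busy-spin loop of the kind described above boils down to something like the sketch below (illustrative only). The actual pinning of the thread to a core happens outside the JVM, e.g. via taskset/cgroups, or with a thread-affinity library.

```java
// Minimal illustration of a busy-spin loop: the thread never blocks or sleeps.
// Thread.onSpinWait() (Java 9+) merely hints to the CPU that we are spinning.
import java.util.function.BooleanSupplier;

final class BusySpinLoop {
    /** Spin until work is available, process it, and repeat; never yield to the scheduler. */
    static void run(BooleanSupplier workAvailable, Runnable processWork) {
        while (!Thread.currentThread().isInterrupted()) {
            if (workAvailable.getAsBoolean()) {
                processWork.run();
            } else {
                Thread.onSpinWait(); // keep the core; do not park or sleep
            }
        }
    }
}
```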

Closing Thoughts

We realize that the design described in this article is not the only way to achieve these outcomes and in fact, we’re not sure if anyone else does it this way at all. We are simply documenting what we landed on after our research, and this remains a work-in-progress.

As closing thoughts, I’ll repeat here what I’ve said elsewhere: if you think this is cool or that you can contribute to this work, please reach out at careers@prooftrading.com. If you are a technologist, you’re good at what you do, and you want to help build a modern platform and have an impact, there is likely a role for you at Proof. To show our employees that we care about and appreciate them, we make them true partners, with handsome equity grants, possibly larger than anything you’ve seen in your career.

If you have questions, reach out to me on Twitter: @preraksanghvi, or reach us at info@prooftrading.com.
