Meet nanoQ — high-performance brokerless Pub/Sub for streaming real-time data with Golang

Iurii Krasnoshchok
Aigent
Jan 7, 2021 · 7 min read
Image credit: https://github.com/egonelbre/gophers

nanoQ (nQ for short) was made for streaming real-time data such as audio, video, clicks, and sensor readings, where it’s okay to lose some data in order to get the most up-to-date messages sooner.

The main goal is instant “fire and forget” publish with a best-effort delivery guarantee and support for many (hundreds, thousands) simultaneous streams.

If you’re a “show me the code” kind of programmer, without further ado: github.com/aigent/nq

Rationale

We firmly believe that good software products were made to solve one specific problem (and get bloated and abused for non-intended purposes only afterward).

At Aigent we use nQ to transfer VoIP audio and signaling streams through our backend infrastructure. Existing message brokers don’t cover this scenario too well. If a service wants to transfer data to another service, a scheme:

Pub → broker → Sub

creates an additional round trip, which increases latency and hardware load.

In some cases a central broker may become a bottleneck, especially under a mixed workload of fast and slow clients and big and small messages, and attempts to scale the broker may turn into a pain in the neck.

Instead, with nQ, publishers connect directly to subscribers.

     /→ Sub1
Pub  → Sub2
     \→ Sub3

This approach is very simplistic but powerful: you can build scalable pipelines of arbitrary complexity. Combine this with Kubernetes or cloud service mesh routing and auto-scaler, and you’re golden.

In the scenario above, service Pub publishes to Sub1, Sub2, and Sub3 simultaneously. Imagine that Sub1 and Sub2 are fast but Sub3 is slow or even inaccessible. What options do we have?

  1. Slow down processing for all subscribers and create backpressure.
  2. Buffer incoming messages until Sub3 is able to keep up.
  3. Deliver messages to Sub1 and Sub2 at their pace, and drop messages directed to Sub3 — this is what nanoQ does.

This is an easy implementation of graceful degradation: the overall pipeline continues to run even though some services are underperforming or malfunctioning, all at a very low maintenance cost (you don’t need to monitor growing queues or chase the slowest component).

It works in our case because we want received data to be fresh anyway. At Aigent we send hints to call center agents in real time during a conversation, and receiving a hint about something you said a few minutes ago is not helpful; it’s irritating at best.
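Option 3 can be illustrated with a non-blocking send per subscriber. This is only a minimal sketch, not nanoQ's implementation: the `subscriber` type, its bounded `queue`, and the `dropped` counter are hypothetical names, and the sketch discards the newest message when a queue is full, which may differ from what nanoQ actually discards.

```go
package main

import "fmt"

// subscriber is a hypothetical stand-in for one outgoing stream:
// a bounded queue plus a count of messages dropped while it was full.
type subscriber struct {
	name    string
	queue   chan []byte
	dropped int
}

// publish offers msg to every subscriber without blocking.
// A subscriber whose queue is full loses the message; the others are unaffected.
func publish(subs []*subscriber, msg []byte) {
	for _, s := range subs {
		select {
		case s.queue <- msg:
		default:
			s.dropped++
		}
	}
}

func main() {
	fast := &subscriber{name: "Sub1", queue: make(chan []byte, 8)}
	slow := &subscriber{name: "Sub3", queue: make(chan []byte, 2)}
	subs := []*subscriber{fast, slow}

	// Nobody drains the queues here, so the small queue fills up first.
	for i := 0; i < 5; i++ {
		publish(subs, []byte{byte(i)})
	}
	fmt.Println(fast.name, "queued:", len(fast.queue), "dropped:", fast.dropped)
	fmt.Println(slow.name, "queued:", len(slow.queue), "dropped:", slow.dropped)
}
```

The publisher never waits for Sub3, so Sub1 keeps receiving at its own pace while the slow queue simply stops accepting messages.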

How nanoQ works

I wish every software project had a one- or two-paragraph description of its internal kitchen and design decisions instead of thousands of pages of marketing material. So here we go.

There are some spectacular failures, trials, and errors behind nanoQ that eventually made it.

In a Nutshell

nanoQ high-level design

TCP vs. UDP

You may think that UDP is low latency and “fire and forget” by design, so it has to be the obvious choice. There are two reasons against it: packet loss and congestion.

UDP drops packets on too many occasions: the send rate is too high, the CPU is busy with another process, a kernel queue is full. We want to drop packets only as a last resort, when a receiver is inaccessible, and we don’t want to deal with cracks, hisses, and missing pieces of audio because a CPU on one of the machines was busy.

Congestion is even harder to tackle. Imagine you have a 100Mbit/s network, and you send at 80Mbit/s because, hey, hardware must work for the money. All of a sudden, bandwidth drops to 15Mbit/s. The first question is: how do we know that it dropped? The second question is: what do we do? We cannot continue to send at the previous rate of 80Mbit/s. In the worst-case scenario, we’ll have congestive collapse, when the link is available but, due to excessive incoming traffic, no packets can reach the destination before the timeout. Worst of all, bandwidth may return to the previous 100Mbit/s, but because of the existing congestion, destinations may still seem unreachable, or data may arrive too broken to be meaningful.

Network protocols built on top of UDP usually have complementary protocols or messages for bandwidth control, jitter avoidance, QoS control, and retries on packet loss. For instance, RTP (Real-time Transport Protocol) has RTCP (RTP Control Protocol).

These protocols are quite complex. But then, TCP does all of this, and often very efficiently at the hardware level.

In our case, we can tolerate a delay of hundreds of milliseconds, and we want to transfer thousands of streams through the same link, so we let TCP take care of all the heavy lifting, and we get to use all the existing diagnostic tools.

We use TCP/IP, and every stream has its own connection to utilize bandwidth.

Wire Protocol

The protocol has only one message: data transfer, and it’s as simple as:

+--...---+---------...
|<length>| payload ...
+--...---+---------...

The <length> is a varint of 1–5 octets, depending on the payload size.

Varints are a method of serializing integers using one or more bytes. Smaller numbers take a smaller number of bytes.

Each byte in a varint, except the last byte, has the most significant bit (msb) set — this indicates that there are further bytes to come. The lower 7 bits of each byte are used to store the two’s complement representation of the number in groups of 7 bits, least significant group first.

So a payload 0–127 bytes long requires only 1 additional byte of metadata.
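To make the byte layout concrete, here is a small hand-rolled encoder that follows the description above. It is a sketch for illustration only: `putUvarint` is a hypothetical helper, and treating the length as a `uint32` is an assumption based on the 1–5 octet range.

```go
package main

import "fmt"

// putUvarint appends n to dst as a varint: 7 bits per byte, least significant
// group first, with the most significant bit set on every byte except the last.
func putUvarint(dst []byte, n uint32) []byte {
	for n >= 0x80 {
		dst = append(dst, byte(n)|0x80) // low 7 bits plus the "more bytes follow" flag
		n >>= 7
	}
	return append(dst, byte(n)) // last byte: msb clear
}

func main() {
	// Print each value next to its encoded bytes in hex.
	for _, n := range []uint32{0, 127, 128, 300, 1<<32 - 1} {
		fmt.Printf("%10d -> % x\n", n, putUvarint(nil, n))
	}
}
```

A length of 127 still fits in one byte, 128 needs two, and the largest 32-bit value needs five, which matches the 1–5 octet range of the length field.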

We use nanoQ in the existing backend infrastructure where integrity checking, routing, encryption are already in place. This minimalistic protocol is easy to implement using almost any programming language.

API v0 and Implementation details

At the very beginning, we decided that nanoQ should be data oblivious: some of our services speak JSON, some — Protobuf or msgpack — whatever works best.

The publisher’s API is:

Publish(ctx context.Context, payload []byte, streamKey interface{}) error

So our payload is simply a slice of bytes. The streamKey defines whether to use an existing connection or to create a new one — every new streamKey is a new connection.
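One way to picture the streamKey rule is a map from key to connection, guarded by a mutex. This is a sketch under assumptions: the `publisher` and `stream` types and the `dial` callback are hypothetical and only illustrate the lookup-or-create behaviour described above.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical stand-in for one outgoing connection.
type stream struct{ id int }

// publisher keeps one stream per streamKey.
type publisher struct {
	mu      sync.Mutex
	streams map[interface{}]*stream
	dial    func() *stream // hypothetical: opens a new connection
}

// streamFor returns the stream already bound to key, or dials a new one.
func (p *publisher) streamFor(key interface{}) *stream {
	p.mu.Lock()
	defer p.mu.Unlock()
	if s, ok := p.streams[key]; ok {
		return s
	}
	s := p.dial()
	p.streams[key] = s
	return s
}

func main() {
	dials := 0
	p := &publisher{
		streams: make(map[interface{}]*stream),
		dial:    func() *stream { dials++; return &stream{id: dials} },
	}
	for _, key := range []interface{}{"call-1", "call-2", "call-1"} {
		fmt.Printf("key %v uses stream %d\n", key, p.streamFor(key).id)
	}
	fmt.Println("connections opened:", dials)
}
```

Because the key is an `interface{}` used as a map key, it has to be a comparable value such as a string or an integer; a slice would panic at run time in this sketch.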

Our subscriber’s API v0 was:

type IncomingMsg struct {
    Payload          []byte
    StreamDescriptor interface{}
}

Subscribe(ctx context.Context) <-chan IncomingMsg

And it was a no-brainer to use Go channels as internal queues. It was chan []byte all the way.

It turned out that this design has several critical flaws.

Let’s start with the publisher.

payload := []byte("Example payload")
// We store the message into the internal queue
Publish(ctx, payload, "example")
payload[3] = 0 // Wait! What?! This is illegal!!!

Because a slice is essentially a pointer to its backing array, we cannot just send payload down the channel: the user can still modify the contents, and then:

  1. it’s a race condition, and
  2. we’re transferring spoiled data

So at this point, we had to either explain to every user not to do this, pretty please, or to allocate a copy of each incoming message (guess what we’ve chosen, and write a comment).
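The copying approach looks roughly like this. It is a minimal sketch, where `enqueue` is a hypothetical stand-in for the publisher's internal queueing step rather than actual nq code.

```go
package main

import "fmt"

// enqueue copies payload before queueing it, so later writes by the caller
// cannot reach the queued data. The price is one allocation per message.
func enqueue(queue chan<- []byte, payload []byte) {
	msg := make([]byte, len(payload))
	copy(msg, payload)
	queue <- msg
}

func main() {
	queue := make(chan []byte, 1)
	payload := []byte("Example payload")
	enqueue(queue, payload)
	payload[3] = 0 // the caller mutates its own slice after publishing
	fmt.Printf("%q\n", <-queue) // the queued copy is still intact
}
```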

On the subscriber’s side we had the same problem: the API promised that every incoming message would be delivered as its own []byte slice, so we had to allocate memory for every incoming packet too.

Is there a problem with allocations? Go is not magical. Nothing is magical. Our job as engineers is to fight magic and to explain how things actually work, a plausible explanation at least — no magic.

Go goes to great lengths to convince us that the garbage collector and runtime don’t exist (and it’s amazingly convincing). But when you allocate here and there a couple hundred thousand times per second, suddenly the garbage collector would like a word with you.

Oh man, CPU usage was high as a kite.

Redesign and the current API

The pillar of the new design became a circular buffer: we used it for internal queues instead of chan []byte, so that we can read and write using the same pre-allocated memory arena.
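For readers unfamiliar with the idea, here is a minimal byte ring over a fixed arena. It is a sketch, not nq's buffer: the `ring` type is hypothetical, it is not safe for concurrent use, it assumes a capacity greater than zero, and it rejects a write that does not fit, which is an assumption about the drop policy.

```go
package main

import "fmt"

// ring is a fixed-size circular byte buffer over one pre-allocated arena.
type ring struct {
	buf  []byte
	head int // index of the next byte to read
	size int // number of unread bytes
}

func newRing(capacity int) *ring { return &ring{buf: make([]byte, capacity)} }

// write copies p into the ring, or reports false if p does not fit.
func (r *ring) write(p []byte) bool {
	if len(p) > len(r.buf)-r.size {
		return false
	}
	tail := (r.head + r.size) % len(r.buf)
	n := copy(r.buf[tail:], p) // fill up to the end of the arena
	copy(r.buf, p[n:])         // wrap the remainder to the start
	r.size += len(p)
	return true
}

// read copies up to len(p) unread bytes into p and returns how many it copied.
func (r *ring) read(p []byte) int {
	n := len(p)
	if n > r.size {
		n = r.size
	}
	first := copy(p[:n], r.buf[r.head:]) // bytes before the end of the arena
	copy(p[first:n], r.buf)              // wrapped bytes, if any
	r.head = (r.head + n) % len(r.buf)
	r.size -= n
	return n
}

func main() {
	r := newRing(8)
	fmt.Println(r.write([]byte("abcdef"))) // fits
	out := make([]byte, 4)
	n := r.read(out)
	fmt.Printf("%q\n", out[:n])
	fmt.Println(r.write([]byte("ghijk"))) // fits, wraps around the end
	fmt.Println(r.write([]byte("xxxx")))  // does not fit, rejected
	big := make([]byte, 16)
	n = r.read(big)
	fmt.Printf("%q\n", big[:n])
}
```

After the initial allocation in `newRing`, neither `write` nor `read` allocates, which is the property that takes pressure off the garbage collector.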

Publisher’s API stayed the same:

Publish(ctx context.Context, payload []byte, streamKey interface{}) error

We just copy the contents of the payload into a circular buffer, already in the wire protocol format. A background thread flushes the buffer at a rate set by the FlushFrequency option. This option and NoDelay (which disables or enables Nagle’s algorithm) control the latency vs. bandwidth “slider”.

Subscriber’s API became more io.Reader-like:

Receive(ctx context.Context, p []byte) (payload []byte, stream StreamDescriptor, err error)

The returned payload shares memory with p but is resliced to the length of the received message.

Having this API, a user can decide whether to reuse the same memory for each incoming message or to allocate.

Built-in Prometheus metrics

This part may seem like overkill for a library that calls itself minimalistic.
nanoQ can lose data, and we want the user to know when that happens. The usual channel of program-to-programmer communication, logging, doesn’t work well here: if you’re losing thousands of messages per second, it’s not the best idea to produce thousands of log lines. That’s exactly the reason metrics were invented.

We hope that nanoQ will solve some of your problems too. Fork us on GitHub.
