Centrifugo — protobuf inside, JSON outside

Alexander Emelin
10 min read · Aug 4, 2016


Centrifuge — cat inside:) Image from telegraph.co.uk

This is a post about part of a massive internal Centrifugo refactoring made recently. For those who have never heard of Centrifugo: it is a real-time messaging server written in Go. You can find more information in my previous article, Four years in Centrifuge, to get basic knowledge about the server. That post was rather general (overview, history and mission), so it's a good place to start. Here I want to share some technical details which can be interesting to Centrifugo users or Go developers.

Let's look at the simplified Centrifugo architecture diagram again:

That is, Centrifugo is a separate, independent server to which clients of an application connect (using the WebSocket or SockJS protocol) and subscribe to the topics (channels) they are interested in. Then, as soon as the application backend has a new event, it publishes that event as a JSON message to Centrifugo via its API. Centrifugo then broadcasts it to all interested clients. In particular, this scheme makes it possible to use Centrifugo in conjunction with an application written in any language or framework. For example, at work we use it together with our favorite framework, Django.

From its early releases Centrifugo worked with the JSON format, and JSON was everywhere. You send API requests in JSON (from the app backend to Centrifugo, the green arrow on the picture above), and the client protocol is JSON-based (communication between application users and Centrifugo over a WebSocket or SockJS connection, the dashed arrow on the scheme).

In those two places JSON encoding is rather justified, because it allows everyone to send API requests and play with them from the console using tools like curl or HTTPie. It's convenient, easy to implement and easy to support. The client protocol is mostly used by web browser clients, so again JSON is a pretty logical choice. Introducing something like the MsgPack format for the client protocol or the API would make things a bit more complicated (and add an extra dependency).
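For illustration only, an API request from the console could look roughly like this (a hedged sketch of the Centrifugo v1.x HTTP API, where sign is an HMAC of the data parameter built with the secret key; see the Centrifugo docs for the exact signing scheme):

curl -X POST http://localhost:8000/api/ \
  --data-urlencode 'data={"method": "publish", "params": {"channel": "news", "data": {"input": "hello"}}}' \
  --data-urlencode 'sign=<HMAC of data built with the secret key>'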

By default, when you run Centrifugo it uses the Memory Engine, and everything is kept in process memory. The drawback is obvious: you are limited to one node, so the architecture is rather non-scalable and fragile. Imagine that the machine with the single running node goes away! You can configure a load balancer to use a reserve Centrifugo instance when the main one goes down, but the scalability issue still remains.

In practice, Centrifugo's approach to real-time (a separate independent server) allows developing some applications in such a way that even a short total Centrifugo downtime won't be noticed by most users; the developer just has to remember about graceful functionality degradation when adding new real-time features. Of course, depending on the application type, this may not be possible, for example when real-time is a core application feature.

Here is a somewhat more production-ready Centrifugo setup:

That is, lots of Centrifugo nodes running, connected over the Redis PUB/SUB mechanism. This allows load balancing clients between nodes. Redis Sentinel can be used to provide high availability for the Redis instance in this scheme.
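For example, two nodes on one machine could be started roughly like this (a sketch based on Centrifugo v1.x command-line flags; check the options available in your version):

centrifugo --config=config.json --engine=redis --redis_host=127.0.0.1 --redis_port=6379 --port=8000
centrifugo --config=config.json --engine=redis --redis_host=127.0.0.1 --redis_port=6379 --port=8001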

So if you want failover or have grown out of a single Centrifugo node, you need to use the Redis Engine and load balance clients between running Centrifugo nodes (for example, we use Nginx with the upstream and sticky sessions modules). Sticky sessions are important for the HTTP-based polling fallbacks provided by SockJS: since a client session is kept on a node for a while across reconnects, it's important for the client to reconnect to the same node.
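A minimal sketch of such an Nginx configuration (illustrative only; the sticky directive stands for whatever session-affinity module your Nginx build provides):

upstream centrifugo {
    sticky;
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
}

server {
    listen 80;

    location /connection {
        proxy_pass http://centrifugo;
        # Headers required for WebSocket upgrade.
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}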

And before version 1.5 Centrifugo used JSON for all Redis communication too. That communication includes publishing PUB/SUB messages, keeping the history cache and maintaining client presence information in channels.

Let's think about the project architecture a bit more (considering the Redis Engine). Any client can be connected to any Centrifugo node. So if one node gets a message published into some channel, it should deliver that message (via Redis PUB/SUB) to all other nodes that have clients subscribed to this channel. Centrifugo also maintains a message history cache of configured size and configured lifetime for its message recovery mechanism (the possibility to recover messages lost during a network glitch, for example). So a node publishes every new message into Redis, and Redis then sends it to all interested nodes (those subscribed to that channel).

Centrifugo has its own additional logic and features on top of channels, so we cannot just publish messages coming into the API to Redis untouched. We must first unmarshal every publish command coming into the API and then prepare it for publishing into Redis (though the publish data payload provided by the application is left untouched because json.RawMessage is used for it): we extract the channel from the API command, apply configuration options to it and construct a client Message that looks like this:
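(The original struct definitions were embedded as a snippet; below is a reconstruction based on the field list in the next paragraph, so exact names may differ.)

type ClientInfo struct {
    User        string          `json:"user"`
    Client      string          `json:"client"`
    DefaultInfo json.RawMessage `json:"default_info,omitempty"`
    ChannelInfo json.RawMessage `json:"channel_info,omitempty"`
}

type Message struct {
    UID       string          `json:"uid"`
    Timestamp string          `json:"timestamp"`
    Info      *ClientInfo     `json:"info,omitempty"`
    Channel   string          `json:"channel"`
    Data      json.RawMessage `json:"data"`
    Client    string          `json:"client,omitempty"`
}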

So every message that is finally delivered to a client has a unique ID, a timestamp, a channel, a data payload and a couple of additional fields like client info (see the definition above). After preparing a message suitable for sending to end users we encode it to JSON and publish it into Redis. Then every node that receives this message from Redis (it can be zero or more nodes in the cluster) broadcasts the message to all clients connected to that node and subscribed to the channel.

Now imagine how much JSON encoding/decoding work is required to process every new published message. Centrifugo also has history and presence data, which were likewise encoded into JSON before being saved into Redis data structures. Moreover, Centrifugo is a message stream server: every message in a channel must go through Centrifugo even if no subscribed clients are connected at the moment. Let's look at how much time it takes to encode a Message into JSON:

BenchmarkMsgMarshalJSON  2022 ns/op    432 B/op     5 allocs/op

Standard library JSON encoding/decoding is rather slow in Go. It uses reflection and allocates a lot. So it looks like speeding this up could bring a huge benefit.
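Such numbers come from a standard Go testing benchmark, roughly like this (a sketch; newTestMessage is a hypothetical helper that returns a filled Message):

func BenchmarkMsgMarshalJSON(b *testing.B) {
    msg := newTestMessage() // hypothetical helper constructing a Message
    b.ReportAllocs()
    for i := 0; i < b.N; i++ {
        if _, err := json.Marshal(msg); err != nil {
            b.Fatal(err)
        }
    }
}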

Unfortunately, JSON isn't the best data serialization format if you want speed and compact size. For example, it's pretty hard to make the unmarshal operation efficient: you don't know the length of arrays or the types of elements in advance, so you can't preallocate memory. And since it's text based, the size of the resulting data is much bigger than it could be in a binary representation.

The first approach I thought about was using JSON generation tools like ffjson or easyjson. You feed them a file with your struct types and get fast generated marshaling/unmarshaling code which does not rely on the reflect package. I tried both and they behave pretty well for decoding/encoding JSON (confirming the numbers from their repo README files), though easyjson had one problem with a json.RawMessage field that I came across. I also played with the jsonparser library and got pretty nice results. It's really the fastest JSON parser available, with the only cost being that it's rather low-level. And it can only unmarshal data, so it's not very suitable for the task I was solving (fast encoding and decoding inside the engine). Maybe I will use it in the future to unmarshal incoming API and client protocol JSON requests.

The solution I ended up with was rather radical: why not completely get rid of JSON encoding internally? Not every message published to Centrifugo will be delivered to clients; moreover, most of the time there won't be any current subscribers at all. So not every message must be encoded into JSON. We still need to publish every message to Redis (maybe there are clients on other nodes, or we need it for the history cache), but we can use the fastest available serialization format for this.

So, which data serialization formats do we have to choose from? Plenty of them, actually. Here is a repo with benchmarks of the most popular Go data serialization format implementations. As the title of this post says, I decided to use the protobuf format, and the gogoprotobuf library in particular, because it's one of the fastest on the list and I've heard several success stories from Go community members. It's also used in monster Go projects like etcd, CockroachDB, Kubernetes and others.

Protobuf requires writing an additional schema file for your types and running an extra command to generate code, but the result is blazingly fast. Here is a benchmark serializing Message using the gogoprotobuf library:

BenchmarkMsgMarshalGogoprotobuf  168 ns/op   48 B/op   1 allocs/op

12x faster than standard library JSON encoding! And it looks like with the Go 1.7 release it will be about 30% faster still. Look, the same benchmark on the same machine using go1.7rc5:

BenchmarkMsgMarshalGogoprotobuf  124 ns/op   48 B/op   1 allocs/op

One of the problems I needed to solve was making the transition between the new internal protobuf format and the old external JSON client protocol format painless and invisible to users. Here is how I solved this.

First, let's look at the protobuf schema for the two types (ClientInfo and Message) shown above:
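(The schema itself was embedded as a snippet; here is a hedged reconstruction in proto2 syntax that matches the options quoted below, so treat field numbers and names as approximate.)

package libcentrifugo;

import "github.com/gogo/protobuf/gogoproto/gogo.proto";

message ClientInfo {
    required string user = 1;
    required string client = 2;
    optional bytes default_info = 3 [(gogoproto.customtype) = "github.com/centrifugal/centrifugo/libcentrifugo/raw.Raw", (gogoproto.jsontag) = "default_info,omitempty"];
    optional bytes channel_info = 4 [(gogoproto.customtype) = "github.com/centrifugal/centrifugo/libcentrifugo/raw.Raw", (gogoproto.jsontag) = "channel_info,omitempty"];
}

message Message {
    required string uid = 1;
    required string timestamp = 2;
    optional ClientInfo info = 3;
    required string channel = 4;
    required bytes data = 5 [(gogoproto.customtype) = "github.com/centrifugal/centrifugo/libcentrifugo/raw.Raw", (gogoproto.jsontag) = "data"];
    optional string client = 6;
}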

To generate Go code based on this proto file we must run the protoc command with a custom generator:

protoc --gogofaster_out=. message.proto

I chose the code generator most appropriate for my case, gogofaster_out. There are actually other options, each with certain features included in or excluded from the generated code. Also, I develop on Mac OS and installed protobuf over Homebrew via:

brew install homebrew/versions/protobuf260

– and I also needed to provide the proper directories to search for imports in proto files using the proto_path option. So the actual command was:

protoc --proto_path=/Users/fz/gopath/src/:/usr/local/Cellar/protobuf260/2.6.0/include/:. --gogofaster_out=. message.proto

The interesting part of the protobuf schema above is how we replaced the json.RawMessage field. The gogoprotobuf library allows defining custom types, and that is exactly what I did for the bytes field; see this line:

(gogoproto.customtype) = "github.com/centrifugal/centrifugo/libcentrifugo/raw.Raw"

Here is the source code of that Raw custom type. The idea of the Raw type is that it's a named type derived from []byte, and it works in a way that leaves the message payload (provided by the application publishing new data into a channel) untouched, i.e. the same behaviour json.RawMessage has. This custom type must have several methods which will be called from the code generated by the protoc command (Marshal, MarshalTo, Unmarshal, Size, Equal, Compare) and an exported function NewPopulatedX, where X is the name of the type, so NewPopulatedRaw in my case. If you want more custom type examples, take a look at the UUID type implementation in the gogoprotobuf repo.
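In condensed form it looks like this (a sketch; NewPopulatedRaw, used only by generated tests, is omitted):

// Raw is a named []byte type whose contents pass through encoding untouched.
type Raw []byte

// Marshal returns the raw bytes as they are.
func (r Raw) Marshal() ([]byte, error) {
    return r, nil
}

// MarshalTo copies the raw bytes into data and returns the number of bytes copied.
func (r Raw) MarshalTo(data []byte) (int, error) {
    return copy(data, r), nil
}

// Unmarshal sets *r to a copy of data.
func (r *Raw) Unmarshal(data []byte) error {
    *r = append((*r)[0:0], data...)
    return nil
}

// Size returns the length of the raw bytes.
func (r Raw) Size() int {
    return len(r)
}

// Equal reports whether r and other hold the same bytes.
func (r Raw) Equal(other Raw) bool {
    return bytes.Equal(r, other)
}

// Compare lexicographically compares r and other.
func (r Raw) Compare(other Raw) int {
    return bytes.Compare(r, other)
}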

Moreover, I defined two extra methods on it to satisfy the JSON Marshaler/Unmarshaler interfaces:
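(The bodies below mirror json.RawMessage from the standard library, as the next paragraph notes.)

// MarshalJSON returns r as the JSON encoding of r.
func (r Raw) MarshalJSON() ([]byte, error) {
    if r == nil {
        return []byte("null"), nil
    }
    return r, nil
}

// UnmarshalJSON sets *r to a copy of data.
func (r *Raw) UnmarshalJSON(data []byte) error {
    if r == nil {
        return errors.New("raw.Raw: UnmarshalJSON on nil pointer")
    }
    *r = append((*r)[0:0], data...)
    return nil
}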

That is, the same code json.RawMessage has in the standard library. gogoprotobuf also allows setting an extra JSON tag on a field, see:

(gogoproto.jsontag) = "data"

– in the schema above.

All this makes the structs generated by gogoprotobuf compatible with the client JSON protocol, so something like:

json.Marshal(message)

– will return the same bytes as before. So we use protobuf serialization between nodes while remaining compatible with the client JSON protocol.
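For example (a sketch assuming the reconstructed types shown earlier; field names are illustrative):

msg := Message{
    UID:       "unique-id",
    Timestamp: "1470322925",
    Channel:   "news",
    Data:      raw.Raw(`{"input": "hello"}`),
}

protoBytes, _ := msg.Marshal()    // compact protobuf bytes for Redis and other nodes
jsonBytes, _ := json.Marshal(msg) // the same client protocol JSON as before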

Similarly, I added protobuf schemas for the other message types that Centrifugo uses internally.

Actually, the initial goal of this refactoring was not just to improve performance but to make the internal Centrifugo engine interface cleaner. The optimizations described here were a side effect of the hard work of refactoring the internals. That refactoring affected not only communication between nodes when using the Redis Engine but also the Memory Engine (which does not need to use protobuf at all). To demonstrate how the changes between versions 1.4 and 1.5 affect Centrifugo performance from a user's point of view, let's look at a simple example.

We will send a command containing a message that must be published into many different channels. This is called the broadcast command in Centrifugo. In real life it can be sending the same message into a list of private user channels.

This is a special case with no actual clients subscribed to those channels, just to demonstrate the numbers.

Here is example Python code to make this API request (using the Cent API library):
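(The original snippet is not embedded here; below is a hedged sketch, and the exact Cent API of that time may have differed slightly.)

from cent.core import Client

client = Client("http://localhost:8000", "secret")

# Build the channel list; the benchmark below varies this count from 1 to 5000.
channels = ["channel_{0}".format(i) for i in range(5000)]

# One broadcast API request publishing the same message into all channels.
client.broadcast(channels, {"input": "test"})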

Now let's run it many times for different channel counts (from 1 to 5000) and look at the Centrifugo response time median. First, for the Memory Engine:

Centrifugo response time median on broadcast request (Memory Engine)

As I said above, the Memory Engine does not need protobuf. So why do we see such a difference between versions? The answer is just one line of code I added to the Memory Engine: do not encode a message into JSON if there are currently no subscribers in the channel. I don't know why I hadn't done this before:)
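Schematically (names here are illustrative, not the real Centrifugo code):

func (e *MemoryEngine) publishMessage(ch string, msg *Message) error {
    if e.hub.NumSubscribers(ch) == 0 {
        // Nobody is subscribed on this node: skip JSON encoding entirely.
        return nil
    }
    data, err := json.Marshal(msg)
    if err != nil {
        return err
    }
    return e.hub.Broadcast(ch, data)
}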

And now for Redis Engine:

Centrifugo response time median on broadcast request (Redis Engine)

Here a Centrifugo node can't know whether there are clients subscribed to a channel (they can be connected to another node), so we need to encode every message and publish it to Redis. The difference here is due to replacing JSON with protobuf, and the more encoding work required, the bigger the difference.

I think it's a pretty good speedup. Note that in the example above Centrifugo actually does a lot of work. In the case of the Redis Engine it decodes the incoming JSON API request, creates N messages (one for each channel), applies channel configuration rules, serializes the messages and calls the Redis publish command for each of them. So 30 ms to broadcast a message into 5000 channels looks fast enough. Of course, there is no limit to perfection:)

Also, just out of interest, I decided to compare Centrifugo with its predecessor, Centrifuge, written in Python (on top of Tornado). Unfortunately it does not have a broadcast command, so we will publish many different messages into different channels in one request. The code looks like this:
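(Again a hedged sketch rather than the original snippet; Cent allows batching several commands into one API request, though the exact method names may have differed.)

from cent.core import Client

client = Client("http://localhost:8000", "secret")

# Queue many publish commands and send them in a single API request.
for i in range(1000):
    client.add("publish", {
        "channel": "channel_{0}".format(i),
        "data": {"input": "test"},
    })
client.send()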

For the Memory Engine it's 50 ms for Centrifugo (using GOMAXPROCS=1) and 7800 ms for Centrifuge, a more than 100x difference! Of course it's not a fair comparison; Python has its own benefits we all know about. And there is PyPy. I was just curious.

That's all for today! Any feedback is highly appreciated. Feel free to write me an email (you can find it on my GitHub profile page) with any questions about Centrifugo.
