Note about pluggable components in Centrifuge library

Alexander Emelin
5 min read · Mar 24, 2019

--

This is just a note for Centrifuge library users on how I finally made it possible to use pluggable components for PUB/SUB, history and presence.

As you may know, the central part of Centrifuge is the Engine interface. Engine implementations are responsible for several things:

  • PUB/SUB mechanics, so clients subscribed to a channel on different server nodes receive a message published to any of the running nodes
  • Channel history. This is an immutable log of all publications (messages) in a channel with a configured retention period and size, so a client can restore missed messages from a certain position. Think about Kafka or NATS Streaming: the general idea is similar here
  • Channel presence. Presence is information about the current online status of users: it shows who is subscribed to a channel at the moment, which is pretty useful for real-time multiplayer games and chat rooms with a limited number of users

Originally Centrifugo (and now the Centrifuge library) had two built-in engines:

  • Memory engine. This one keeps everything in process memory and has no PUB/SUB part, so developers can only start a single Centrifugo node. As an option you can start several server nodes with the Memory engine and publish new messages to all of them (not very elegant, but it works). The Memory engine is insanely fast as it does not involve any network or disk round trips
  • Redis engine. With this engine it’s possible to scale Centrifugo horizontally: connections can be spread over many server nodes thanks to the Redis PUB/SUB mechanism. It’s also possible to scale Redis itself due to built-in Redis sharding support (client-side consistent sharding by channel, because PUB/SUB does not scale well in Redis Cluster); a small sharding sketch follows this list
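To illustrate client-side sharding by channel, here is a simplified sketch that picks a shard by hashing the channel name. It uses a plain modulo scheme for brevity; the engine described above uses client-side consistent sharding, which behaves better when the number of shards changes. Shard addresses and the channel name are made up.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardIndex maps a channel to a Redis shard so that all operations for the
// same channel always hit the same shard.
func shardIndex(channel string, numShards int) int {
	h := fnv.New32a()
	h.Write([]byte(channel))
	return int(h.Sum32()) % numShards
}

func main() {
	shards := []string{"redis-1:6379", "redis-2:6379", "redis-3:6379"}
	fmt.Println(shards[shardIndex("chat:index", len(shards))])
}
```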

This all works pretty well, but here is what I want to say. While Centrifugo is a separate server that can dictate its dependencies (like the Redis server), a library should ideally give developers more freedom in what they can do with it. As soon as I announced this library I almost immediately got a request for pluggable components. Looks reasonable, doesn’t it? But while this all seems simple and obvious to do, it actually was not.

When the history option is on, the Centrifuge library should do two things after a message is published:

  • save the published message to history
  • publish it to the PUB/SUB broker to distribute it to connected clients

Now imagine this situation: you publish a message to PUB/SUB and then save it to the history log. But what if saving to history fails? You can retry the whole operation, but clients will then receive duplicate messages over PUB/SUB.

You can first save to history and then publish to PUB/SUB. Looks better. But what if the publish call to the PUB/SUB system fails? In this case you can again retry the whole operation, but you may end up with duplicate messages in the channel history log. Or you can ignore an error while publishing to the broker, but then currently connected clients will miss this message. A sketch of both orderings follows.
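To make the dilemma concrete, here are both orderings with their failure modes noted in comments. publishToBroker and saveToHistory are hypothetical helpers, not library calls.

```go
// Ordering 1: publish first, then save to history.
func publishThenSave(publishToBroker, saveToHistory func(data []byte) error, data []byte) error {
	if err := publishToBroker(data); err != nil {
		return err
	}
	if err := saveToHistory(data); err != nil {
		// Retrying the whole operation re-publishes data, so connected
		// clients receive a duplicate message over PUB/SUB.
		return err
	}
	return nil
}

// Ordering 2: save to history first, then publish.
func saveThenPublish(publishToBroker, saveToHistory func(data []byte) error, data []byte) error {
	if err := saveToHistory(data); err != nil {
		return err
	}
	if err := publishToBroker(data); err != nil {
		// Retrying the whole operation duplicates the message in the
		// history log; ignoring the error means currently connected
		// clients miss the message.
		return err
	}
	return nil
}
```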

In the case of the Redis engine the dilemma has been reasonably solved by a Lua script which is executed atomically in Redis. In one round trip to Redis we can save a message to the history log and publish it to the channel. It’s both efficient and pretty reliable, though I suppose the transactional nature of Lua does not extend to PUB/SUB operations, and Redis has limited buffers for PUB/SUB connections (16mb), which can also result in a dropped message. But to be fair, the chance of failure with Lua is very low in practice and has never caused any problem yet; it works just fine.
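For illustration, here is a minimal sketch of the idea, not Centrifuge’s actual script: the Lua body appends to the history list, trims and expires it, and publishes, all in one EVAL call. Key naming and the size/TTL values are made up.

```go
package example

import (
	"github.com/gomodule/redigo/redis"
)

// addHistoryAndPublish saves a publication to the channel history list,
// trims it to the configured size, refreshes its TTL and publishes to
// subscribers, all atomically in a single round trip.
const addHistoryAndPublish = `
redis.call("rpush", KEYS[1], ARGV[1])
redis.call("ltrim", KEYS[1], -tonumber(ARGV[2]), -1)
redis.call("expire", KEYS[1], ARGV[3])
return redis.call("publish", KEYS[2], ARGV[1])
`

// publishWithHistory runs the script: KEYS[1] is the history list key,
// KEYS[2] the PUB/SUB channel, ARGV holds the payload, history size and TTL.
func publishWithHistory(conn redis.Conn, channel string, payload []byte) error {
	historyKey := "history." + channel // illustrative key naming
	_, err := conn.Do("EVAL", addHistoryAndPublish, 2, historyKey, channel, payload, 100, 300)
	return err
}
```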

When we have separate systems for PUB/SUB operations and the history log, we get a much bigger risk of one of the operations failing. This is what stopped me from splitting the Engine interface into separate components until now.

But recently I found a compromise which is both reliable and, in my opinion, pretty elegant. The solution finally allowed splitting Engine into Broker, HistoryManager and PresenceManager interfaces and making them all pluggable and interchangeable to a certain extent. Centrifuge was already similar to an event sourcing system; now I added a mechanism that allows reliably delivering messages to clients without duplicates and without losing message order.
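To give a feel for the split, here is a rough sketch of how such interfaces could be shaped. Only the three names come from the library; the method signatures and the helper types are illustrative, not the actual definitions.

```go
// Illustrative shapes only; the real interfaces in the centrifuge package
// differ in details.
type Publication struct {
	Seq  uint32
	Gen  uint32
	Data []byte
}

type ClientInfo struct {
	ClientID string
	UserID   string
}

// Broker is responsible for PUB/SUB mechanics between nodes.
type Broker interface {
	Publish(channel string, pub *Publication) error
	Subscribe(channel string) error
	Unsubscribe(channel string) error
}

// HistoryManager maintains the publication log of a channel.
type HistoryManager interface {
	AddHistory(channel string, pub *Publication) error
	History(channel string) ([]*Publication, error)
	RemoveHistory(channel string) error
}

// PresenceManager keeps information about clients currently subscribed to a channel.
type PresenceManager interface {
	AddPresence(channel string, clientID string, info *ClientInfo) error
	RemovePresence(channel string, clientID string) error
	Presence(channel string) (map[string]*ClientInfo, error)
}
```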

Here is how it works. As soon as history is on for a channel, the Centrifuge library adds an incremental sequence number to each Publication saved to history storage and then published into the PUB/SUB broker (actually, in practice we have two uint32 fields called seq and gen to work well in JavaScript, which has limited support for big numbers). Client libraries track the incremental sequence of each received Publication in a channel. On reconnect (for example after an internet connection loss or after a Centrifuge node restart) clients can recover missed messages starting from the last seen sequence. This is how things have already worked since Centrifugo v2.
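As a hedged illustration of why two uint32 fields are convenient: JavaScript numbers are only safe up to 2^53, so instead of a single uint64 position a pair of uint32 values can be used. The exact way gen advances below is an assumption for the sketch, not something the library guarantees.

```go
package example

import "math"

// position is an illustrative (seq, gen) pair.
type position struct {
	Seq uint32
	Gen uint32
}

// next advances the position, rolling Seq over into Gen on overflow.
func (p position) next() position {
	if p.Seq == math.MaxUint32 {
		return position{Seq: 0, Gen: p.Gen + 1}
	}
	return position{Seq: p.Seq + 1, Gen: p.Gen}
}
```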

Now on the server side we also know the sequence number of the last Publication we sent to a client. We check each new message from the PUB/SUB system for the expected sequence number (i.e. current_sequence+1). We also add periodic polling on the server side to ask the history storage for the actual state of the channel.

If the server-side client sequence does not match the last sequence number in history, or a new message from PUB/SUB does not contain the expected sequence, we can simply disconnect the client. It will immediately reconnect and recover all missed messages starting from the last seen sequence.
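A rough illustration of that check per client subscription follows; names are illustrative, not the library’s internals. The periodic history poll performs the same comparison against the latest sequence stored in history.

```go
// subscription tracks the last sequence delivered to one client in a channel.
type subscription struct {
	lastSeq    uint32
	send       func(seq uint32, data []byte) error
	disconnect func() // client reconnects and recovers missed messages from history
}

// onPublication handles a new Publication arriving from the PUB/SUB broker.
func (s *subscription) onPublication(seq uint32, data []byte) {
	if seq != s.lastSeq+1 {
		// Unexpected sequence: something was lost or reordered, so force a
		// reconnect; the client recovers starting from its last seen sequence.
		s.disconnect()
		return
	}
	if err := s.send(seq, data); err != nil {
		s.disconnect()
		return
	}
	s.lastSeq = seq
}
```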

Now at the moment of publishing a new message it’s only required to successfully save the message to history storage; even if publishing to the PUB/SUB broker then fails, it does not matter, as clients will eventually restore the correct state due to the strategy described above.
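Put together, the publish path can be sketched like this. addToHistory and publishToBroker are hypothetical helpers standing in for the pluggable components.

```go
// publish sketches the resulting publish path: saving to history must succeed
// and assigns the sequence; a broker failure is tolerable because clients
// detect the gap (or the periodic history poll does) and recover from history.
func publish(
	addToHistory func(data []byte) (seq uint32, err error),
	publishToBroker func(seq uint32, data []byte) error,
	data []byte,
) error {
	seq, err := addToHistory(data)
	if err != nil {
		return err // nothing was published yet, safe to retry
	}
	// Error intentionally not fatal: clients will recover via history.
	_ = publishToBroker(seq, data)
	return nil
}
```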

Of course, with this solution clients can receive messages with some delay in failure scenarios. But I think this is reasonable, as developers mostly need message history in applications that can tolerate a message delay at the moment of a component failure (for example chat apps), and such failures are rare in practice.

Of course this mechanism adds more load on the storage. The load is linearly proportional to the number of active channels with the history_recover option on. There is an optimization to avoid polling history storage for state in channels with a high message rate: we can delay the next history storage polling attempt after receiving a new Publication from the PUB/SUB system.
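One way to express that optimization, as a sketch under the assumption that each channel has its own poll timer: every received Publication pushes the next history poll further into the future, so busy channels are effectively never polled.

```go
package example

import (
	"sync"
	"time"
)

// channelPoller delays history polling for one channel.
type channelPoller struct {
	mu    sync.Mutex
	timer *time.Timer
}

// schedule (re)arms the poll; call it on start and on every received Publication.
func (p *channelPoller) schedule(interval time.Duration, poll func()) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.timer != nil {
		p.timer.Stop()
	}
	p.timer = time.AfterFunc(interval, poll)
}
```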

The strategy described above for channels with history resolved my fears about splitting Engine into separate components. Now the PUB/SUB Broker, HistoryManager and PresenceManager can be set separately. You can find the actual interface descriptions here.
