Shindig: Reactive Meteor Publish/Subscribe with Any Database

Chet Corcos
6 min read · Nov 24, 2015


Meteor Development Group (MDG) built a sweet driver which interprets Mongo queries and churns through Mongo’s operation log to determine exactly which changes affect which queries, reactively sending updates to clients. This is a tractable problem because Mongo is non-relational. But with complicated relational queries, it’s not so simple to determine whether adding or removing an edge or node in a graph will change the result of a query without simply re-running the query.

So my idea was: if we have a fast database, we could intelligently re-run queries, compute the changes, and send reactive updates to the client.

Now before going further, let me first give you a tour of how Meteor.publish and Meteor.subscribe work.

A Short Tour of Meteor’s Publish/Subscribe Architecture

Meteor.publish takes a function that returns a Mongo.Cursor. A Mongo cursor is a representation of a database query that describes what is to be sent to the client.

Meteor.publish 'person', (id) ->
  People.find(id)

The important thing about these Mongo cursors is observeChanges.

People.find(id).observeChanges
  added: (id, fields) ->
    # new document
  changed: (id, fields) ->
    # changed document
  removed: (id) ->
    # removed document

This is the heart of all reactivity in Meteor. Meteor’s DDP websocket protocol has messages that correspond exactly to these methods. When a cursor is returned from Meteor.publish, data from that cursor is sent to the client using this very simple _publishCursor function.

Mongo.Collection._publishCursor = function (cursor, sub, collection) {
  var observeHandle = cursor.observeChanges({
    added: function (id, fields) {
      sub.added(collection, id, fields);
    },
    changed: function (id, fields) {
      sub.changed(collection, id, fields);
    },
    removed: function (id) {
      sub.removed(collection, id);
    }
  });
  sub.onStop(function () {
    observeHandle.stop();
  });
};
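For reference, the raw DDP messages that correspond to these calls look roughly like this on the websocket (a simplified sketch; the collection, id, and field values are made up):

{"msg": "added", "collection": "people", "id": "xyz", "fields": {"name": "Chet"}}
{"msg": "changed", "collection": "people", "id": "xyz", "fields": {"name": "Chester"}}
{"msg": "removed", "collection": "people", "id": "xyz"}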

So, after studying this function, I realized we can send a document to the client pretty simply like this:

Meteor.publish 'messages', ->
  this.added('messages', '1', {value: 'Hello World!'})
  this.ready()

This document will end up in a client-side Minimongo database called ‘messages’. You can see this in action if you put this code on the client.

Messages = new Mongo.Collection 'messages'

Meteor.subscribe 'messages', ->
  console.log Messages.findOne().value
  # => Hello World!

In between these publish and subscribe functions is a really cool feature called Merge Box. Meteor keeps track of all the data sent to the client during the current user session, and whenever data is sent through a publication, it sends only the minimal amount of information to the client and never sends duplicate information.
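As a rough illustration (these publication names and field selections are made up), suppose two publications publish overlapping fields of the same document:

Meteor.publish 'person-name', (id) ->
  People.find(id, fields: {name: 1})

Meteor.publish 'person-profile', (id) ->
  People.find(id, fields: {name: 1, bio: 1})

If a client is subscribed to both, Merge Box sends the name field only once; subscribing to the second publication only sends down the missing bio field, and stopping one subscription only removes the fields no other subscription still needs.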

On the client, Minimongo holds all the data in a single cache and receives DDP messages through Meteor.connection.registerStore. These update messages are just raw DDP added, changed, or removed messages.

Issues with Meteor’s Pub-Sub

At this point, I had some foresight about some problems I was going to have.

  1. All subscription data is jumbled up in one giant cache on the client. This isn’t usually a problem when you’re using Mongo because you’re limited to simple (non-relational) queries that can be run on the client. However, some complicated relational queries and sorts require the entire corpus of data and cannot possibly be run on the client. Thus it’s necessary to maintain a separation between the results of different subscriptions.
  2. Meteor’s DDP protocol doesn’t yet support the ordered version of observeChanges. If we’re going to be doing advanced sorting with our database, we’ll need to maintain the order of the documents on the client.

Hacking away on Meteor’s Pub-Sub

There was no nice way of solving this problem. I had to invent some hacks to bend Meteor’s code to my will. All things considered, it turned out pretty well.

First, I needed a way of getting the ordered version of observeChanges to work with the existing DDP protocol, specifically, the addedBefore and movedBefore messages. I did this by encoding these messages as added and changed messages with special fields indicating which subscription and which position.

if Meteor.isServer
  createOrderedObserver = (pub, subId) ->
    addedBefore: (id, fields, before) ->
      fields['any-db'] = {}
      fields['any-db'][subId] = "#{before}"
      pub.added('any-db', id, fields)
    movedBefore: (id, before) ->
      fields = {}
      fields['any-db'] = {}
      fields['any-db'][subId] = "#{before}"
      pub.changed('any-db', id, fields)
    changed: (id, fields) ->
      pub.changed('any-db', id, fields)
    removed: (id) ->
      pub.removed('any-db', id)

  Meteor.publish 'feed', (userId) ->
    pub = this
    events = Neo4j.getUserFeed(userId)
    observer = createOrderedObserver(pub, pub._subscriptionId)
    events.map (e) ->
      observer.addedBefore(e._id, _.omit(e, '_id'), null)
    pub.ready()

if Meteor.isClient
  Meteor.connection.registerStore 'any-db',
    update: (msg) ->
      if positions = msg.fields?['any-db']
        # here we can interpret the message as
        # addedBefore or movedBefore
        for subId, position of positions
          # here's the subscriptionId and the
          # position for you on the client

This won’t always work as expected, because addedBefore and movedBefore messages are stateful while Merge Box is stateless. Let me give you an example of what I mean.

Suppose I move document X to the end of a list. When an item is moved to the end of the list, before is null. So the position property for this document for a specific subscription is null. Then suppose I move document Y to the end of the list. Same thing as before. Now suppose I move X to the end of the list again. The second time you try to publish that X has a position of null, Merge Box will not send this message to the client because that field already has the same value on the client. Whenever this happens, the document order gets out of sync, and that’s bad news.
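Concretely (using a made-up subscription id sub1 and document ids X and Y), the messages the publication tries to send look like this:

# Move X to the end of the list: before is null.
pub.changed('any-db', 'X', {'any-db': {sub1: 'null'}})  # reaches the client

# Move Y to the end of the list: before is null again.
pub.changed('any-db', 'Y', {'any-db': {sub1: 'null'}})  # reaches the client

# Move X to the end of the list a second time: before is still null.
pub.changed('any-db', 'X', {'any-db': {sub1: 'null'}})  # Merge Box sees that this field
                                                        # already has the value 'null' on
                                                        # the client, so nothing is sent
                                                        # and the client's order is wrong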

The solution is to send a random hex string along with the position in a formatted string. This way, the value of the field changes every time and Merge Box will always send a new position value to the client.

salt = (value) ->
  "#{Random.hexString(10)}.#{value}"

createOrderedObserver = (pub, subId) ->
  addedBefore: (id, fields, before) ->
    fields['any-db'] = {}
    fields['any-db'][subId] = salt(before)
    pub.added('any-db', id, fields)
  movedBefore: (id, before) ->
    fields = {}
    fields['any-db'] = {}
    fields['any-db'][subId] = salt(before)
    pub.changed('any-db', id, fields)
  changed: (id, fields) ->
    pub.changed('any-db', id, fields)
  removed: (id) ->
    pub.removed('any-db', id)

Now we have all the information necessary to reconstruct the ordered version of observeChanges on the client. It takes some tedious imperative code, but for now, just understand that it’s possible.
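To give a flavor of that code, here’s a minimal sketch (not the actual package code; insertIdBefore and orderedIds are names I’m making up for illustration) of how the client could rebuild an ordered list of ids from the salted positions of a single subscription:

# `orderedIds` is this subscription's ordered list of document ids and
# `saltedPosition` is the "<random hex>.<beforeId>" string from the DDP message.
insertIdBefore = (orderedIds, id, saltedPosition) ->
  # strip the random salt prefix to recover the original `before` id (or "null")
  beforeId = saltedPosition.split('.').slice(1).join('.')
  orderedIds = _.without(orderedIds, id)  # remove the id if it was already placed
  if beforeId is 'null'
    orderedIds.push(id)                   # null means "add or move to the end"
  else
    index = orderedIds.indexOf(beforeId)
    orderedIds.splice(index, 0, id)       # insert just before `beforeId`
  orderedIds

The only problem left to solve with these ordered publications is reactivity.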

To make any non-reactive database reactive, we just need to re-run stale queries, compare the results with the previous results, and call the appropriate observeChanges methods. It turns out this isn’t actually all that easy. Everything was going well for me until I tried writing movedBefore. This is a pretty challenging problem when you think about it: if one document moves from one end of a list to the other, does one document move, or do all the others? To a non-programmer this seems like an easy problem, but it’s not. I spent some time thinking about how to send the minimal number of movedBefore messages until I got frustrated and realized I could just check out how MDG was doing it.

Apparently, to efficiently compute the minimal number of moved messages, you need a longest-common-subsequence algorithm so that the largest possible set of documents keeps its position. This requires dynamic programming, and I was glad I wouldn’t have to write it myself. Instead I just used DiffSequence.diffQueryOrderedChanges, which is conveniently pulled out into its own diff-sequence package in Meteor 1.2. This made everything a lot easier for me. Using my ordered observer hack, diff-sequence basically does the rest!
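To see what diff-sequence gives you, here’s a small standalone example (the documents are made up; it assumes the diff-sequence package is added so DiffSequence is in scope) that diffs two ordered result sets with a logging observer:

oldResults = [
  {_id: 'a', text: 'first'}
  {_id: 'b', text: 'second'}
  {_id: 'c', text: 'third'}
]
newResults = [
  {_id: 'b', text: 'second'}
  {_id: 'c', text: 'third!'}
  {_id: 'a', text: 'first'}
]

DiffSequence.diffQueryOrderedChanges oldResults, newResults,
  addedBefore: (id, fields, before) -> console.log('addedBefore', id, before)
  movedBefore: (id, before) -> console.log('movedBefore', id, before)
  changed: (id, fields) -> console.log('changed', id, fields)
  removed: (id) -> console.log('removed', id)

# Roughly: 'a' gets a movedBefore message (to the end, so before is null)
# and 'c' gets a changed message with its new text.

Plugging this into the feed publication, we re-run the query and diff the new results against the previous ones: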

refreshFeed = {}

Meteor.publish 'feed', (userId) ->
  pub = this
  events = []
  observer = createOrderedObserver(pub, pub._subscriptionId)
  refresh = ->
    nextEvents = Neo4j.getUserFeed(userId)
    DiffSequence.diffQueryOrderedChanges(events, nextEvents, observer)
    events = nextEvents
  refresh()
  pub.ready()
  refreshFeed[userId] = refresh
  pub.onStop ->
    delete refreshFeed[userId]

This is all we need on the server to have some basic reactivity. We can simply call refresh on the appropriate publications whenever we suspect data might have changed. And thanks to Merge Box, we’re never sending duplicate data to the client.

Meteor.methods
  follow: (followId) ->
    if Meteor.isServer
      check(followId, String)
      check(@userId, String)
      Neo4j.setFollow(@userId, followId)
      refreshFeed[@userId]?()

Now that we’ve tackled publications, we need to start thinking about subscriptions. There is a lot of code that goes into the subscriptions, but it is actually pretty simple and mostly consists of just parsing the DDP messages. Each subscription object keeps track of the data that belongs to it and has observeChanges methods to update the data, mimicking the messages parsed from DDP.

The interface for working with subscriptions is really simple. While Tracker is super cool, I decided to keep it simple so it will play nicer with React (I don’t use Blaze at all anymore).

feed = subscribe 'feed', userId, (data) ->
  # onReady, here's your data!

listener = feed.onChange (data) ->
  # onChange, here's your data!

feed.data        # the current data at any point
listener.stop()  # stop listening
feed.stop()      # unsubscribe and stop all listeners
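For example, one way to wire this into a React component (a rough sketch using the old React.createClass API; the Feed component and event fields here are made up) might look like:

Feed = React.createClass
  getInitialState: ->
    events: []

  componentWillMount: ->
    # Subscribe when the component mounts and push every update into state.
    @feed = subscribe 'feed', @props.userId, (data) =>
      @setState(events: data)
    @listener = @feed.onChange (data) =>
      @setState(events: data)

  componentWillUnmount: ->
    # Stop listening and unsubscribe when the component goes away.
    @listener.stop()
    @feed.stop()

  render: ->
    React.DOM.ul null,
      @state.events.map (event) ->
        React.DOM.li {key: event._id}, event.text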

That’s basically it! I created a Meteor package that generalizes all this code to allow you to have many subscriptions, many publications, and keep it all well organized in a large production app: ccorcos:any-db.

Many of you may be thinking that there’s still a big elephant in the room. You’re right: there’s no latency compensation on the client! That’s the topic of the next article.
