MongoDB 3.6 change streams example with Node.js

mongodb 3.6 released with many new features and changes. But my favorite feature is change streams. I like it so much that I also tweet about it.

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a collection and immediately react to them.

What is oplog(operations log)?

The oplog (operations log) is a special capped collection that keeps a rolling record of all operations that modify the data stored in your databases. Oplogs came into existence to support MongoDB’s replication features. The idea is that once a replica is fully synced with the master it only has to follow the latest entries in the oplog to find out what changes — inserts, updates, and deletions — are being written into the databases of that MongoDB server.

As oplog is used in replica set, we can not use change streams on standalone mongodb server. So we have to setup replica set with minimum 3 mongodb servers.

Setup Replica Set

prerequisites: mongodb 3.6.0 or above

To setup replica set, create 3 folders mdata-1, mdata-2, mdata-3 and run following commands.

> mongod --port 27017 --dbpath ./mdata-1 --smallfiles --replSet mongo-repl
> mongod --port 27018 --dbpath ./mdata-2 --smallfiles --replSet mongo-repl
> mongod --port 27019 --dbpath ./mdata-3 --smallfiles --replSet mongo-repl

Verify that you are using 3.6.0 or above version

> mongo --port 27017
MongoDB shell version v3.6.0
connecting to: mongodb://127.0.0.1:27017/
MongoDB server version: 3.6.0

Now, we have to initialize out replica set so that they can talk to each other and we can run our nodejs example.

> rs.initiate({
"_id" : "mongo-repl",
"members" : [
{
"_id" : 0,
"host" : "localhost:27017"
},
{
"_id" : 1,
"host" : "localhost:27018"
},
{
"_id" : 2,
"host" : "localhost:27019"
}
]
})
{
"ok" : 1,
"operationTime" : Timestamp(1512853184, 1),
"$clusterTime" : {
"clusterTime" : Timestamp(1512853184, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}

Setup nodejs Project

Create new folder and initialize it with npm init -y . Install nodejs mongodb native package v3.0 or above with following command.

> npm i mongodb@3

Now, Let’s create index.js and start writing code.

const MongoClient = require("mongodb").MongoClient;
const assert = require("assert");
/*
Modify Change Stream Output using Aggregation Pipelines
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$match, $project, $addFields, $replaceRoot, $redact
See Change Events for more information on the change stream response document format.
*/
const pipeline = [
{
$project: { documentKey: false }
  }
];

Establish a mongodb connection with replica set.

MongoClient.connect("mongodb://localhost:27017,localhost:27018,localhost:27019?replicaSet=mongo-repl")
.then(client => {
console.log("Connected correctly to server");
    // specify db and collections
    const db = client.db("superheroesdb");
const collection = db.collection("superheroes");

Here comes the most important part. We have to Setup watch on collection, so that we can listen to whatever operation changes happening in that collection like insert, update, delete, replace etc.

const changeStream = collection.watch(pipeline);
// start listen to changes
changeStream.on("change", function(change) {
console.log(change);
});

Let’s insert few records First

// insert few data with timeout so that we can watch it happening
setTimeout(function() {
collection.insertOne({ "batman": "bruce wayne" }, function(err) {
assert.ifError(err);
});
}, 1000);
setTimeout(function() {
collection.insertOne({ "superman": "clark kent" }, function(err) {
assert.ifError(err);
});
}, 2000);
setTimeout(function() {
collection.insertOne({ "wonder-woman": "diana prince" }, function(err) {
assert.ifError(err);
});
}, 3000);
setTimeout(function() {
collection.insertOne({ "ironman": "tony stark" }, function(err) {
assert.ifError(err);
});
}, 4000);
setTimeout(function() {
collection.insertOne({ "spiderman": "peter parker" }, function(err) {
assert.ifError(err);
});
}, 5000);

Now you should see, as time passes, every record will get inserted into superheroes collection and we will get every insert happening on our screen.

// update existing document
setTimeout(function() {
collection.updateOne({ "ironman": "tony stark" }, { $set: { "ironman": "elon musk" } }, function(err) {
assert.ifError(err);
});
}, 6000);

Sample with update and delete, but this time we will see {operation: 'update'} instead of insert.

// delete existing document
setTimeout(function() {
collection.deleteOne({ "spiderman": "peter parker" }, function(err) {
assert.ifError(err);
});

Full index.js

const MongoClient = require("mongodb").MongoClient;
const assert = require("assert");
/*
Modify Change Stream Output using Aggregation Pipelines
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$match, $project, $addFields, $replaceRoot, $redact
See Change Events for more information on the change stream response document format.
*/
const pipeline = [
{
$project: { documentKey: false }
  }
];
MongoClient.connect("mongodb://localhost:27017,localhost:27018,localhost:27019?replicaSet=mongo-repl")
.then(client => {
console.log("Connected correctly to server");
// specify db and collections
const db = client.db("superheroesdb");
const collection = db.collection("superheroes");

const changeStream = collection.watch(pipeline);
    // start listen to changes
changeStream.on("change", function(change) {
console.log(change);
});
    // insert few data with timeout so that we can watch it happening
setTimeout(function() {
collection.insertOne({ "batman": "bruce wayne" }, function(err) {
assert.ifError(err);
});
}, 1000);
setTimeout(function() {
collection.insertOne({ "superman": "clark kent" }, function(err) {
assert.ifError(err);
});
}, 2000);
setTimeout(function() {
collection.insertOne({ "wonder-woman": "diana prince" }, function(err) {
assert.ifError(err);
});
}, 3000);
setTimeout(function() {
collection.insertOne({ "ironman": "tony stark" }, function(err) {
assert.ifError(err);
});
}, 4000);
setTimeout(function() {
collection.insertOne({ "spiderman": "peter parker" }, function(err) {
assert.ifError(err);
});
}, 5000);
    // update existing document
    setTimeout(function() {
collection.updateOne({ "ironman": "tony stark" }, { $set: { "ironman": "elon musk" } }, function(err) {
assert.ifError(err);
});
}, 6000);
    // delete existing document
    setTimeout(function() {
collection.deleteOne({ "spiderman": "peter parker" }, function(err) {
assert.ifError(err);
});
}, 7000);
})
.catch(err => {
console.error(err);
});

Output on Console:

Insert Document Operation:

{ _id:
{ _data:
Binary {
_bsontype: ‘Binary’,
sub_type: 0,
position: 49,
buffer: <Buffer 82 5a 66 cb 44 00 00 00 01 46 64 5f 69 64 00 64 5a 66 cb 44 9b 7e 74 de fd b6 7f 2b 00 5a 10 04 e7 73 de ab f7 b7 48 b6 9d 3b 0b ed 46 58 1d 64 04> } },
operationType: ‘insert’,
fullDocument: { _id: 5a66cb449b7e74defdb67f2b, spiderman: ‘peter parker’ },
ns: { db: ‘superheroesdb’, coll: ‘superheroes’ } }

Update Document Operation:

{ _id:
{ _data:
Binary {
_bsontype: ‘Binary’,
sub_type: 0,
position: 49,
buffer: <Buffer 82 5a 66 cb 45 00 00 00 01 46 64 5f 69 64 00 64 5a 66 cb 43 9b 7e 74 de fd b6 7f 2a 00 5a 10 04 e7 73 de ab f7 b7 48 b6 9d 3b 0b ed 46 58 1d 64 04> } },
operationType: ‘update’,
ns: { db: ‘superheroesdb’, coll: ‘superheroes’ },
updateDescription: { updatedFields: { ironman: ‘elon musk’ }, removedFields: [] } }

Delete Document Operation:

{ _id:
{ _data:
Binary {
_bsontype: ‘Binary’,
sub_type: 0,
position: 49,
buffer: <Buffer 82 5a 66 cb 46 00 00 00 01 46 64 5f 69 64 00 64 5a 66 cb 44 9b 7e 74 de fd b6 7f 2b 00 5a 10 04 e7 73 de ab f7 b7 48 b6 9d 3b 0b ed 46 58 1d 64 04> } },
operationType: ‘delete’,
ns: { db: ‘superheroesdb’, coll: ‘superheroes’ } }

Links

Change Streams Documentation

Change Events Documentation

NPM Package

MongoDB M036 Course: New Features and Tools in MongoDB 3.6

Example Project