MongoDB 3.6 change streams example with Node.js

What is the oplog (operations log)?

The oplog (operations log) is a special capped collection that keeps a rolling record of all operations that modify the data stored in your databases. Oplogs came into existence to support MongoDB’s replication features. The idea is that once a replica is fully synced with the master it only has to follow the latest entries in the oplog to find out what changes — inserts, updates, and deletions — are being written into the databases of that MongoDB server.

Setup Replica Set

Prerequisites: MongoDB 3.6.0 or above

> mongod --port 27017 --dbpath ./mdata-1 --smallfiles --replSet mongo-repl
> mongod --port 27018 --dbpath ./mdata-2 --smallfiles --replSet mongo-repl
> mongod --port 27019 --dbpath ./mdata-3 --smallfiles --replSet mongo-repl
> mongo --port 27017
MongoDB shell version v3.6.0
connecting to: mongodb://127.0.0.1:27017/
MongoDB server version: 3.6.0
> rs.initiate({
"_id" : "mongo-repl",
"members" : [
{
"_id" : 0,
"host" : "localhost:27017"
},
{
"_id" : 1,
"host" : "localhost:27018"
},
{
"_id" : 2,
"host" : "localhost:27019"
}
]
})
{
"ok" : 1,
"operationTime" : Timestamp(1512853184, 1),
"$clusterTime" : {
"clusterTime" : Timestamp(1512853184, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}

Setup nodejs Project

Create a new folder and initialize it with npm init -y. Then install the native Node.js MongoDB driver, v3.0 or above, with the following command.

> npm i mongodb@3
const MongoClient = require("mongodb").MongoClient;
const assert = require("assert");

/*
Modify Change Stream Output using Aggregation Pipelines
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$match, $project, $addFields, $replaceRoot, $redact
See Change Events for more information on the change stream response document format.
*/
// This pipeline removes the documentKey field from every change event.
const pipeline = [
  {
    $project: { documentKey: false }
  }
];

// Connect to the three members of the "mongo-repl" replica set.
// The "/" between the host list and "?" is required by the connection string
// format whenever options follow the hosts.
MongoClient.connect("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=mongo-repl")
  .then(client => {
    console.log("Connected correctly to server");

    // specify db and collections
    const db = client.db("superheroesdb");
    const collection = db.collection("superheroes");

    // open the change stream, filtered through the pipeline above
    const changeStream = collection.watch(pipeline);

    // start listen to changes: print every change event as it arrives
    changeStream.on("change", function(change) {
      console.log(change);
    });

    // insert few data with timeout so that we can watch it happening
    // (one write per second; assert.ifError throws if a write fails)
    setTimeout(function() {
      collection.insertOne({ "batman": "bruce wayne" }, function(err) {
        assert.ifError(err);
      });
    }, 1000);

    setTimeout(function() {
      collection.insertOne({ "superman": "clark kent" }, function(err) {
        assert.ifError(err);
      });
    }, 2000);

    setTimeout(function() {
      collection.insertOne({ "wonder-woman": "diana prince" }, function(err) {
        assert.ifError(err);
      });
    }, 3000);

    setTimeout(function() {
      collection.insertOne({ "ironman": "tony stark" }, function(err) {
        assert.ifError(err);
      });
    }, 4000);

    setTimeout(function() {
      collection.insertOne({ "spiderman": "peter parker" }, function(err) {
        assert.ifError(err);
      });
    }, 5000);

    // update existing document
    setTimeout(function() {
      collection.updateOne({ "ironman": "tony stark" }, { $set: { "ironman": "elon musk" } }, function(err) {
        assert.ifError(err);
      });
    }, 6000);

    // delete existing document
    setTimeout(function() {
      collection.deleteOne({ "spiderman": "peter parker" }, function(err) {
        assert.ifError(err);
      });
    }, 7000);
  })
  .catch(err => {
    // connection (or setup) failure: report it instead of leaving the rejection unhandled
    console.error(err);
  });
{ _id:
{ _data:
Binary {
_bsontype: 'Binary',
sub_type: 0,
position: 49,
buffer: <Buffer 82 5a 66 cb 44 00 00 00 01 46 64 5f 69 64 00 64 5a 66 cb 44 9b 7e 74 de fd b6 7f 2b 00 5a 10 04 e7 73 de ab f7 b7 48 b6 9d 3b 0b ed 46 58 1d 64 04> } },
operationType: 'insert',
fullDocument: { _id: 5a66cb449b7e74defdb67f2b, spiderman: 'peter parker' },
ns: { db: 'superheroesdb', coll: 'superheroes' } }
{ _id:
{ _data:
Binary {
_bsontype: 'Binary',
sub_type: 0,
position: 49,
buffer: <Buffer 82 5a 66 cb 45 00 00 00 01 46 64 5f 69 64 00 64 5a 66 cb 43 9b 7e 74 de fd b6 7f 2a 00 5a 10 04 e7 73 de ab f7 b7 48 b6 9d 3b 0b ed 46 58 1d 64 04> } },
operationType: 'update',
ns: { db: 'superheroesdb', coll: 'superheroes' },
updateDescription: { updatedFields: { ironman: 'elon musk' }, removedFields: [] } }
{ _id:
{ _data:
Binary {
_bsontype: 'Binary',
sub_type: 0,
position: 49,
buffer: <Buffer 82 5a 66 cb 46 00 00 00 01 46 64 5f 69 64 00 64 5a 66 cb 44 9b 7e 74 de fd b6 7f 2b 00 5a 10 04 e7 73 de ab f7 b7 48 b6 9d 3b 0b ed 46 58 1d 64 04> } },
operationType: 'delete',
ns: { db: 'superheroesdb', coll: 'superheroes' } }

Links

Change Streams Documentation

Example Project

--

--

Developer

Love podcasts or audiobooks? Learn on the go with our new app.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store