Audit trails with MongoDB’s change stream

Lim Yi Sheng
Helicap Engineering
6 min read · Feb 10, 2020

In this post, we will cover how our company, Helicap, uses MongoDB’s Change Streams to track real-time data changes and implement audit trails.

For those who do not know Helicap, we are a FinTech investment firm specialising in the alternative lending space in the Asia Pacific region. We help people improve their livelihoods by driving financial inclusion and enabling our partner platforms to transform the way consumers and SMEs access credit today.

Being in the financial space, trust and accountability are paramount, and one of our main and earliest concerns was to set up an audit trail. Our database of choice (as elaborated on in an earlier post) was MongoDB, due to the unstructured nature of the data we work with. One of the reasons we chose it was Change Streams, introduced in version 3.6. To quote directly from the documentation:

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

That sounds perfect for our requirements of creating an audit trail. Any changes made to our database could be captured and logged elsewhere.

Architecture

So our architecture will look something like this:

Change Stream Watcher architecture

We will have applications performing CREATE, READ, UPDATE, and DELETE operations on the application database. We will then use our Change Stream watcher to listen to the application database’s Change Stream, and persist any write operations to a separate audit database.

The reason why we chose a separate audit database is separation of concerns, as well as the possible performance impact of keeping both the audit trail and the actual operational data on the same database.
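Because change streams use the aggregation framework, the watcher could also filter the stream down to write operations only. Here is a minimal sketch of such a filter using the Go driver (the watcher code later in this post uses an empty pipeline and captures everything; this filtered pipeline is an optional variation, not part of the gist):

pipeline := mongo.Pipeline{
	bson.D{{"$match", bson.D{
		{"operationType", bson.D{{"$in", bson.A{"insert", "update", "replace", "delete"}}}},
	}}},
}
// Passing this pipeline to db.Watch() surfaces only write events.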

Setup

We will be writing a small application in Go to connect to the Change Stream and propagate the changes to the audit database. We will be using Go, the official MongoDB Go driver, and MongoDB 3.6 or later (note that Change Streams require a replica set or sharded cluster deployment).
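For context, here is a minimal sketch of how such a watcher might be wired up with the official Go driver (the connection URI and database name are placeholders, not taken from the gist):

package main

import (
	"context"
	"log"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	ctx := context.Background()

	// Connect to the MongoDB deployment (placeholder URI).
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)

	// Watch the application database (placeholder name) and persist its audit trail.
	ChangeStreamWatcher(client, client.Database("app"))
}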

Code

The gist can be found here:

https://gist.github.com/yish91/5f3dbe33f229a3f36f13ebd6491c706e

So let us step through the code:

func ChangeStreamWatcher(client *mongo.Client, db *mongo.Database) {

This program has a function called ChangeStreamWatcher. It takes in a Mongo client, which we will use to connect to and write to our audit database.

It also takes in a database: the application database that we will be tracking and persisting an audit trail for.

auditDatabase := "audit" 
token := "token"

audit is the name of the database where we are saving our audit trails. token is a collection within the audit database, where we save our resume token for the Change Stream.

A Change Stream reports all events as change stream event documents. As our watcher processes each event document, we save the id of the last document processed into the token collection. We will touch on how we use the token collection very shortly.
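For illustration, a token document stored in the token collection looks something like this (the _data value is an opaque, server-generated string; this one is made up):

{ "_id" : ObjectId("5e40f1a7c8d9e0f1a2b3c4d5"), "_data" : "825E40F1A7000000012B022C0100296E5A1004" }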

var pipeline mongo.Pipeline 
ctx := context.Background()
opts := options.FindOne()
opts.SetSort(bson.D{{"$natural", -1}})

This is basic initialisation of the MongoDB pipeline for our change stream. The options sort the token documents so that the latest document comes first.

tokenDoc := &bsoncore.Document{} 
err := client.Database(auditDatabase).Collection(token).FindOne(context.TODO(), bson.D{{}}, opts).Decode(&tokenDoc)
var cur *mongo.ChangeStream

This is where we query the token collection, and check if there are any existing tokens that we have already saved.

if err == nil {
	resumeToken := strings.Trim(tokenDoc.Lookup("_data").String(), "\"")
	cso := options.ChangeStream()
	cso.SetResumeAfter(bson.D{{"_data", resumeToken}})
	cur, err = db.Watch(ctx, pipeline, cso)
	if err != nil {
		log.Println(err)
	}

This is where we set resumeAfter for the Change Stream. By extracting the token of the latest event document we have processed, we can resume processing any event documents that arrived after it.

For example, suppose there are 10 event documents in the Change Stream, with ids running from 1 to 10. After processing all of them, the token collection will store the id of 10.

Now suppose the watcher crashes, and while it is down, further changes to the database produce two more event documents, with ids 11 and 12.

When the watcher restarts, it reads the token collection, extracts the token it has processed up to (which is 10), and sets the Change Stream to resume after that id. The watcher then picks up documents 11 and 12.

This token collection helps to ensure the resiliency of the system. Should our watcher crash, we will not miss out on any event changes once we restart our watcher.

} else {
	cur, err = db.Watch(ctx, pipeline)
	if err != nil {
		log.Println(err)
	}
}

This code snippet means that if there is no saved token, we simply watch the Change Stream for any new changes from this point onwards.

defer cur.Close(ctx)

This deferred call ensures that the change stream is closed when we shut the program down.

for cur.Next(ctx) {
	elem := &bsoncore.Document{}
	if err := cur.Decode(elem); err != nil {
		log.Println(err)
	}
	rawCollectionName := elem.Lookup("ns").Document().Lookup("coll").String()
	collectionName := strings.Trim(rawCollectionName, "\"")

This code snippet dictates how the program handles every change stream event document that passes through our pipeline. It decodes the byte stream into a BSON document for processing and, as described in MongoDB’s documentation on change events, extracts the name of the collection the change originates from.
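For reference, a change stream event document for an insert looks roughly like this (values are illustrative; the field layout follows MongoDB’s change events documentation):

{
  "_id" : { "_data" : "825E40F1A7000000012B022C0100296E5A1004" },
  "operationType" : "insert",
  "clusterTime" : Timestamp(1581322663, 1),
  "ns" : { "db" : "app", "coll" : "users" },
  "documentKey" : { "_id" : ObjectId("5e40f1a7c8d9e0f1a2b3c4d5") },
  "fullDocument" : { "_id" : ObjectId("5e40f1a7c8d9e0f1a2b3c4d5"), "name" : "Alice" }
}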

session, _ := client.StartSession()  
_ = session.StartTransaction()
err = mongo.WithSession(ctx, session, func(sc mongo.SessionContext) error {

We start a Mongo session so that we can make a transaction involving two writes: 1) the change stream event document, and 2) the resume token to persist in the token collection.

Here you may ask: if the id of the change stream event document is used as the resume token, is this not a duplication of effort?

That is true. However, we save each change stream event document in its respective collection. So if it comes from the user collection, it will be saved in the corresponding user collection in the audit database.

Searching for the latest document across all of those collections would be a rather expensive task, compared to just saving the token in a separate collection of its own.

_, err = client.Database(auditDatabase).Collection(collectionName).InsertOne(sc, elem) // capture the error, and use sc so the write joins the transaction
if err != nil {
	_ = session.AbortTransaction(sc)
	return err
}
_, err = client.Database(auditDatabase).Collection(token).InsertOne(sc, cur.ResumeToken())
if err != nil {
	_ = session.AbortTransaction(sc)
	return err
}
_ = session.CommitTransaction(sc)
return nil
})
session.EndSession(ctx)
} // closes the for cur.Next(ctx) loop

The code snippet above saves the change stream event document in its corresponding collection, and saves the resume token alongside it. As this happens in a transaction, both writes must succeed for the transaction to commit. This prevents any desynchronisation between our token collection and the changes we are persisting.
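As a design note, the snippet above inserts a new token document for every event, which is why the watcher sorts by $natural to find the latest one. An alternative worth considering is upserting a single token document instead, so the collection never grows (a sketch, not part of the gist):

// Keep exactly one token document: replace it if it exists, insert it otherwise.
_, err = client.Database(auditDatabase).Collection(token).ReplaceOne(
	sc,
	bson.D{},          // empty filter matches the lone token document
	cur.ResumeToken(), // the latest resume token, i.e. {"_data": ...}
	options.Replace().SetUpsert(true),
)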

After the transaction is committed successfully, the session is terminated, and the program waits for the next change via for cur.Next(ctx).
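To make the resiliency story concrete, the watcher can simply be restarted whenever it stops; thanks to the saved token, each run resumes where the previous one left off. A sketch of such a supervisor loop (using the placeholder names from the earlier setup sketch):

// Restart the watcher whenever it returns, e.g. after a dropped connection.
for {
	ChangeStreamWatcher(client, client.Database("app"))
	log.Println("watcher stopped, restarting in 5 seconds")
	time.Sleep(5 * time.Second)
}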

Recap

With this, we have created a lightweight watcher that listens to any changes made to an application database, and persists those changes to a separate audit database. The changes are stored in a collection structure mirroring that of the application database: if the application database has three collections x, y, and z, our audit database will have the same collections, on top of a token collection.

This organization and structure will help us greatly when we want to do rollbacks, or even a poor man’s version of event sourcing.
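For example, to reconstruct the history of a single application document, we could query its audit collection by the documentKey field of the stored change events (an illustrative fragment; the users collection and the docID variable are hypothetical):

// All recorded changes for one document, oldest first.
cursor, err := client.Database("audit").Collection("users").Find(
	ctx,
	bson.D{{"documentKey._id", docID}},
	options.Find().SetSort(bson.D{{"clusterTime", 1}}),
)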

Conclusion

Our team here at Helicap loves using MongoDB, given the unstructured nature of our data. With Change Streams introduced into our workflow, we were able to set up an audit trail in a matter of minutes.

On top of being an extremely useful tool, the audit trail also serves as a proof of concept that Change Streams can really extend the capabilities of any application using MongoDB.

We will be elaborating a lot more on how we are improving our application with such technologies. So do watch this space!

Thank you for reading this article, and we hope that this has helped anyone looking to use Change Streams in their workflow as well.
