Embracing MongoDB ChangeStreams for replication: Implementation & Challenges

Riddhi Gupta
Hevo Data Engineering
7 min readSep 8, 2021

Hevo is a No-code Data Pipeline as a Service offering that allows customers to leverage the pre-built fully automated pipelines and seamlessly load data from their source/s to the desired destination, typically a Data Warehouse. With the world becoming more data-driven each passing second, the era of the digital age is truly here. Customers want real-time, fast, timely notifications and data access to stay current across all their applications.

To ensure that Hevo’s Data Pipelines meet the need of the hour and supply real-time data, we aimed at tracking real-time changes across the integrations we support. To begin with, we chose to include such functionality in one of our integrations, the well-known MongoDB database.

To determine if something had changed or went under modification in the source MongoDB database, we had three different approaches to explore and choose from:

  • Poll the database every X seconds and discover if something has changed using a timestamp, version number, or status field.
  • Use Database or Application-Level Triggers to execute a piece of code when something changes.
  • Use the Database Transaction/Replication Log, which records every change to the database.

At Hevo, we chose to rely on Replication Logs because they allowed us to achieve outstanding efficiency, particularly in terms of performance and resource use, with minimal development efforts/engineering bandwidth involved. Before we look at how we integrated our Replication Log Approach with ChangeStreams, let’s first get a head start on ChangeStreams.

What are MongoDB ChangeStreams?

ChangeStreams ensure a real-time stream of data changes that occur within your database, straight to your application/the desired destination, without the complexity/risk of tailing the OpLog. ChangeStreams only work with Replica Sets and function as a Wrapper on top of the OpLog.

For more information regarding the same, please refer to the official Mongo documentation.

Image Source: MongoDB

As previously stated, we used Replication Logs to retrieve MongoDB data in real-time. MongoDB supports two methods for bringing in data via Replication Logs. You may accomplish this with MongoDB OpLogs or ChangeStreams.

To deliver the best-in-class experience for our customers, Hevo’s Data Pipelines support both mechanisms and allow customers to choose any one of them.

But, if we already had the support for OpLog, why did we go about incorporating ChangeStreams?

Here’s why we opted for ChangeStreams

Let’s keep this short, crisp, and fast, just like ChangeStreams:

  • Change Events that return as a part of the ChangeStream process, commit to the majority of your Replica Set. In simple terms, it means that the Change Events sent to the client are durable. As a result, applications don’t need to handle data rollback in the event of a failover.
  • ChangeStreams further use Global Locks to ensure total ordering of changes across Sharded Clusters, which is not available in OpLogs. Hence, at Hevo, we’ve only given the option to use ChangeStreams in the case of Sharded Clusters to our customers.
  • It operates as a Managed Wrapper and reduces the overhead of Joins, in complete contrast to OpLog, which needs to be joined with collections to obtain complete records.
  • In addition, ChangeStreams are scalable across nodes. You can create ChangeStreams against any data-bearing node, be it primary or secondary.
  • ChangeStreams present a defined API and robust access controls.
  • ChangeStreams help greatly reduce development efforts, especially when implementing well-known use cases. Whether you are working with notification services, real-time analytics, ETL services, and cross-platform synchronizations, ChangeStreams enable you to execute them all, with ease.
  • Since it’s a wrapper on OpLog it is less prone to errors because you can filter out the documents easily while querying.

MongoDB ChangeStreams in Hevo: How did we achieve it?

Prerequisites

An essential aspect to successfully incorporate functionalities like ChangeStreams is to understand the prerequisites. Here are some of the requirements we had to look into:

  • The source database must be a part of a Replica Set or Sharded Cluster.
  • The Replica Set and Sharded Cluster must utilize version 1 of the Replicate Set Protocol.
  • The database must use the WiredTiger storage engine.

Quick Note: Starting with MongoDB 3.6 till 4.0, you can leverage ChangeStreams only when “majority” read concern support is enabled (default). With MongoDB 4.0 and above, ChangeStreams are accessible regardless of “majority” read concern support. It can either be enabled (default) or disabled to leverage ChangeStreams.

Designing and implementing ChangeStreams

ChangeStreams can be implemented using the MongoDB Node Driver for Node.Js, PyMongo library for Python, or Mongo Driver for Java.

We deployed the primary Mongo Java Driver for building ChangeStreams for our use case at Hevo. This is a quicker implementation that supports No Object-Document Mapping (ODM). It further provides the functionality of reusable code for any existing logic once you’ve created your subscriber.

Using the Mongo Java Driver, we can open the ChangeStream iterator directly on the Mongo connection, a specific database or a particular collection as well:

  • Cursor on a Database: mongoClient.getDatabase().watch(watchOp)
  • Cursor on a Specific Collection: mongoClient.getDatabase().getCollection().watch(watchOp)

This opens the ChangeStream cursor and proceeds iteratively to fetch and store the desired database or collection-related changes. It assumes you’ve established a connection with the MongoDB Replica Set and have accessed the database containing the collection.

In our system, we use the following parameters provided by the ChangeStream document:

  • Document Key: Document containing the event _id.
  • OperationType: The operation occurring on the event (Insert, Update, Delete, Replace).
  • Namespace: Name of the Database and its Collection, where the event occurs.
  • ClusterTime: Time at which the event occurs.
  • ResumeToken: The event _id from where the OpLog resumes. As the event logs are sequentially appended, this helps restart the log accurately.

The OperationType field can provide a variety of values; however, systems at Hevo rely on and leverage the following values:

  • Insert: Represents the information about the document inserted into the database.
  • Update: Represents the information about the updated event. However, by default, it doesn’t record the entire document. To enable complete document fetch, you can use Full Document Look-up.
  • Delete: Represents the information about the document deleted from the database.
  • Replace: Represents the information about the replaced document.

Resuming and restarting ChangeStreams

  • resumeToken: This token will act as a cursor and point to a specific record in your OpLog, allowing you to restart your ChangeStream from that particular point.
  • startAtOperationTime: In the case of a null resumeToken, we leverage the point in time where the most recent/last event occurred. However, if that is null, we can directly start when the Mongo connection emits the first event.

Challenges we faced with MongoDB ChangeStreams

MongoDB ChangeStreams significantly rely on OpLogs to function smoothly. Each OpLog is a capped collection that has a specific size. As new entries arise, more frequent, and at times, more significant updates occur, inevitably, your data requirements rise immensely. To meet such soaring data needs and scale effectively, MongoDB OpLogs automatically delete the initial records.

This mechanism sounds effective and immensely scalable; however, there’s a catch and a relatively big one. When the team at Hevo gave this a more profound thought, we could identify a lot of “what ifs”:

  • What if a user pauses the pipeline for a significantly extended period, and we end up losing the cursor pointing to the specific entry?
  • What if the database undergoes a massive update and receives many modifications, and the record, the cursor was pointing to, gets deleted?

Any of the above errors could rattle the system, taking away the essence of having ChangeStreams in place. So to tackle these and ensure our customers never face this issue with Hevo’s Data Pipelines, we came up with the following solutions:

  • We ask our customers to maintain a retention period of 24 hours when they’ve utilized 99.5% of their OpLog Capacity. It helps ensure that the OpLog is large enough at all times to retain updates.
  • In a rare case, when the record we’re trying to read gets deleted from the OpLog, we fail the MongoDB pipeline and prompt the user with a “Fix Now” option. It allows them to restart all tasks and pull data from the beginning.

Apart from the use-case challenges we identified and successfully tackled, here are some limitations that MongoDB ChangeStreams pose:

  • ChangeStream response documents must not exceed the 16MB BSON document limit. Depending on the size of the documents in the collection against which you start a ChangeStream, notifications may fail if the resultant notification document exceeds the 16MB limit.
  • Opening multiple ChangeStream cursors (tens or more) results in significant delays (up to several minutes) between database writes and notification delivery.

We appreciate you reading the post till the end. Please write to us at dev@hevodata.com with your comments and suggestions. Building a great product takes hard work and determination. With each of us at Hevo sharing the same passion, everyone in the team takes the effort to go the extra mile. If you’d like to be a part of our journey and work on some of these unique challenges, please do check Hevo’s careers page.

--

--