Real-Time Event-Driven Application Development: Change Data Capture With MongoDB

Samarendra Kandala
Fission Labs
Jun 7, 2024

In today’s fast-paced digital world, businesses must respond to changes and events as they happen. Real-time data processing and event-driven architecture are essential for maintaining a competitive edge.

Achieving this level of responsiveness, however, comes with its own set of challenges, such as keeping real-time analytics databases in sync with transactional data.

In response to the evolving needs of modern applications, contemporary databases have introduced a powerful feature known as change streams, which enable Change Data Capture (CDC). These streams let developers capture and react to data changes in real time, transforming the way we architect and build event-driven applications.

What exactly is Change Data Capture (CDC)?

Change Data Capture works by continuously monitoring the operations happening on a database or a specific collection or table. Whenever a change occurs — be it a new record being inserted, an existing record being updated, or a record being deleted — CDC captures this change.

This pattern typically depends on the internal Write-Ahead Log (WAL) of a database. By subscribing to changes in the WAL, CDC implementations can relay these changes to downstream applications. This approach avoids generating extra traffic on the database by querying tables or collections, thereby preserving database performance.

CDC Use Cases:

Microservices Architectures: The outbox pattern is often utilized in event-driven applications, but it can be replaced by Change Data Capture (CDC). CDC can keep the read model in a CQRS pattern up-to-date by streaming real-time changes from the write model.

Data Warehousing and Analytics: CDC facilitates real-time data feeds into analytical platforms and also ensures synchronization between data warehouses and operational databases.

High-Performance Systems: For systems relying on cached data, such as a content management system using Redis, CDC can keep the cache up to date by detecting and streaming database changes (see the sketch after this list).

Compliance and Audit Logging: In environments requiring detailed audit logging, such as banking, CDC can monitor and record all data modifications, providing a comprehensive audit trail.
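To make the caching use case concrete, here is a minimal, hypothetical sketch of a handler that applies CDC events to a Redis cache using Spring Data Redis. The method signature, key scheme, and class name are assumptions for illustration, not part of any specific CDC library.

// Hypothetical handler: applies a captured change to a Redis cache
@Component
public class CacheSyncHandler {

    private final StringRedisTemplate redis;

    public CacheSyncHandler(StringRedisTemplate redis) {
        this.redis = redis;
    }

    // operationType is e.g. "insert", "update", or "delete"; documentJson is the changed record
    public void onChange(String operationType, String id, String documentJson) {
        String key = "cms:document:" + id; // illustrative key scheme
        if ("delete".equals(operationType)) {
            redis.delete(key);                          // evict deleted records
        } else {
            redis.opsForValue().set(key, documentJson); // upsert inserts and updates
        }
    }
}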

MongoDB Change Streams

The most widely used tool for implementing CDC is Debezium, which offers CDC services for many popular databases. However, Debezium can add complexity to the infrastructure. Fortunately, if you’re working with MongoDB, it offers its own CDC implementation via Change Streams.

Change Streams in MongoDB are built on subscriptions to the oplog, MongoDB's equivalent of a write-ahead log (WAL), and expose changes through the aggregation framework as a real-time feed. This native solution simplifies the process, reducing the need for additional infrastructure while ensuring efficient and seamless data synchronization.

At Fission, we are leveraging Change Data Capture (CDC) in a CQRS-style pattern to synchronize MongoDB with Elasticsearch. All reads and searches are performed in Elasticsearch, handling millions of CDC events per day. Initially, we implemented custom consumer logic for MongoDB streams but eventually transitioned to using Kafka.

Custom Consumer Implementation

The code snippets below use Java and Spring, but the approach can be implemented in any language.

Spring Code:


@Component
public abstract class AbstractChangeStreamService
        implements ApplicationListener<ApplicationReadyEvent> {

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // Start listening once the Spring context is fully initialized
        listenToCollectionChanges();
    }

    public void listenToCollectionChanges() {
        // change stream listener logic
    }
}
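As a quick illustration, a collection-specific service can extend this base class and supply its own listener. The class and collection names here (for example, OrderChangeStreamService) are hypothetical and only show the wiring.

// Hypothetical subclass: one service per collection; names are illustrative only
@Component
public class OrderChangeStreamService extends AbstractChangeStreamService {

    @Override
    public void listenToCollectionChanges() {
        // Open a change stream cursor on the "orders" collection here and
        // forward each ChangeStreamDocument to the downstream sink.
    }
}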

MongoDB's change stream API lets you create a cursor to watch changes on a collection, and the aggregation framework offers various options for customization.

// Java
MongoCollection<Document> collection =
        mongoTemplate.getCollection("yourCollectionName");
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = collection
        .watch()
        .fullDocument(FullDocument.UPDATE_LOOKUP)
        .fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE)
        .cursor();
while (cursor.hasNext()) {
    ChangeStreamDocument<Document> changeStreamDocument = cursor.next();
    // Code to write to any server, db, or platform
}
// JS (mongosh) implementation
const collection = db.getCollection('yourCollectionName');
const cursor = collection.watch([], {
    fullDocument: 'updateLookup',
    fullDocumentBeforeChange: 'whenAvailable'
});

while (cursor.hasNext()) {
    const changeStreamDocument = cursor.next();
    // Code to write to any server, db, or platform
}
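Inside that loop, handling usually branches on the operation type. Below is a minimal Java sketch, where syncService is a hypothetical downstream sink (for example, an Elasticsearch writer); it is not part of the MongoDB driver.

// Sketch of per-event handling inside the loop; syncService is a hypothetical downstream sink
while (cursor.hasNext()) {
    ChangeStreamDocument<Document> event = cursor.next();
    switch (event.getOperationType()) {
        case INSERT:
        case UPDATE:
        case REPLACE:
            // fullDocument is populated because of FullDocument.UPDATE_LOOKUP
            syncService.upsert(event.getFullDocument());
            break;
        case DELETE:
            // only the document key is available for deletes
            syncService.delete(event.getDocumentKey().getObjectId("_id").getValue());
            break;
        default:
            // ignore drop, rename, invalidate, etc. in this sketch
            break;
    }
}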

Leveraging MongoDB streams with an aggregation pipeline provides a lot of options, including appending custom fields based on existing ones and integrating aggregations during processing. For instance, you can implement filter conditions:

// Java
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
        Document.parse("{'fullDocument.username': 'alice'}"),
        Filters.in("operationType", asList("delete")))));

// JS
const pipeline = [
    {
        $match: {
            $or: [
                { 'fullDocument.username': 'alice' },
                { 'operationType': { $in: ['delete'] } }
            ]
        }
    }
];
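The pipeline is then passed to watch() when opening the stream, for example in Java:

// Java: open the change stream with the filter pipeline applied server-side
MongoChangeStreamCursor<ChangeStreamDocument<Document>> filteredCursor = collection
        .watch(pipeline)
        .fullDocument(FullDocument.UPDATE_LOOKUP)
        .cursor();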

How can streams be resumed after a failure or restart?
MongoDB change streams can be resumed using resume tokens. These tokens can be persisted in Redis or in MongoDB itself and, after a restart or failure, used to resume watching from the point where the stream left off.

// Java
cursor = collection.watch()
        .resumeAfter(BsonDocument.parse(resumeToken)) // previously persisted resume token (JSON string)
        .fullDocument(FullDocument.UPDATE_LOOKUP)
        .fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE)
        .cursor();
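Each event carries its own resume token, which can be persisted after processing. A minimal sketch, assuming a simple redisService wrapper like the one used later in this post (its saveToken method is illustrative):

// Persist the resume token after each processed event so the stream can be resumed later
while (cursor.hasNext()) {
    ChangeStreamDocument<Document> event = cursor.next();
    // ... process the event ...
    BsonDocument token = event.getResumeToken();
    redisService.saveToken("yourCollectionName", token.toJson()); // saveToken is a hypothetical helper
}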

Scaling of Custom Consumer

One major challenge in handling large-scale events is scaling the consumers. MongoDB streams don’t provide a built-in method to scale consumers dynamically or statically. Therefore, custom techniques are necessary.

  1. First, we implemented batch logic in the cursor, allowing us to process up to 1,000 documents per batch. This helped to a certain extent, but it wasn't sufficient for our target load.
  2. Next, we considered partitioning on the ID field using its first character. However, MongoDB ObjectIDs start with a timestamp, so IDs generated around the same time share the same leading characters. To address this, we switched to the last character of the ID, which is a hexadecimal character (0-9 and a-f).
  3. We divided the stream into two partitions: one for IDs ending with a digit and another for IDs ending with a non-digit.
  4. We then applied the partitioning with a regex in the aggregation pipeline. Since regex operations can't be run directly on the ObjectID type, we converted the IDs to strings in the pipeline and then applied the regex.
// Regex patterns for the two partitions: IDs ending with a digit vs. a non-digit
public enum Partition {
    ONE("\\d$"), TWO("\\D$");

    private final String pattern;
    Partition(String pattern) { this.pattern = pattern; }
    public String getPattern() { return pattern; }
}

// Pipeline: add a stringified id field, then match on the partition's regex
private List<Bson> createPipeline(Partition partition) {
    // $addFields stage: convert the ObjectID to a string so a regex can be applied
    Bson addFieldsStage = Aggregates.addFields(new Field<>("documentKey._ids",
            new Document("$toString", "$documentKey._id")));
    // $match stage to apply the partition regex
    Bson matchStage = Aggregates.match(Filters.regex("documentKey._ids",
            Pattern.compile(partition.getPattern())));
    // Combine the stages
    List<Bson> pipeline = new ArrayList<>();
    pipeline.add(addFieldsStage);
    pipeline.add(matchStage);

    return pipeline;
}


MongoCollection<Document> collection
        = mongoTemplate.getCollection(getCollectionName());

// Fetch the persisted resume token for this partition from Redis
Partition partition = Partition.ONE;
String resumeToken = redisService
        .getToken(String.join(StringConstants.HYPHEN,
                getCollectionName(), partition.name()));

log.info("Resume Token {} for partition {}", resumeToken, partition.name());

ChangeStreamIterable<Document> changeStreamIterable = collection
        .watch(createPipeline(partition))
        .batchSize(BATCH_SIZE)
        .fullDocument(FullDocument.UPDATE_LOOKUP)
        .fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);

if (Objects.nonNull(resumeToken)) {
    changeStreamIterable.resumeAfter(BsonDocument.parse(resumeToken));
}

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor
        = changeStreamIterable.cursor();

Each partition operates as a separate consumer with its own resume token. With custom consumers, we must handle the resume-token logic, the custom partitioning logic, and proper restart behavior ourselves, which becomes an overhead.
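For completeness, here is a sketch of how the two partition consumers can be started, assuming listenToCollectionChanges is parameterized by partition; the threading model is illustrative, not the exact production setup.

// Illustrative: run one change stream consumer per partition on its own thread
// (ExecutorService / Executors come from java.util.concurrent)
ExecutorService executor = Executors.newFixedThreadPool(Partition.values().length);
for (Partition partition : Partition.values()) {
    executor.submit(() -> listenToCollectionChanges(partition));
}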

Using Kafka

Our custom consumers were bottlenecked by scaling limitations and the overhead of managing partitions effectively. Luckily, we already had a Kafka message broker in our infrastructure, so we redesigned the system so that the stream consumer publishes messages to Kafka topics (a sketch of the publisher follows the list below). There are two key benefits to this approach:

  1. Scalability: Since Kafka is a high-throughput message broker, the stream consumer can quickly put messages into Kafka. This allows us to have a larger number of consumer processes listening to Kafka. This is a significant advantage compared to our custom implementation, which struggled to scale effectively.
  2. Efficient Processing: With multiple consumers subscribing to Kafka topics, the data can be processed in parallel, improving overall processing speed.
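A sketch of the publishing side using Spring Kafka's KafkaTemplate; the topic name and key choice are assumptions for illustration, not the exact production configuration.

// Hypothetical publisher inside the stream consumer: forward each change event to Kafka,
// keyed by the document id so Kafka's hashing spreads records across partitions
// (kafkaTemplate is a Spring Kafka KafkaTemplate<String, String>; the topic name is an assumption)
private void publish(ChangeStreamDocument<Document> event) {
    String key = event.getDocumentKey().getObjectId("_id").getValue().toHexString();
    String payload = event.getFullDocument() != null
            ? event.getFullDocument().toJson()
            : event.getDocumentKey().toJson();
    kafkaTemplate.send("mongo.change.events", key, payload);
}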

Our research led us to the Mongo Kafka Connect library (the MongoDB Kafka Connector). It eliminates the need for custom development and uses Kafka itself to store resume tokens. Data partitioning is handled by Kafka, with the MongoDB Object IDs as record keys: Kafka's key-hashing mechanism distributes the data across partitions.

If an additional performance boost is required, regular expressions can be used with multiple Mongo Kafka Connect instances. Each instance watches the same stream but matches a different ID pattern (similar to the regex partitioning in the custom consumer approach) and produces into Kafka partitions. Downstream services can then process messages efficiently by scaling their consumers to the number of partitions, as shown in the sketch below.
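On the consuming side, a downstream service can size its listener concurrency to the topic's partition count. A brief Spring Kafka sketch, where the topic name, group id, and indexService sink are assumptions:

// Hypothetical downstream consumer: concurrency is sized to the topic's partition count
@KafkaListener(topics = "mongo.change.events", groupId = "search-indexer", concurrency = "4")
public void consume(ConsumerRecord<String, String> record) {
    // record.key() is the MongoDB document id; record.value() is the change event JSON
    indexService.apply(record.key(), record.value()); // indexService is an illustrative sink
}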

Mongo Kafka Connect Config file

{
  "name": "mongo-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "publish.full.document.only": "false",
    "mongo.errors.log.enable": "true",
    "tasks.max": "4",
    "output.format.value": "json",
    "batch.size": "0",
    "change.stream.full.document": "updateLookup",
    "output.format.key": "schema",
    "pipeline": "[]",
    "topic.prefix": "quickstart-events",
    "database": "writeDb",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "poll.await.time.ms": "5000",
    "name": "mongo-source",
    "connection.uri": "mongodb+srv://dev:123@development.irmew.mongodb.net/db",
    "collation": "",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topic.suffix": "",
    "poll.max.batch.size": "1000",
    "output.schema.key": "{\"type\":\"record\",\"name\":\"keySchema\",\"fields\":[{\"name\":\"documentKey._id\",\"type\":\"string\"}]}",
    "transforms": "deduplicate",
    "transforms.deduplicate.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.deduplicate.field": "documentKey._id",
    "transforms.deduplicate.ttl.ms": "3000",
    "publish.full.document.only.tombstone.on.delete": "true"
  }
}

In this setup, output.schema.key instructs Kafka Connect to use a specific field, documentKey._id, as the record key, so Kafka's hashing of that key distributes the data efficiently across partitions. You can also pass an aggregation pipeline via the configuration file to partition data based on the ID, as sketched after the snippet below.

"output.schema.key": "{\"type\":\"record\",\"name\":\"keySchema\",
    \"fields\":[{\"name\":\"documentKey._id\",\"type\":\"string\"}]}"
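For instance, each connector instance could be given a pipeline along the lines of the custom-consumer partitioning. The stages below are a sketch (an untested assumption, not a verified configuration); when embedded in the pipeline property of the JSON config, the array must be escaped as a single string. The second instance would use the non-digit pattern \\D$ instead.

[
  { "$addFields": { "documentKey._ids": { "$toString": "$documentKey._id" } } },
  { "$match": { "documentKey._ids": { "$regex": "\\d$" } } }
]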

Conclusion

Modern problems require modern solutions.

Change Data Capture (CDC) as a whole provides developers with efficient ways to consume and react to data changes in real time without impacting performance. MongoDB Change Streams offer a powerful method for handling change data natively within MongoDB.

For simpler applications with manageable scaling requirements and low RPS, custom consumers are a good choice as they eliminate the need for additional infrastructure like Kafka. However, for applications with high load, existing Kafka infrastructure, and multiple consumer types, using Kafka and Mongo Kafka Connect is a better option.

Overall, CDC technologies enable real-time data synchronization and drive responsive, event-driven architectures across various application ecosystems.
