Real-Time Data Streaming with MongoDB Change Streams and Kafka Connector

Azmi Ahmad
5 min read · Mar 6, 2023

MongoDB Change Streams and Kafka

In today’s fast-paced world, real-time data streaming has become an essential requirement for many businesses. Real-time data streaming allows businesses to react quickly to changing conditions, enabling them to make more informed decisions and provide better customer experiences.

MongoDB, a popular NoSQL database, offers a powerful feature called change streams that allows developers to track changes to their data in real-time. Change streams allow developers to monitor changes to their data at a fine-grained level, enabling real-time data synchronization with other systems or applications.

Kafka, on the other hand, is a distributed streaming platform that allows developers to build real-time data pipelines and applications. It provides a reliable and scalable way to stream data between systems or applications, enabling developers to process data in real-time.

In this story, we’ll explore how to use MongoDB change streams with the MongoDB Kafka Connector to build real-time data pipelines and applications.

Overview of MongoDB Change Streams

MongoDB change streams allow developers to track changes to their data at a fine-grained level. A change stream can be opened on a single collection, on a database, or on an entire deployment, and it requires a replica set or a sharded cluster. An aggregation pipeline can filter the events by criteria such as document fields or operation type.

When a change is made to the collection, MongoDB sends a change event to the change stream. The change event contains information about the changed document, such as the document ID, operation type, and updated fields. The change event can be consumed by a listener or a pipeline, enabling real-time data synchronization with other systems or applications.
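To make the shape of a change event concrete, here is a minimal sketch of how a listener might react to the three pieces of information mentioned above. The ChangeEvent class and the describe method are hypothetical, simplified stand-ins invented for illustration; the real driver type, ChangeStreamDocument, carries more fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ChangeEventSketch {

    // Hypothetical, simplified stand-in for a change event (not the driver's ChangeStreamDocument).
    static final class ChangeEvent {
        final String operationType;              // e.g. "insert", "update", "replace", "delete"
        final String documentId;                 // value of the changed document's _id
        final Map<String, Object> updatedFields; // only meaningful for update events

        ChangeEvent(String operationType, String documentId, Map<String, Object> updatedFields) {
            this.operationType = operationType;
            this.documentId = documentId;
            this.updatedFields = updatedFields;
        }
    }

    // Decide what a downstream system should do with a single event.
    static String describe(ChangeEvent event) {
        switch (event.operationType) {
            case "insert":
                return "create " + event.documentId;
            case "update":
                return "patch " + event.documentId + " fields " + event.updatedFields.keySet();
            case "delete":
                return "remove " + event.documentId;
            default:
                return "ignore " + event.operationType;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> changed = new LinkedHashMap<>();
        changed.put("status", "inactive");
        ChangeEvent update = new ChangeEvent("update", "616250d7f429312c9816d7e6", changed);
        System.out.println(describe(update));
    }
}
```

A real listener would read the same information from the driver's change event object instead of this stand-in.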

To demonstrate how to use MongoDB change streams, let’s consider a scenario where we have a MongoDB database that stores user information, and we want to track changes to the user information in real-time. We’ll create a change stream on the user collection and listen for changes using a Java application.

Step 1: Setting up MongoDB

To get started, we’ll need to set up a MongoDB database with a user collection. We’ll use the following schema for our user collection:

{
  "_id": ObjectId("616250d7f429312c9816d7e6"),
  "name": "John Doe",
  "email": "john.doe@example.com",
  "status": "active"
}

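We’ll assume that MongoDB is running on localhost as a replica set, with a database named “mydatabase” and a collection named “user”. Change streams are not available on a standalone server, but a single-node replica set is enough for local testing.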

Step 2: Creating a Change Stream

Next, we’ll create a change stream on the user collection using the MongoDB Java driver. We’ll configure the change stream to only listen for changes to users with an “active” status.

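import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.ArrayList;
import java.util.List;

public class UserChangeStream {

    public static void main(String[] args) {
        // Change streams need a replica set or sharded cluster, not a standalone server.
        try (MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017")) {
            MongoDatabase database = mongoClient.getDatabase("mydatabase");
            MongoCollection<Document> collection = database.getCollection("user");

            // The pipeline runs against change events, which wrap the document in
            // "fullDocument", so the filter targets fullDocument.status.
            List<Bson> pipeline = new ArrayList<>();
            pipeline.add(Aggregates.match(Filters.eq("fullDocument.status", "active")));

            // UPDATE_LOOKUP asks the server to attach the current document to update events.
            try (MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch(pipeline)
                    .fullDocument(FullDocument.UPDATE_LOOKUP)
                    .iterator()) {
                // hasNext() blocks until the next matching change event arrives.
                while (cursor.hasNext()) {
                    ChangeStreamDocument<Document> change = cursor.next();
                    Document fullDocument = change.getFullDocument();
                    if (fullDocument != null) {
                        System.out.println(fullDocument.toJson());
                    }
                }
            }
        }
    }
}

In the code above, we create a MongoClient and connect it to the MongoDB database. We then get a reference to the “user” collection and create a pipeline that filters change events on the “status” field. Because a change event wraps the changed document in its fullDocument field, the filter is written against “fullDocument.status” rather than “status”.

We then create a change stream by passing the pipeline to the watch() method. We set the full document option to UPDATE_LOOKUP so that update events carry the current version of the document and not only the changed fields. No cursor type needs to be configured: the change stream cursor stays open, and hasNext() waits until a new event arrives.

Finally, we loop through the cursor and print out the changed document. Some events, such as deletes, carry no full document, so we check for null before printing.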

Step 3: Setting up Kafka

Now that we have seen how a change stream works, we can stream the same changes into Kafka using the MongoDB Kafka Connector. The connector is a plugin for Kafka Connect, the Kafka framework for moving data between Kafka and external systems.

To get started, we’ll need Kafka and a Kafka Connect worker with the MongoDB Kafka Connector plugin installed. We’ll assume that both are running on localhost.
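As a rough guide to what the worker needs for this setup, the following excerpt sketches the relevant worker settings. The plugin.path value is a hypothetical location, and choosing StringConverter is an assumption made here so that consumers receive the document JSON as a plain string; neither is taken from the original setup.

```properties
# config/connect-standalone.properties (excerpt)
bootstrap.servers=localhost:9092
# Hypothetical path: the directory that contains the MongoDB Kafka Connector JAR
plugin.path=/usr/local/share/kafka/plugins
# Write keys and values as plain strings so consumers receive the document JSON as-is
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# Standalone mode stores source offsets (the change stream resume position) in a local file
offset.storage.file.filename=/tmp/connect.offsets
```

With the JSON converter and schemas enabled instead, each value would arrive wrapped in a schema envelope, and the consumer would need to unwrap it.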

Step 4: Configuring the Kafka Connector

The connector does not depend on the Java application from Step 2. Its source connector opens its own change stream on the user collection and publishes each change to a Kafka topic.

To configure the connector, we’ll create a configuration file, config/mongo-source.properties, that specifies the MongoDB server and the collection to watch. The Kafka server address is part of the Kafka Connect worker configuration, not of this file.

name=mongodb-connector
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
connection.uri=mongodb://localhost:27017
database=mydatabase
collection=user
change.stream.full.document=updateLookup
publish.full.document.only=true

In the configuration file above, we specify the name of the connector, the connector class, and the maximum number of tasks. A source connector does not take a topics setting, which belongs to sink connectors. It derives the topic name from the watched namespace instead, so changes to the user collection in mydatabase are published to the topic “mydatabase.user” unless topic.prefix is set.

We then specify the MongoDB server URI, database, and collection we want to monitor for changes. We also set the change.stream.full.document option to “updateLookup” to ensure that we get the full document for updates.

Finally, we set the publish.full.document.only option to “true” to ensure that only the full document is published to Kafka.
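Step 2 filtered the change stream to active users, while the connector configuration above forwards every change. The source connector accepts a pipeline property that holds an aggregation pipeline as a JSON array, so the same filter could be sketched as follows. Treat this as an optional addition, not part of the original configuration.

```properties
# Optional addition to config/mongo-source.properties
# Only forward change events whose full document has status "active".
pipeline=[{"$match": {"fullDocument.status": "active"}}]
```

Delete events carry no full document, so this filter also keeps them out of the topic.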

Step 5: Running the Kafka Connector

Once we have our configuration file, we can start the Kafka Connector using the following command:

bin/connect-standalone.sh config/connect-standalone.properties \
    config/mongo-source.properties

In the command above, we specify the path to the connect-standalone.properties file and the path to our mongo-source.properties file.
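Once the worker is up, one way to confirm that the connector is running is to query the Kafka Connect REST API for its status. The sketch below assumes the worker's REST interface listens on its default port, 8083; the class and method names are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class ConnectorStatusCheck {

    // Builds the status endpoint for a connector on a Kafka Connect worker.
    static URI statusUri(String workerUrl, String connectorName) {
        return URI.create(workerUrl + "/connectors/" + connectorName + "/status");
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the worker's REST interface listens on its default port, 8083.
        URI uri = statusUri("http://localhost:8083", "mongodb-connector");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The JSON body reports the state of the connector and of each of its tasks.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

The connector name in the URL is the value of the name property from the connector configuration file.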

Step 6: Consuming the Kafka Topic

Finally, we can consume the Kafka topic using a Kafka consumer. We’ll use the following Java code to consume the Kafka topic and print out the full document for each change event:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {

    // The source connector names the topic <database>.<collection> by default.
    private static final String TOPIC_NAME = "mydatabase.user";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Offsets are committed manually below, so turn off auto-commit.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));

            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
                // Commit only after the whole batch has been processed.
                consumer.commitSync();
            }
        }
    }
}

In this example, we create a Kafka consumer and subscribe to the topic that the source connector publishes to, “mydatabase.user”. We then poll the consumer in a loop, print out the full document for each change event, and commit the offsets once each batch has been processed.
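The MongoDB source connector derives its topic name from the watched namespace, optionally preceded by the value of topic.prefix. The following sketch mirrors that default rule so the consumer's topic name can be checked against a connector configuration. The class and method are hypothetical helpers, and other naming options of the connector are deliberately left out.

```java
import java.util.Properties;

final class SourceTopicName {

    // Mirrors the connector's default naming: [<topic.prefix>.]<database>.<collection>
    static String topicFor(Properties connectorConfig) {
        String prefix = connectorConfig.getProperty("topic.prefix", "");
        String namespace = connectorConfig.getProperty("database")
                + "." + connectorConfig.getProperty("collection");
        return prefix.isEmpty() ? namespace : prefix + "." + namespace;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("database", "mydatabase");
        config.setProperty("collection", "user");
        System.out.println(topicFor(config)); // prints mydatabase.user

        // "prod" is an arbitrary example prefix.
        config.setProperty("topic.prefix", "prod");
        System.out.println(topicFor(config)); // prints prod.mydatabase.user
    }
}
```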

Conclusion

In this story, we explored how to use change streams in MongoDB to monitor database changes in real-time. We also saw how to stream those changes into Kafka using the MongoDB Kafka Connector, which lets developers build real-time data pipelines between systems or applications.

By leveraging change streams and Kafka, developers can build robust and scalable applications that can react to data changes in real-time. This can help businesses make faster and more informed decisions, as well as improve customer experiences by delivering real-time updates and notifications.

If you’re interested in learning more about MongoDB, Kafka, or the MongoDB Kafka Connector, check out the official documentation and tutorials on their respective websites. Happy coding!
