Kafka - Mongo Source Connector: Step-by-Step Walkthrough

Harshit Mishra
4 min read · Jan 6, 2020


This is a step-by-step guide to how I connected MongoDB to Kafka using Kafka Connect (Confluent) on my local setup.
This guide uses a dockerized approach; for a non-dockerized approach you can read the blogs here and here, which helped me set up Confluent in no time. Note that they use Debezium as the source connector, another open-source connector available on Confluent Hub.
Before I start, here are the key components used in this setup.

Prerequisites:
1) ZooKeeper
2) Kafka
3) Schema Registry
4) Kafka Connect
5) MongoDB
6) Docker

MongoDB Setup:

On the Confluent website and the official MongoDB website, they specifically mention that you should use a MongoDB replica set.
Kafka Connect reads the oplog to fetch data, and a standalone server doesn't have an oplog, so we need replica set instances to connect Kafka to Mongo.
With the dockerized approach, you can refer to my docker-compose.yml below to see how replicas are added to our Mongo configuration.
One easy way is to run two different Mongo servers, one on port 27017 (the default Mongo port) and one on 27018 for the replica set, then initiate the set and add the primary and secondary from the mongo client, as sketched below. There are already plenty of resources online for configuring a replica set. Try inserting or updating a record; the change should be reflected on the secondary.
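For the non-dockerized route, here is a minimal sketch of bringing up a two-member replica set (the data and log paths /data/rs1 and /data/rs2 are placeholders; adjust them to your machine):

# Start two mongod instances as members of the same replica set
mongod --replSet rs0 --port 27017 --dbpath /data/rs1 --fork --logpath /data/rs1/mongod.log
mongod --replSet rs0 --port 27018 --dbpath /data/rs2 --fork --logpath /data/rs2/mongod.log

# Initiate the set from the mongo client; member 0 becomes the primary
mongo --port 27017 --eval 'rs.initiate({_id: "rs0", members: [{_id: 0, host: "localhost:27017"}, {_id: 1, host: "localhost:27018"}]})'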
One roadblock that can occur when you try to read or verify the data on a secondary is this error:

Error: error: {
    "operationTime" : Timestamp(1578052065, 1),
    "ok" : 0,
    "errmsg" : "not master and slaveOk=false",
    "code" : 13435,
    "codeName" : "NotMasterNoSlaveOk",
    "$clusterTime" : {
        "clusterTime" : Timestamp(1578052108, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

Run the rs.slaveOk() command, which sets slaveOk to true; you can then verify and read data from your mongo client.
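For example, here is a quick sketch of reading the users collection (used later in this post) straight from the secondary container:

docker-compose exec mongo2 /usr/bin/mongo --eval 'rs.slaveOk(); db.getSiblingDB("identity").users.find().forEach(printjson)'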
Command to create the replica set:
docker-compose exec mongo1 /usr/bin/mongo --eval 'if (rs.status()["ok"] == 0) { rsconf = { _id: "rs0", members: [ { _id: 0, host: "mongo1:27017", priority: 1.0 }, { _id: 1, host: "mongo2:27017", priority: 0.5 } ] }; rs.initiate(rsconf); } rs.conf();'
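To confirm the set is healthy (one PRIMARY, one SECONDARY), you can print each member's state:

docker-compose exec mongo1 /usr/bin/mongo --quiet --eval 'rs.status().members.forEach(function(m) { print(m.name + " : " + m.stateStr); })'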

Kafka Connect (Setting up Confluent):

Once our database is ready to connect, we will set up Confluent. You can download the connector plugin from here using Confluent Hub, or you can download the jar file from here. When downloading from the latter, pick the all.jar, which bundles all the dependencies needed to set up the connector.
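Here is a minimal sketch of both options (the Confluent Hub package id mongodb/kafka-connect-mongodb and the jar filename pattern are assumptions; check the download page for the exact names). The copy target matches the /tmp/omniscient/jars directory mounted into the Connect container below:

# Option 1: inside a Confluent Platform install, via the confluent-hub CLI
confluent-hub install mongodb/kafka-connect-mongodb:latest

# Option 2: drop the fat jar into the directory mounted at /etc/kafka-connect/jars
mkdir -p /tmp/omniscient/jars
cp ~/Downloads/mongo-kafka-*-all.jar /tmp/omniscient/jars/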

Configuration: Below is my docker-compose file for creating the containers.

docker-compose.yml

version: "3.6"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - localnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog

  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    networks:
      - localnet
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka/data:/var/lib/kafka/data

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.0
    hostname: schema-registry
    container_name: schema-registry
    networks:
      - localnet
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
    depends_on:
      - zookeeper
      - kafka

  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    networks:
      - localnet
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /tmp/omniscient/jars:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - schema-registry

  # MongoDB Replica Set
  mongo1:
    image: "mongo:4.0-xenial"
    container_name: mongo1
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs1:/data/db
    networks:
      - localnet
    ports:
      - "27017:27017"
    restart: always

  mongo2:
    image: "mongo:4.0-xenial"
    container_name: mongo2
    command: --replSet rs0 --smallfiles --oplogSize 128
    volumes:
      - rs2:/data/db
    networks:
      - localnet
    ports:
      - "27018:27017"
    restart: always

networks:
  localnet:
    attachable: true

volumes:
  rs1:
  rs2:

Run this docker-compose file using the command docker-compose up. You will then be able to see multiple containers being spawned using the command docker ps.

[Screenshot: running containers after executing docker-compose up]

Registering your connector:

After all the containers are up and running, you have to register your connector. Before doing that, check whether the plugin you need is available.
Command: curl localhost:8083/connector-plugins

This will list all the available plugins. In our example, we are using the MongoDB source connector.
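A quick way to confirm the Mongo plugin shows up in that list:

curl -s localhost:8083/connector-plugins | grep -i mongo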

As you can see, the Mongo source connector is available, so it's time to register our connector on the endpoint:
curl -X POST -H "Content-Type: application/json" --data '{"name": "mongo-source", "config": {"tasks.max": "1", "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1:27017,mongo2:27017", "topic.prefix": "identity.identity.users", "database": "identity", "collection": "users"}}' http://localhost:8083/connectors -w "\n"
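If the one-liner gets unwieldy, the same config JSON can be kept in a file (say mongo-source.json, name assumed) and posted from there:

curl -X POST -H "Content-Type: application/json" --data @mongo-source.json http://localhost:8083/connectors -w "\n"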

Once registered, all we need to do is check whether our Kafka stream is getting the data.
To do that, we first need a topic:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topicname

Then run the consumer to fetch data:
kafka-console-consumer --bootstrap-server localhost:9092 --topic yourtopicname
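To watch an event flow end to end, insert a test document on the primary while the consumer is running. Note that the source connector builds its output topic name from topic.prefix plus the database and collection names, so if the consumer shows nothing, run kafka-topics --list --zookeeper localhost:2181 to find the exact topic. A quick test insert:

# Insert a test document into identity.users on the primary
docker-compose exec mongo1 /usr/bin/mongo --eval 'db.getSiblingDB("identity").users.insert({name: "test-user"})'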

Checking Status of your Connector

You can also check the status of the registered connector:
Command: curl localhost:8083/connectors/<connector-name>/status
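A healthy connector reports "state": "RUNNING" for both the connector and each of its tasks; a FAILED task includes a stack trace in the same response. For the connector registered above:

curl -s localhost:8083/connectors/mongo-source/status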

Unregister/Delete Connector
Command: curl -X DELETE http://localhost:8083/connectors/<connector-name>

Here you are, all set with the Mongo-Kafka source connector. For any queries, please reach out to me at harshrider1@gmail.com.

Cheers.
