MongoDB Connector with ksqlDB using Confluent Kafka

Raviteja Pasarakonda
2 min read · Sep 1, 2020

This example uses Confluent Platform 5.5.1, the latest release at the time of writing. In this release, ksqlDB is integrated with Kafka Connect, so connectors can now be managed with ksqlDB queries.

In this post, I share the steps for setting up Confluent Kafka and MongoDB using Docker, and for creating a MongoDB Sink Connector using ksqlDB.

Pre-requisites:

  1. Docker
  2. Confluent Kafka
  3. MongoDB

Installing Docker

Install Docker from https://docs.docker.com/. I guess you know the steps :-P

Installing Confluent Kafka using Docker

git clone https://github.com/confluentinc/cp-all-in-one
cd cp-all-in-one
git checkout 5.5.1-post
cd cp-all-in-one/
docker-compose up -d

Reference: https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html#ce-docker-quickstart

Installing MongoDB using Docker

docker pull mongo
docker run -it -d -p 127.0.0.1:27017:27017 --name mongodb mongo

Installing MongoDB Connector

By default, the MongoDB connector is not installed in the Connect service. Use the following command to install it, then restart the Connect service. There is no need to restart the whole Confluent stack; restarting the Connect service alone suffices.

confluent-hub install mongodb/kafka-connect-mongodb:1.2.0
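
With the Docker setup above, the command has to run inside the Connect container. A minimal sketch, assuming the container is named connect (the name used in the network step of this post); the helper name install_mongo_connector is my own, and --no-prompt only skips the interactive confirmation questions:

```shell
# Install the MongoDB connector inside the Connect container, then restart
# only that container so the new plugin is picked up.
# Assumption: the Connect container is named "connect".
install_mongo_connector() {
  docker exec connect confluent-hub install --no-prompt \
    mongodb/kafka-connect-mongodb:1.2.0 &&
  docker restart connect
}
```

Call install_mongo_connector once the Confluent containers are up. The plugin is written to the container's own filesystem, so it will probably need to be reinstalled if the container is removed and recreated.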

Docker Configuration

A Docker network has to be created so that the Confluent Kafka Connect and MongoDB containers can reach each other.

docker network create my-network
docker network connect my-network mongodb
docker network connect my-network connect

Use the following command to find the MongoDB container’s IP address, which Kafka Connect needs in order to connect and store/retrieve data. Extract the IPv4Address listed under Containers for the container named “mongodb” (without the trailing subnet suffix such as /16).

docker network inspect my-network
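
If you would rather not read the address out of the JSON by hand, docker inspect can print it directly with a Go template. A small sketch, assuming the network and container names used above; the helper name container_ip is hypothetical:

```shell
# Print the IPv4 address a container has on a given Docker network.
# Usage: container_ip <network> <container>
# Example: container_ip my-network mongodb
container_ip() {
  docker inspect \
    -f "{{(index .NetworkSettings.Networks \"$1\").IPAddress}}" "$2"
}
```

The address is printed without the subnet suffix, so it can be pasted straight into the connection URI. On a user-defined network like this one, containers can usually also resolve each other by name, so using mongodb as the host name may work in place of the IP address.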

Running MongoDB Sink Connector using ksqlDB

You can use Confluent’s Control Center (CCC), the ksql CLI, or Kafka REST to run the query below. In this case, I am using CCC, which by default runs at http://localhost:9021/. Don’t forget to replace MongoDBIPv4Address in the connection URI with the address extracted in the step above.

CREATE SINK CONNECTOR `mongodb-test-sink-connector` WITH (
"connector.class"='com.mongodb.kafka.connect.MongoSinkConnector',
"key.converter"='org.apache.kafka.connect.json.JsonConverter',
"value.converter"='org.apache.kafka.connect.json.JsonConverter',
"key.converter.schemas.enable"='false',
"value.converter.schemas.enable"='false',
"tasks.max"='1',
"connection.uri"='mongodb://MongoDBIPv4Address:27017/admin?readPreference=primary&appname=ksqldbConnect&ssl=false',
"database"='local',
"collection"='mongodb-connect',
"topics"='test.topic'
);
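
To confirm that the connector was created and its task is running, one option is the Kafka Connect REST API. A minimal sketch, assuming Connect's REST port is published on localhost:8083 (which I believe is the case in the cp-all-in-one compose file); the helper name connector_status is hypothetical:

```shell
# Fetch the status of a connector from the Kafka Connect REST API.
# Assumption: the Connect REST endpoint is reachable at localhost:8083.
# Example: connector_status mongodb-test-sink-connector
connector_status() {
  curl -s "http://localhost:8083/connectors/$1/status"
}
```

The response is a JSON document with the state of the connector and of each task; a state of RUNNING for both is what you want to see before sending data.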

Start sending JSON messages to the Kafka topic “test.topic”. You should be able to see the events being stored in MongoDB.
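
One way to send a few test messages and check the result from the command line is sketched below. It assumes the broker container is named broker and accepts connections on localhost:9092 from inside the container, and that the MongoDB image ships the mongosh shell (older images ship mongo instead). The helper names produce_json and show_sink_docs are my own:

```shell
# Send each line of standard input as one message to a Kafka topic.
# Assumptions: broker container named "broker", listener on localhost:9092.
# Example: printf '%s\n' '{"id": 1, "name": "alice"}' | produce_json test.topic
produce_json() {
  docker exec -i broker kafka-console-producer \
    --broker-list localhost:9092 --topic "$1"
}

# Print the documents the sink connector has written so far.
# Database "local" and collection "mongodb-connect" match the connector config.
# Assumption: the image provides mongosh; use "mongo" on older images.
show_sink_docs() {
  docker exec mongodb mongosh --quiet "mongodb://localhost:27017/local" \
    --eval 'db.getCollection("mongodb-connect").find().forEach(printjson)'
}
```

After piping a few JSON lines into produce_json test.topic, running show_sink_docs should list one document per message once the connector has processed them.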

Feel free to comment with any questions :)
