Connecting Apache Kafka to Azure CosmosDB — Part II

Hans-Peter Grahsl
8 min readNov 16, 2018

--

Part one gave a quick introduction into building a very simple, near real-time data integration pipeline between any JSON data that is written to Apache Kafka topics and Azure CosmosDB.

The 2nd part of this blog post series will dig a little deeper in order to explore how to setup a data pipeline which allows to fully replicate all changes originating from various disparate data stores into CosmosDB.

Change Data Capture 101

Change data capture — CDC for short — is becoming an ever more popular architectural approach when it comes to building data integration pipelines in (micro)service-oriented environments. Based on Apache Kafka Connect and pre-built Connectors it is straightforward to implement powerful CDC scenarios between various kinds of data sources and data sinks with literally no coding — that is merely by means of proper configuration.

While there are plenty of different CDC solutions in the wild, they basically come in two broad flavours:

Query-based CDC
In query-based, sometimes also called poll-based CDC, the mechanism to detect changes in data stores works by periodically querying for any data changes which were happening due to insertions of new or modifications of existing data records. Conceptually these solutions can be implemented with relative ease, simply by polling e.g. database tables over JDBC in regular intervals.

Log-based CDC
In contrast to query-based CDC, log-based CDC solutions are more complex and difficult to implement. They do their work by reacting to any changes which are happening in the internal changelog structures of a data store, for instance, the Binary Log in MySQL or the Write-ahead Log in PostgreSQL. In general this is a much more versatile approach.

While query-based CDC has its place, log-based CDC solutions offer several benefits and are typically to be favoured, especially in the light of building streaming pipelines between any two data endpoints:

  • never miss intermediate changes: because the change detection mechanism is based on reading the internal changelog of data stores, all changes are guaranteed to be captured.
  • very low latencies with reduced load impact: reacting to the changelog of data stores can be done with much less overhead than running frequent checks from the query layer with short poll intervals.
  • works with any existing data model out-of-the-box: in contrast to query-based solutions which either need ever increasing record IDs (e.g. auto-incremented fields) and/or timestamps respectively, log-based CDC doesn’t prescribe any particular data model characteristics at all in order to be able to reliably detect changes of records.
  • delete operations can be detected: log-based CDC can replicate not only INSERTs or UPDATEs but also DELETE operations which are happening in the underlying data store, the latter being probably one of the biggest functional advantages over query-based CDC.
  • react to structural changes in data stores: there is a natural evolution to the structure of data which can also be captured by log-based CDC tools in order to inform e.g. about any schema changes. In addition to that, it is even possible to extract meta-data about data changes. Both of which cannot be easily done with query-based CDC, if at all.

This blog post from the Debezium project offers a more elaborate discussion on these aspects which were only briefly summarized above:

Let’s replicate data store changes to CosmosDB!

The rest of this blog post describes a step by step solution how to setup a streaming data integration pipeline using:

  • a MySQL Database as the source data store
  • the Debezium MySQL Source Connector
  • Apache Kafka and Kafka Connect as the streaming CDC backbone
  • the MongoDB Sink Connector
  • Azure CosmosDB with MongoDB API as the sink data store

Cloud Setup in the Azure Portal

Log into the Azure Portal to create a CosmosDB Service in an Azure region and with an account name of your choice and select MongoDB API as API setting.

The deployment may take up to a few minutes so be patient. Afterwards go to your new resource and use the Data Explorer in order to create a new database (e.g. “mysqlcdc”). Unless you want to specify custom collection settings, there is no need to manually pre-create any collections, since collections will be created on-demand during the CDC replication by means of the Apache Kafka sink connector. The screenshot below shows the collection settings that were used per default for any collection that will be created automatically:

Local Apache Kafka, Kafka Connect & MySQL Setup using Docker

In order to aid the setup procedure of a MySQL instance with a demo database, Apache Kafka, Kafka Connect and the registration of the needed source and sink connectors respectively, turn-key ready docker images provided by Debezium are used together with a docker compose file.

To start with, download the MongoDB sink connector release archive from Confluent Hub and extract it into a folder of your choice (e.g. /home/<username>/hpgrahsl-kafka-connect-mongodb-1.2.0/).

The docker-compose file in the Gist below is based on the Debezium MySQL tutorial. The main difference for this demo is that it additionally mounts a volume so that the MongoDB sink connector, which will replicate the data to CosmosDB, is also available to Kafka Connect running in the docker container. Look for the volumes entry and change the placeholder <…> with the absolute pathname so that it points to the MongoDB sink connector.

Let’s switch to the terminal and spin-up the dockerized environment by running:

docker-compose -f dbz-kafka-connect-mysql.yaml up

After a few moments all processes are up and running and it’s time to check if both Kafka Connectors — source connector for MySQL and sink connector for MongoDB — are available and ready to be used. This can be done easily by running

curl -X GET http://localhost:8083/connector-plugins

which returns a JSON array listing all available connectors. For this example it should contain amongst others, the MongoDBSinkConnector as well as the MySqlConnector and look as follows:

The next step is to configure and start a Mysql Source Connector instance which will read all data from the tables in a pre-defined demo database. Kafka Connect’s REST API can be used to do that with ease:

All tables are initially written to corresponding Apache Kafka topics before the source connector starts to process the MySQL binary log to subsequently write any changes to the Apache Kafka topics in question.

The final step in order to achieve continuous replication of all this table data into Azure CosmosDB collections is to configure and run the MongoDB Sink Connector, again by sending it’s configuration using the REST API of Kafka Connect. Make sure to put the correct connection string of your Azure CosmosDB Account into the mongodb.connection.uri property. Also add the database name (e.g. mysqlcdc) which is missing when you just copy it from the Azure Portal.

Verify CDC-based replication results in Azure Portal

Using the Data Explorer of Azure CosmosDB shows that within the mysqlcdc database there are now the following 5 collections which were replicated via from MySQL (source connector) to CosmosDB (sink connector): addresses, customers, products, productsonhand, orders.

Clicking on e.g. the customers collection and then launching a New Shell (can be found in the tab menu at the top bar) allows to write any queries similar to a MongoDB shell. The screenshot below shows a find query to list all 4 customer documents in the CosmosDB collection:

Making changes in the MySQL source database

Now in order to see the continuously running low-latency data replication in action, let’s make a simple data change in our MySQL source database running in the Docker container. This can be done either by means of using MySQL CLI to access the database running within the docker container or for convenience reasons with the MySQL Workbench, which is shown below.

The connection settings for MySQL in Docker are as follows:

  • hostname: localhost
  • port: 3306
  • username: mysqlusr
  • password: mysqlpw

After configuring the connection in MySQL Workbench the inventory schema database can be accessed. A simple SELECT * FROM inventory.customers shows all 4 entries. In the result grid below the query editor it is possible to make changes to existing entries or add a new customer record like depicted in the screenshot below. It’s important NOT to forget to click Apply so that UPDATEs or INSERTs are actually written :)

Verify CDC-based replication results in Azure Portal

Back in the Azure Portal CosmosDB Shell we can run the query db.customers.find({"_id":{"id":1005}}) to retrieve the newly inserted customer record:

Summary and Outlook

This blog post showed how to build a simple data replication pipeline between MySQL and CosmosDB. Based on log-based CDC — thx to Debezium’s MySQL Source Connector — a MongoDB Sink Connector from the community and Apache Kafka / Kafka Connect, any data changes occurring in MySQL tables are replicated into CosmosDB collections with very low-latencies. The same approach allows to use other data source as the replication source. Besides MySQL, Debezium currently also supports: MongoDB, PostgreSQL, Oracle and SQLServer the latter two of which are incubator releases.

The next blog post in this series will provide further insights such as:

  • a discussion about some limitations and possible solutions when using this approach for real-world use-cases
  • a detailed look how to configure more complex sink scenarios by customizing the sink connector’s behaviour for specific use-cases

STAY TUNED!

Until then, have fun building near real-time replication scenarios between various source data stores and Azure CosmosDB :)

--

--

Hans-Peter Grahsl

SW Engineer, Trainer, Consultant, Associate Lecturer & Speaker — compensating with sportive activities. Proud husband, lion-hearted dad + Nespresso aficionado.