Connecting Apache Kafka to Azure CosmosDB — Part I

Hans-Peter Grahsl
Oct 14, 2018

Setting the Scene

In recent years, Apache Kafka has seen tremendous adoption as a distributed streaming platform in enterprises across many different industries. Kafka often finds itself at the heart of event-driven architectures in which one of the primary concerns is to avoid countless point-to-point connections between an ever-increasing number of data-producing and data-consuming applications. These applications or (micro)services typically employ their own data stores so that they can operate on optimized, often materialized views and data structures within their bounded contexts.

While there are different ways to exchange data across service boundaries and keep it synchronized, one reasonable approach is change data capture (CDC). Combining the general idea of CDC with concrete tools and technologies like Apache Kafka Connect, as well as proper source and sink connector implementations, makes it possible to build a low-latency streaming data integration pipeline spanning multiple services with relative ease: mere configuration is enough to wire source and sink data stores together, with Kafka as the highly resilient and horizontally scalable backbone in between. But getting data into Kafka using CDC-enabled source connectors like the ones found in the Debezium project is only half the battle. Sink connectors are also needed to get your precious data out of Kafka and continuously feed it into the target system of your choice.

Given an event-driven architecture composed of several services, it can be beneficial for selected ones to build upon a versatile data store which handles simple event payloads and complex DDD aggregates equally well. One way to achieve this flexibility is Azure CosmosDB, which is therefore the focus of this blog post. It discusses how to configure and use Kafka Connect to write any kind of data residing in Kafka topics to a target system which uses Azure CosmosDB as its backing operational data store.

Current Integration Choices for CosmosDB
The first place to go when looking for Kafka Connect implementations is Confluent Hub, where you can browse, search and filter for source and sink connectors. Unfortunately, it doesn’t take long to realize that, at the time of writing, no sink connector is available there for CosmosDB. Before Confluent Hub there was a different web page listing available connectors, which is nowadays legacy and most likely unmaintained. Again, no CosmosDB connector appears, but luckily there is an entry for DocumentDB, an early predecessor of what is nowadays known as CosmosDB.

Looking closer, it becomes obvious that this very connector is part of a larger and generally pretty impressive suite of connectors developed and maintained by Landoop. Reading through the connector’s documentation shows that it is relatively easy to configure thanks to the concise and expressive SQL-like dialect it supports. There are also plenty of features which work very similarly to those offered by RDBMS / SQL-based technologies and products. The ease of use of this connector is definitely a big plus, and it can be a good choice to start with.

However, you may need alternatives if you have to address advanced processing needs which go beyond things like insert/upsert semantics, field projections and field functions, or in general anything that cannot easily be supported or described by the SQL-like dialect. In particular, the following limitations often matter when building data integration pipelines:

  • records cannot be deleted in the sink, e.g. by following “Kafka conventions” such as null values in records originating from log-compacted topics
  • records resulting from log-based CDC source connectors cannot be handled, because the connector has no understanding of the different source operation types (CRUD) and their semantically varying payloads
  • developers cannot easily customize or extend the out-of-the-box features, since the documentation neither highlights where to start nor explains how to implement extensions in a straightforward, guided way

Looking further, what other options are there?

Enter MongoDB API for Azure CosmosDB
Besides multi-model capabilities, CosmosDB offers different APIs which allow it to serve as a drop-in replacement in existing applications that were originally built on top of other data stores. For instance, there is native support for MongoDB on a wire-protocol level.

This means that CosmosDB can be used instead of MongoDB simply by changing the connection string for the driver that is used by the application. The promise in this regard is that NO(!) code changes are needed in the application layer, as long as the features in use are available for and compatible with the officially supported driver version for CosmosDB’s MongoDB API.

That being said, it is possible to use a Kafka Connect sink connector written for MongoDB to build a data integration pipeline between any Kafka topics and CosmosDB collections in a low-latency streaming fashion. At first thought this may sound a bit strange, yet it can be a viable solution, especially when no native CosmosDB connector is available or the available ones lack certain features. What follows is a detailed step-by-step description of how to integrate Apache Kafka with Azure CosmosDB using an open source MongoDB sink connector (disclaimer: written by the author of this blog post).

Let’s get this started!

Azure CosmosDB
Log into the Azure Portal to create a CosmosDB service in an Azure region and with an account name of your choice, and select MongoDB API as the API option.

The deployment may take up to a few minutes, so be patient. Afterwards, go to your new resource and use the Data Explorer to create a new database (e.g. “demo”) and add a collection (e.g. “blogpost”). Use the lowest settings for storage capacity and throughput, which are enough for a quick and small demo.

That’s it for now on Azure. Let’s switch to the Apache Kafka side.

Confluent Platform
While there are many alternatives for getting Apache Kafka up and running, the most convenient way to do it quickly and hassle-free on your local machine is to use Confluent Platform (the open source version suffices), which can be downloaded from the Confluent website. Install Confluent Open Source Platform simply by extracting the archive into a folder of your choice, e.g.
/usr/local/confluent-5.0.0

Install the MongoDB sink connector by downloading the release archive from Confluent Hub and extracting it into the /share/java folder of your platform installation.

Launch the Confluent Platform by simply running bin/confluent start in your installation folder. This should successfully launch all Kafka-related processes, namely zookeeper, kafka, schema-registry, kafka-rest, connect and ksql-server. It may take a few moments before each of them reports an [UP] status.

In order to have something to work with, a Kafka topic (e.g. “blogpost”) can be created for the sake of this demo using the kafka-topics command line utility.
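A minimal sketch of such a command, assuming a single local broker with ZooKeeper listening on localhost:2181 (the Confluent Platform 5.0 defaults). The partition count and replication factor of 1 are assumptions suitable for a local demo only, and the create_topic helper is a hypothetical name introduced for illustration:

```shell
# Path to the kafka-topics utility, relative to the installation folder.
KAFKA_TOPICS="${KAFKA_TOPICS:-bin/kafka-topics}"

# Create a topic with one partition and no replication.
# $1 = topic name
create_topic() {
  "$KAFKA_TOPICS" --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic "$1"
}
```

Running create_topic blogpost from the installation folder should then create the topic used throughout the rest of this post.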

Quickly check the list of available connectors to see that the MongoDB sink connector was registered successfully. This can easily be done by calling the corresponding REST endpoint of Kafka Connect, for instance with cURL:
curl -X GET http://localhost:8083/connector-plugins

Given the availability of the MongoDB sink connector, the next step is to configure the connector to use Azure CosmosDB as the target data store. A very simple configuration is enough for this demo: it effectively does a 1:1 passthrough of Kafka records (the data is JSON without schema) to CosmosDB documents with insert semantics.
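A minimal sketch of what such a configuration could look like, written to a file so that it can be posted to Kafka Connect later. The connector name, the file name cosmosdb-sink.json and the converter settings are assumptions chosen for schemaless JSON, and the values in angle brackets are placeholders for your own account details:

```shell
# Write a passthrough sink configuration to a local file.
# <account> and <key> are placeholders, not real values.
cat > cosmosdb-sink.json <<'EOF'
{
  "name": "cosmosdb-sink-demo",
  "config": {
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "tasks.max": "1",
    "topics": "blogpost",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "mongodb.connection.uri": "mongodb://<account>:<key>@<account>.documents.azure.com:10255/demo?ssl=true&replicaSet=globaldb",
    "mongodb.collection": "blogpost"
  }
}
EOF
```

The topic and collection names match the ones used in this demo; everything else should be adapted to your environment.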

Make sure to put the correct connection string of your Azure CosmosDB account into the mongodb.connection.uri property. Also add the database name (e.g. “demo”), which is missing when you just copy the connection string from the Azure Portal. This JSON configuration can then be posted to the Kafka Connect REST endpoint http://localhost:8083/connectors so that a new connector instance is started.
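Both steps can be sketched as two small shell helpers. This assumes that the connection string copied from the portal ends in a path of the form /?ssl=true&..., and that the configuration is stored in a file such as the hypothetical cosmosdb-sink.json. The helper names with_database and register_connector are made up for illustration:

```shell
# Base URL of the Kafka Connect REST API (default local setup).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Insert the database name between the "/" and the "?" of the URI.
# $1 = connection string from the portal, $2 = database name
# (simple names only: the name is used verbatim in a sed expression)
with_database() {
  printf '%s\n' "$1" | sed "s|/?|/$2?|"
}

# POST a connector configuration file to Kafka Connect.
# $1 = path to the JSON configuration file
register_connector() {
  curl -s -X POST -H "Content-Type: application/json" \
    --data @"$1" "$CONNECT_URL/connectors"
}
```

For example, with_database "$PORTAL_URI" demo prints the URI to paste into the configuration, and register_connector cosmosdb-sink.json starts the connector instance.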

Afterwards, start a kafka-console-producer to send some JSON data to the Kafka topic (e.g. “blogpost”). In order to post data to the topic on the command line, you need to send each JSON record encoded in a single line, one after the other. For this demo, 5 JSON records are written to the topic.
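As a rough sketch, the records could be piped into the producer as shown here. The five records are made-up sample data, not the ones from the original demo, and the broker address localhost:9092 is assumed to be the local default. The helper names are hypothetical:

```shell
# Path to the console producer, relative to the installation folder.
KAFKA_PRODUCER="${KAFKA_PRODUCER:-bin/kafka-console-producer}"

# Five made-up sample records, one JSON document per line.
sample_records() {
  cat <<'EOF'
{"id": 1, "text": "first record", "active": true}
{"id": 2, "text": "second record", "active": false}
{"id": 3, "text": "third record", "tags": ["kafka", "cosmosdb"]}
{"id": 4, "text": "fourth record", "nested": {"answer": 42}}
{"id": 5, "text": "fifth record", "active": true}
EOF
}

# Send the sample records to the given topic.
# $1 = topic name
produce_samples() {
  sample_records | "$KAFKA_PRODUCER" --broker-list localhost:9092 --topic "$1"
}
```

Calling produce_samples blogpost from the installation folder writes the five lines to the topic as five separate records.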

These 5 records will be picked up and processed by the MongoDB sink connector, which writes them into the configured CosmosDB collection. Finally, you can directly inspect your CosmosDB documents in the Data Explorer of the Azure Portal to verify that the data has been stored successfully.

Conclusion and Outlook

This first blog post in a series showed how to build a very simple data integration pipeline between Kafka and CosmosDB. Thanks to Kafka Connect, this could be done without writing a single line of code. Furthermore, CosmosDB’s openness and its support for different APIs on a wire-protocol level made it possible to leverage an existing sink connector which was originally written against MongoDB.

Further blog posts in this series will cover, among other things:

  • a detailed look at how to configure more complex sink scenarios by customizing the sink connector’s behaviour for specific use-cases
  • a discussion of some limitations and possible solutions when using this approach for real-world use-cases
  • a configuration / setup which makes it possible to fully replicate data changes originating from disparate data stores into CosmosDB

Until then, have fun building streaming data pipelines using Apache Kafka and Azure CosmosDB :)
