Optimizing Read Access to Sharded MongoDB Collections Using Apache Kafka Connect

Introduction

Over the last couple of years, MongoDB has established itself as one of the most popular databases, ranking #1 among document stores and #5 overall. For use cases which benefit from its document-oriented data model and flexible schema it is a very good fit, especially since MongoDB was designed from the ground up with high availability and horizontal scalability in mind, both of which are largely transparent from a developer’s perspective.

High availability is achieved by running so-called Replica Sets consisting of a single primary and multiple secondary nodes. Horizontal scalability is offered by running a sharded cluster composed of multiple such Replica Sets, each of which is responsible for handling read and write requests for a portion of the full data set. The underlying data distribution mechanism is based on shard keys, which are specified per collection and define how its documents are partitioned across the Replica Sets.
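
As a quick illustration of how a shard key is defined, the following sketch enables sharding for a hypothetical database and shards one of its collections on a customerId field. All names here are made up for illustration and assume a mongo shell (or mongosh) connected to a mongos router:

    # illustrative only: database, collection and shard key field are hypothetical
    mongo localhost:27017 --eval '
      sh.enableSharding("mydb");
      sh.shardCollection("mydb.orders", { "customerId": 1 });
    '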


No Shard Key is Perfect

With fully managed MongoDB-as-a-Service cloud offerings like Atlas it is a no-brainer to set up and operate a sharded MongoDB cluster. However, no datastore on this planet can help us with the way we shape our data in the first place, nor can it magically suggest perfect ways to partition our data across multiple shards. Without doubt, defining a proper shard key for MongoDB collections is one of the most challenging and most critical decisions to make when designing distributed data models. At the same time, defining a certain shard key is essentially an irreversible action, since shard keys cannot simply be changed at will at some later point in time. Despite recommendations and best practices, committing to a shard key always involves trade-offs, as with so many other choices in our industry. We have to accept the fact that we cannot find “a shard key to rule them all”, one which will support current and potentially future query patterns equally well. From a performance perspective, queries against sharded MongoDB collections can roughly be classified as follows:

  • VERY EFFICIENT: reading documents pinned to a single partition, i.e. a query based on the shard key only targets a single, specific shard
  • NOT SO EFFICIENT: reading documents across partitions, e.g. based on a shard key range, typically results in more than one shard being queried
  • HIGHLY INEFFICIENT: reading documents irrespective of their shard keys causes so-called scatter-gather queries which inevitably involve every shard in the cluster

Given the above, even if we manage to design our data model so that the large majority of our query patterns is efficiently supported by the chosen shard key, we may still see unsatisfactory performance for some others, for instance queries we didn’t even think about when designing our sharded data model in the first place.
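
To make these three categories concrete, the sketch below shows the corresponding filter shapes against a hypothetical collection orders sharded on { customerId: 1 }. Names and values are made up for illustration and assume a mongo shell connected to the mongos router:

    mongo localhost:27017/mydb --eval '
      // very efficient: equality on the shard key targets exactly one shard
      print(db.orders.find({ "customerId": 4711 }).itcount());
      // not so efficient: a shard key range may have to consult several shards
      print(db.orders.find({ "customerId": { $gte: 4000, $lt: 5000 } }).itcount());
      // highly inefficient: no shard key in the filter means scatter-gather across all shards
      print(db.orders.find({ "status": "SHIPPED" }).itcount());
    '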

Thus we can either accept that some queries simply won’t perform reasonably, or try to come up with a different solution.


Streaming CDC Replication of Sharded MongoDB Collections

One viable approach to optimize a data model so that read access targets single shards is to accept some data duplication and embrace eventual consistency. Let’s see how this can be achieved using Apache Kafka and Debezium, the “secret sauce” for streaming change data capture. The rest of this blog post discusses a fully working example of how to replicate a sharded MongoDB collection into another sharded MongoDB collection. Both collections will eventually hold the same data, yet they are configured to use different shard keys. Such a setup allows us to efficiently support additional queries which would otherwise result in highly inefficient scatter-gather behaviour. A simple scenario which shows how to apply this approach in practice is explained step by step below.

Minimum Viable Container Environment

To begin with we need a simple sharded MongoDB cluster. What we are going to set up is by no means a production-ready configuration, yet it suffices just fine for local development purposes. The cluster for this demo is made up of only 4 MongoDB-related nodes: a single configuration server, two shards consisting of one node each, and a single mongos router.

It’s important to understand that any reasonable production deployment would consist of at least: a 3-node replica set for the configuration servers, a 3-node replica set for each of the shards, and typically more than just a single router node.

To keep it simple, our Apache Kafka environment is a minimalistic setup as well, composed of:

  • 1 Zookeeper node
  • 1 Kafka broker
  • 1 Kafka Connect node

This GitHub repository contains a ready-made docker-compose file including all the necessary setup scripts to bootstrap the complete environment described above. Just clone the repository, then:

  1. run docker-compose up to bring up the environment and launch all docker containers
  2. wait a few moments before you run sh bootstrap-mdb.sh in order to set up the MongoDB cluster and initialize the data model
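
Put together, the bootstrap boils down to a few shell commands. The final sanity check via sh.status() is not part of the repository’s scripts and assumes the mongo shell is installed on the host:

    # 1) bring up all containers defined in the docker-compose file
    docker-compose up
    # 2) in a second terminal, once the containers are running:
    sh bootstrap-mdb.sh
    # optional sanity check (assumption): inspect the sharding status via the mongos router
    mongo localhost:27017 --eval 'sh.status()'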

Sample Data Model

A very nice cross-platform UI client for MongoDB is Compass. Connecting to the MongoDB router node (mongos) running in the Docker container via localhost:27017 shows a blogpost database containing two collections, named demo_collA and demo_collB respectively.

Both collections are sharded, yet they use different shard keys in order to support shard-local queries for different query patterns:

  • demo_collA: holds 50 documents, sharded based on the field sector
  • demo_collB: initially empty, sharded based on the field address.city

Thus, don’t worry that initially only demo_collA holds data; this is expected for now. The 50 sample documents found in demo_collA are very simple in structure and look similar to the following:
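
One way to look at a sample document is via the shell. Note that apart from the two shard key fields sector and address.city, the exact field names shown below are assumptions made for illustration:

    mongo localhost:27017/blogpost --quiet --eval 'printjson(db.demo_collA.findOne())'
    # which returns a document roughly along these lines (structure assumed for illustration):
    # {
    #   "_id": ObjectId("..."),
    #   "name": "ACME Corp.",
    #   "sector": "technology",
    #   "address": { "city": "NEW YORK", "street": "..." }
    # }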

Enter Apache Kafka Connectors

The next step is to set up Debezium’s source connector for MongoDB in order to take an initial snapshot of the data contained in demo_collA. Right afterwards, the connector continuously streams any MongoDB oplog changes happening inside the shards into corresponding Apache Kafka topics. All we need for this is a simple configuration which is sent as a POST request to the Kafka Connect REST API. The Connect node running in Docker exposes port 8083 and can be reached via localhost:
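
The configuration below is a minimal sketch rather than the repository’s exact file: the connector class and property names are Debezium’s documented MongoDB connector settings (1.x-era naming; newer versions use differently named properties such as collection.include.list), while the connector name and the MongoDB hostname/port are assumptions:

    # NOTE: connector name and mongodb.hosts below are assumptions, not taken from the demo repo
    curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
      "name": "mongodb-source-demo-collA",
      "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.name": "example",
        "mongodb.hosts": "mongo-config:27019",
        "collection.whitelist": "blogpost.demo_collA"
      }
    }'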

The current data found in demo_collA, as well as all further changes resulting from any CRUD operations against it, is replicated into an Apache Kafka topic named example.blogpost.demo_collA. The example prefix is just a logical grouping which can be configured for Debezium’s source connector.

In order to bring all the data residing in this Apache Kafka topic into our MongoDB target collection demo_collB, we deploy a MongoDB community sink connector. As with the source connector, this can be done without writing any code by means of the following configuration:
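
Again, this is only a sketch: it uses the documented property names of the official MongoDB Kafka sink connector as a stand-in for the community sink’s configuration, and the connector name and connection details are assumptions that depend on the docker-compose service names:

    # NOTE: connector name and connection.uri below are assumptions, not taken from the demo repo
    curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
      "name": "mongodb-sink-demo-collB",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "example.blogpost.demo_collA",
        "connection.uri": "mongodb://mongos:27017",
        "database": "blogpost",
        "collection": "demo_collB",
        "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler"
      }
    }'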

Shortly after both the source and sink connectors have been set up and started, we can inspect the expected result: demo_collB holds the exact same data as demo_collA. The streaming change data capture pipeline between these two collections keeps the data in sync whenever future changes occur in demo_collA.
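
A quick way to verify that the replication worked is to compare document counts, a sketch assuming the mongo shell is available on the host:

    mongo localhost:27017/blogpost --quiet --eval '
      print("demo_collA: " + db.demo_collA.countDocuments({}));
      print("demo_collB: " + db.demo_collB.countDocuments({}));
    '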

By means of the different shard key, this collection can now efficiently support queries which filter on a specific {"address.city":"..."} value: only a single shard needs to be targeted.

The visual explain plan shows the efficiency of this single-shard query, which is resolved by an index scan.
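
For readers without Compass, a roughly equivalent explain can be obtained from the shell; a filter on the shard key of demo_collB should yield a plan that only touches a single shard (a sketch, assuming the mongo shell on the host):

    # expected: a single-shard plan with an index scan on the shard key index
    mongo localhost:27017/blogpost --quiet --eval '
      printjson(db.demo_collB.find({ "address.city": "NEW YORK" }).explain("queryPlanner"))
    '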

If, for comparison, we run a query which filters for {"address.city":"NEW YORK"} against the original demo_collA, it results in an extremely inefficient scatter-gather query across all shards, i.e. only 2 shards in our simple scenario, but in general all N shards.

The visual explain plan shows the inefficiency of this scatter-gather query.
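
The same check from the shell shows that the plan now fans out to every shard and merges the results on the mongos router (again a sketch using the mongo shell):

    # expected: a SHARD_MERGE stage covering both shards of this demo cluster
    mongo localhost:27017/blogpost --quiet --eval '
      printjson(db.demo_collA.find({ "address.city": "NEW YORK" }).explain("queryPlanner"))
    '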

In this particular case, each of the shard-local queries would even need to perform a full collection scan. That could of course be avoided by defining an additional index on address.city for demo_collA. Nevertheless, the scatter-gather query across shards and the merging of the “distributed result sets” would still need to happen and would lead to considerably slower query times.
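
Creating such a secondary index is a one-liner; it removes the shard-local collection scans but, as noted above, not the scatter-gather fan-out itself:

    mongo localhost:27017/blogpost --eval 'db.demo_collA.createIndex({ "address.city": 1 })'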


Summary & Conclusion

This blog post showed how to optimize read access against sharded MongoDB collections. The suggested approach utilizes Apache Kafka and Kafka Connect with appropriate source and sink connectors in order to set up a CDC-based, streaming data pipeline which replicates any sharded source collection into sharded sink collections. While source and sink collections will eventually hold the same data, the sink collections can be configured to efficiently support additional query patterns and filters by leveraging different shard key definitions. The two drawbacks are that we have to live with the fact that data is duplicated and that changes occurring in the source collections are not instantly reflected, but instead continuously propagated to the sink collections in near real-time.

Even if such a solution might look unfamiliar and unconventional at first sight, it is very powerful and flexible thanks to the following characteristics:

  • non-intrusive: It is neither necessary to make any changes to an existing application nor to touch the original data model.
  • decoupled & isolated: The data replication process itself happens detached from the actual MongoDB cluster, which means there is little to no impact in terms of additional load. For simplicity, the demo discussed in this blog post replicated into the same MongoDB cluster. However, this can easily be changed so that the sharded sink collections are managed by a completely separate cluster.
  • fault-tolerant & scalable: By leveraging Apache Kafka and Apache Kafka Connect, the streaming CDC-based data replication of sharded MongoDB collections can easily adapt to and scale with any write load and/or data volume of the MongoDB cluster.
  • extensible: While this blog post focused on a very simple scenario, the exact same approach can be taken to either support multiple shard keys for one and the same source collection or include other sharded collections — just by utilizing Apache Kafka Connect source and sink connectors based on configuration files only.

Happy read-optimized sharding! ;-)