Practical DCP

Burak Dursunlar
Published in Trendyol Tech
Jan 17, 2023

Database Change Protocol (DCP) is one of the fundamental building blocks of Couchbase. It not only helps exchange data between Couchbase services such as Query and Indexing, but it is also behind some of the cool features like XDCR and connectors.

Beaver, Photo by Svetozar Cenisev on Unsplash

You might think that DCP is only of interest to those who want to learn more about database internals. The purpose of this post is to demonstrate the opposite: for anyone who works with data and uses Couchbase, it is worthwhile to learn how to use DCP to solve real-world problems.

In this post, we’ll see how DCP works and how to use the API. Then we’ll go over how DCP can help you solve some of the problems that every developer might run into in production.

How does DCP work, anyway?

It is critical to understand how a technology operates behind the scenes in order to determine whether it is appropriate for our use case.

To facilitate sharding and replication, Couchbase has a notion called vBuckets. There are 1024 vBuckets for each bucket¹. The cluster manager is responsible for mapping those vBuckets to physical nodes.

When a document is inserted, a hash of its key is calculated. Because the hash maps into the range 0 to 1023, the document is effectively assigned to a vBucket. The node in charge of that vBucket actually stores the data. So it’s fair to say that each physical node has vBuckets, and each vBucket has documents.
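To make the mapping concrete, here is a minimal sketch of the idea in Java. Note that Couchbase’s actual implementation uses a specific CRC32 variant plus some extra bit manipulation, so treat this as an illustration of the concept rather than the exact production algorithm:

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class VbucketMappingSketch {

    private static final int NUM_VBUCKETS = 1024;

    // Conceptually: hash the key, then map the hash into [0, 1023]
    static int vbucketId(String documentKey) {
        CRC32 crc = new CRC32();
        crc.update(documentKey.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % NUM_VBUCKETS);
    }

    public static void main(String[] args) {
        // The same key always maps to the same vBucket
        System.out.println(vbucketId("user::42"));
    }
}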

While the above process takes place for every operation, something else interesting happens simultaneously:

Each vBucket has a dedicated stream containing document mutations. Whenever a mutation (or deletion) request comes from a client, the corresponding vBucket sends a message to that stream.

This is where DCP enters the picture. DCP is nothing but a set of rules defining the structure of those mutation events.

Each event message contains an event type (mutation or deletion), the document key, and, if the event is a mutation rather than a deletion, the document value. In addition, each event is assigned a sequence number, which grows as events are created².
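Conceptually, you can think of each event as a small record like the following. This is just an illustrative model, not the actual wire format or the client’s API:

// Illustrative only: a simplified model of a DCP event
enum EventType { MUTATION, DELETION }

record DcpEvent(
        EventType type,
        String key,
        byte[] value,   // present only for mutations
        long seqno,     // per-vBucket sequence number
        int vbucketId) {
}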

DCP is not a history book where you can see the full mutation history of a document. Unlike typical message queues, events for the same document are deduplicated (only the most recent event is kept).

Assume you have a document which receives 5 different mutations. It is not guaranteed that you will see 5 different events for that document. The only way to observe multiple events is to have a DCP listener active whenever each event occurs (so we can store it somewhere else).

Deduplication also happens on the fly if many mutations to an individual document occur in rapid succession³. Even an active DCP listener may capture only the final state of a document, not its intermediate states. So again, DCP is not suitable if some documents are mutated very quickly and you want to capture every mutation.

Because each vBucket stream has its own sequence counter, DCP does not give a cluster-wide ordering of events; only events within a vBucket are ordered. If you want a globally sorted view of mutation events, DCP is not the solution for you. You do get mutations for a specific document in order, though, since a document always stays in the same vBucket.

DCP Client

The way to interact with DCP is to use a DCP client. At the time of writing, there are clients written in Java and Go.

The API is quite straightforward. I am going to show you a Java example.

First, connect to a bucket:

Client dcpClient = Client.builder()
        .credentials(username, password)
        .seedNodes(address)
        .bucket(bucketName)
        .flowControl(BUFFER_ACK_SIZE_IN_BYTES)
        .build();

Register a listener:

dcpClient.listener(new DatabaseChangeListener() {
    @Override
    public void onFailure(StreamFailure streamFailure) {
        System.err.println(streamFailure.getCause());
    }

    // Callback for mutation events
    @Override
    public void onMutation(Mutation mutation) {

        // You can get the key and value of the mutated document like below
        String docKey = mutation.getKey();
        byte[] content = mutation.getContent();

        // If your bucket contains multiple collections,
        // you can get the collection info
        CollectionInfo collection = mutation.getCollection();

        // It is also possible to get the id of the vBucket that the event belongs to
        int vBucketId = mutation.getVbucket();

        // Do some stuff...
    }

    // Callback for deletion events
    @Override
    public void onDeletion(Deletion deletion) {
        String docKey = deletion.getKey();
        int vBucketId = deletion.getVbucket();

        // Do some stuff...
    }
}, FlowControlMode.AUTOMATIC);

Finally, start listening for events:

dcpClient.connect().block();

// If you want to listen only to events that occur after the listener is initialized,
// pass StreamFrom.NOW as the first argument to the function below
dcpClient.initializeState(StreamFrom.BEGINNING, StreamTo.INFINITY).block();

dcpClient.startStreaming().block();

// Add blocking code here (e.g., wait on a CountDownLatch),
// because the callback methods are invoked on a different thread

// Close resources when you are done
dcpClient.disconnect().block();

Use cases

After all that theory, it is time to discuss some use cases.

Replicating to a different database

Sometimes it makes sense to replicate data in Couchbase to different kinds of platforms, such as relational and time series databases, data warehouses, or others.

There are some connectors available (they are implemented with the DCP client, by the way).

For instance, as the Recommendation Team at Trendyol, we use the Couchbase Elasticsearch Connector to recommend sponsored products on product detail pages.

If you do not find the platform you’re looking for, you can implement your own connector using the DCP client.

What connectors cannot do out of the box is let you decide which documents to replicate and which to skip.

Consider the following scenario: we have a bucket that contains users, and we wish to replicate those users to two different Elasticsearch indexes, taking their age into account:

  • First index for users older than 30 years.
  • Second index for users under the age of 30.

Even though there is an Elasticsearch connector for Couchbase, it is not suitable for the above requirement. But with a little effort, it is possible with the DCP client⁴.
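As a sketch of how that could look: inside a custom listener, inspect each mutation and route it to the right index. The JSON parsing below uses Jackson’s ObjectMapper, the documents are assumed to carry a numeric age field, and indexToElasticsearch is a hypothetical stub standing in for whatever Elasticsearch client you use:

dcpClient.listener(new DatabaseChangeListener() {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void onFailure(StreamFailure streamFailure) {
        System.err.println(streamFailure.getCause());
    }

    @Override
    public void onMutation(Mutation mutation) {
        try {
            // Assumes user documents carry a numeric "age" field
            JsonNode user = mapper.readTree(mutation.getContent());
            int age = user.get("age").asInt();

            // Users older than 30 go to the first index, everyone else to the second
            String index = (age > 30) ? "users-over-30" : "users-under-30";

            // Hypothetical stub for your Elasticsearch client of choice
            indexToElasticsearch(index, mutation.getKey(), mutation.getContent());
        } catch (Exception e) {
            e.printStackTrace(); // handle malformed documents properly in real code
        }
    }
}, FlowControlMode.AUTOMATIC);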

Trend analysis

Businesses require historical data for analytics, reporting, and forecasting. If you have data residing in Couchbase and that data should be analyzed in a historical manner, you should consider using the DCP client or connectors.

Let’s assume that, as an e-commerce company, we store all product details in Couchbase, like below:
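(A hypothetical product document; the field names are made up for this example.)

{
  "id": "product-1234",
  "name": "Wireless Headphones",
  "brand": "Acme",
  "price": 549.90,
  "currency": "TRY",
  "updatedAt": "2023-01-10T12:30:00Z"
}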

If we want to capture a product’s entire price history, we can use the DCP client and save all state changes to a relational database like PostgreSQL.
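A minimal sketch of that idea, using the same kind of DatabaseChangeListener as before: on each mutation, extract the price and append a row to a history table via plain JDBC. The table schema, connection details, and document shape are assumptions for this example:

@Override
public void onMutation(Mutation mutation) {
    try {
        JsonNode product = mapper.readTree(mutation.getContent());
        double price = product.get("price").asDouble();

        // Append-only table, assumed schema:
        //   price_history(doc_key TEXT, price NUMERIC, seen_at TIMESTAMPTZ)
        // In real code, use a connection pool instead of connecting per event
        try (Connection conn = DriverManager.getConnection(postgresUrl, pgUser, pgPassword);
             PreparedStatement stmt = conn.prepareStatement(
                     "INSERT INTO price_history (doc_key, price, seen_at) VALUES (?, ?, now())")) {
            stmt.setString(1, mutation.getKey());
            stmt.setDouble(2, price);
            stmt.executeUpdate();
        }
    } catch (Exception e) {
        e.printStackTrace(); // handle failures and retries properly in real code
    }
}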

Ad hoc migrations

Migrations ☠️, or should we say “pain in the neck” instead? Sometimes you have to alter the data type of a field from integer to string, or convert timestamps from seconds to milliseconds. Then you have to do a migration.

A safer approach would be to migrate the transformed data into a new bucket. However, there are situations where you cannot create a new bucket and must migrate within the bucket itself.

In those cases, during migration, be sure that:

  • Only the migration job causes your bucket to change. Otherwise, your bucket will be in an inconsistent state once the migration is complete.
  • Since the job listens to and mutates the same bucket, you may run into a circular event problem. To avoid this, do not listen to events that occur after the migration starts (see the sketch below):
// pass the second argument as StreamTo.NOW instead of StreamTo.INFINITY
dcpClient.initializeState(StreamFrom.BEGINNING, StreamTo.NOW).block();
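Putting the pieces together, a migration job could look roughly like this. Here collection is a Collection from the regular Couchbase Java SDK, mapper is a Jackson ObjectMapper, and the createdAt field (converted from seconds to milliseconds) is an assumed example transformation:

dcpClient.listener(new DatabaseChangeListener() {
    @Override
    public void onFailure(StreamFailure streamFailure) {
        System.err.println(streamFailure.getCause());
    }

    @Override
    public void onMutation(Mutation mutation) {
        try {
            ObjectNode doc = (ObjectNode) mapper.readTree(mutation.getContent());

            // Example transformation: seconds -> milliseconds
            doc.put("createdAt", doc.get("createdAt").asLong() * 1000);

            // Write back with the regular SDK; this write is exactly what would
            // cause a circular event if we streamed to INFINITY instead of NOW
            collection.upsert(mutation.getKey(), JsonObject.fromJson(doc.toString()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}, FlowControlMode.AUTOMATIC);

dcpClient.connect().block();
dcpClient.initializeState(StreamFrom.BEGINNING, StreamTo.NOW).block();
dcpClient.startStreaming().block();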

Data integrity checks

Being able to store documents without any structure in NoSQL databases is a double-edged sword.

Flexibility comes at a cost: it is much easier to corrupt documents if there is not enough validation in the service.

Here is another issue: a document follows the expected structure but contains wrong values. One example would be an age field with a negative or implausibly large value (while the data type of the field is correct). Unfortunately, such cases are even harder to detect because the service/API does not fail most of the time.

Data integrity checks are an important way to detect data problems early. Listening to a DCP stream is one viable way to traverse all the data in a bucket.
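For example, inside the same kind of DatabaseChangeListener as above, a tiny integrity checker can validate each document as it streams by and report suspicious ones. The age bounds and the reporting mechanism below are just placeholders:

@Override
public void onMutation(Mutation mutation) {
    try {
        JsonNode user = mapper.readTree(mutation.getContent());
        int age = user.get("age").asInt();

        // Placeholder rule: flag values that are structurally valid but implausible
        if (age < 0 || age > 150) {
            System.err.printf("Suspicious document %s: age=%d%n", mutation.getKey(), age);
        }
    } catch (Exception e) {
        // A document that fails to parse is itself an integrity finding
        System.err.println("Malformed document: " + mutation.getKey());
    }
}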

Final words

Hopefully, you now have a new addition to your developer arsenal 😃.

If you have not already, I recommend checking out other articles related to DCP.

Hope you found this article useful! As always, all feedback and comments are welcome. Bye 👋

Footnotes

  1. On macOS, the number of vBuckets is 64. See: vBuckets | Couchbase Docs
  2. According to the Couchbase documentation, it is not guaranteed that the sequence number increases by exactly one for each event, so some numbers might be skipped.
  3. Read more about deduplication.
  4. Another option would be to write events to Kafka using Couchbase Kafka Connector and then implement a consumer which does filtering and finally writes to Elasticsearch.

References

https://review.couchbase.org/plugins/gitiles/kv_engine/+/fbdf0d1e1309813d52a06871d8257c4cd7ac5b68/docs/dcp/documentation/terminology.md
