Navigating the Challenges of Couchbase DCP Events: Our Journey at GlobalPlatforms

Yusuf Saglam
Published in Trendyol Tech
6 min read · May 8, 2024

As a core component of the GlobalPlatforms (GP) team’s infrastructure, Couchbase and its connectors play a pivotal role in our operations, tailored with customizations to suit our needs. In this article, I delve into our experiences and strategies in managing this ecosystem at scale.

First, I should explain why we use connectors. In our typical scenario, we update a document and notify clients by producing events to Kafka. For example, if the stock of a product changes, we update the stock data and produce a stock-changed event to inform clients. Atomicity is therefore vital for us: both operations must succeed or fail together; partial success is not acceptable in our scenario. This is where a connector comes in handy. You simply save the document to Couchbase, and Couchbase handles the publishing process. This has some important benefits: it guarantees that events are produced for documents, you are not adding a new dependency to your application, you don't have to implement a retry mechanism, and your application's response time is relatively lower since you are not publishing events to Kafka directly.

In typical scenarios, a Couchbase document mutation triggers the production of a corresponding event, encapsulating the entire document’s changes. At GP, where products, prices, and stock data reside in a unified document, a single mutation may necessitate the generation of multiple events. For instance, adding media to a product triggers events such as “product-updated” and “media-added.”

Extend Couchbase Kafka Connector

The Couchbase Kafka Connector can produce only one event per mutation. Since the existing connector didn't offer this flexibility as a built-in feature, we had to extend it: we produce one common event to a single topic and then distribute the individual events to their end topics, as illustrated below.

Extended couchbase kafka connector with multiple events for one mutation
// product document template with events
{
  "id": "123",
  "version": 0,
  "product": {},
  "prices": [],
  "stocks": [],
  "events": [
    {
      "topic": "media_added",
      "payload": "{\"productId\":\"123\",\"url\":\"randomImage.jpg\"}",
      "key": "123"
    },
    {
      "topic": "product_updated",
      "payload": "{\"productId\":\"123\",\"url\":\"randomImage.jpg\"}",
      "key": "123"
    }
  ]
}
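To make the fan-out step concrete, here is a minimal Go sketch (illustrative only, not our production code) of what the distribution side does: it unmarshals a document shaped like the template above and groups each embedded event by its end topic. The `Document`/`Event` types mirror the template; the in-memory grouping stands in for publishing to Kafka.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event mirrors one entry of the "events" array in the document template.
type Event struct {
	Topic   string `json:"topic"`
	Key     string `json:"key"`
	Payload string `json:"payload"`
}

// Document holds only the fields the distributor cares about.
type Document struct {
	ID     string  `json:"id"`
	Events []Event `json:"events"`
}

// fanOut groups the embedded events by their end topic, standing in for
// the step that publishes each event to its own Kafka topic.
func fanOut(raw []byte) (map[string][]Event, error) {
	var doc Document
	if err := json.Unmarshal(raw, &doc); err != nil {
		return nil, err
	}
	byTopic := make(map[string][]Event)
	for _, e := range doc.Events {
		byTopic[e.Topic] = append(byTopic[e.Topic], e)
	}
	return byTopic, nil
}

func main() {
	raw := []byte(`{"id":"123","events":[
		{"topic":"media_added","key":"123","payload":"{\"productId\":\"123\"}"},
		{"topic":"product_updated","key":"123","payload":"{\"productId\":\"123\"}"}]}`)
	byTopic, err := fanOut(raw)
	if err != nil {
		panic(err)
	}
	for topic, events := range byTopic {
		fmt.Printf("%s -> %d event(s)\n", topic, len(events))
	}
}
```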

We deployed this extension to production and started observing it. We realized that, from time to time, configs were being lost in the connectors, and until we detected the issue and restarted them, we were unable to produce events. We applied some mitigations to be alerted immediately, such as a sidecar checker, but nonetheless, until you restart the connector, any document versions that change in the meantime may miss their events. Other teams at Trendyol also faced this problem. This led us to look for another way. For this and a couple of other reasons (scalability, etc.), the go-dcp project was started, and at GP we were early adopters of it.

Go-DCP is the Go implementation of the Couchbase Kafka connector, developed by Trendyol. Go-DCP streams documents over the Couchbase Database Change Protocol (DCP) and publishes Kafka events in near real time.

Use go-dcp as connector

We switched to go-dcp as our connector, and the config-loss problem disappeared. go-dcp also has a built-in feature to produce many events from a single mutation to many topics. With this feature, we got rid of the kafka2kafka application.

// A basic implementation example of go-dcp-connector
// (import paths are inferred from the package names in this snippet and may vary by version)
package main

import (
	"encoding/json"

	gokafkaconnectcouchbase "github.com/Trendyol/go-kafka-connect-couchbase"
	"github.com/Trendyol/go-kafka-connect-couchbase/couchbase"
	"github.com/Trendyol/go-kafka-connect-couchbase/kafka/message"
)

func main() {
	// configs holds the connector configuration (Couchbase and Kafka settings)
	connector, err := gokafkaconnectcouchbase.
		NewConnectorBuilder(configs).
		SetMapper(mapper).
		Build()
	if err != nil {
		panic(err)
	}
	defer connector.Close()
	connector.Start()
}

func mapper(event couchbase.Event) []message.KafkaMessage {
	if event.IsExpired || event.IsDeleted {
		return nil
	}

	// bind the mutated document to our events model
	var documentWithEvents Document
	_ = json.Unmarshal(event.Value, &documentWithEvents)

	// convert the embedded events to Kafka messages
	kafkaMessages := make([]message.KafkaMessage, 0)
	for _, e := range documentWithEvents.Events {
		kafkaMessages = append(kafkaMessages, message.KafkaMessage{
			EventId: e.EventId,
			Topic:   e.Topic,
			Key:     []byte(e.Key),
			Value:   []byte(e.Payload),
		})
	}

	return kafkaMessages
}

type Event struct {
	EventId string `json:"eventId"` // UUID
	Topic   string `json:"topic"`
	Key     string `json:"key"`
	Payload string `json:"payload"`
}

type Document struct {
	Events []Event `json:"events"`
}

We deployed this to production and started monitoring it. Everything went well for a while, until our throughput suddenly increased 10x overnight due to a load test within Trendyol. Later that morning, we discovered that even though we were updating documents in Couchbase, some events were not being produced.

The source connector does not guarantee every version of a document will be reflected in the Kafka topic. For example, if a Couchbase document value changes rapidly from V1 -> V2 -> V3, or if the connector is not running when the change occurs, the connector is only guaranteed to publish the latest value (V3). In other words, changes to the same document may be "deduplicated", in which case only the latest version is published.

In other words, when a document changes rapidly, we may miss events, and every version of a document is crucial for us.
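A toy model can make the failure mode concrete. The sketch below (purely illustrative, not the connector's actual internals) shows how deduplication drops the events embedded in intermediate versions when the same document mutates faster than the stream is consumed:

```go
package main

import "fmt"

// Mutation represents one version of a document, carrying the events
// that were embedded for that specific version.
type Mutation struct {
	Version int
	Events  []string
}

// dedupe mimics DCP-style deduplication: when several versions of the
// same key pile up before they are read, only the latest survives.
func dedupe(mutations []Mutation) Mutation {
	return mutations[len(mutations)-1]
}

func main() {
	// Three rapid versions of the same document: V1 -> V2 -> V3.
	mutations := []Mutation{
		{Version: 1, Events: []string{"product_created"}},
		{Version: 2, Events: []string{"media_added"}},
		{Version: 3, Events: []string{"product_updated"}},
	}

	latest := dedupe(mutations)
	// Only V3's events reach Kafka; the events in V1 and V2 are lost.
	fmt.Printf("published version: V%d, events: %v\n", latest.Version, latest.Events)
}
```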

Use eventing function to move events to a new collection

As a quick solution, we developed an eventing function that copies every event in the document into a new event-document under a new collection (events). With this workaround, we consume DCP events from the new collection, where each document only ever has one version (V1), so there is nothing to deduplicate. We also set a default TTL (time to live) on the events collection, so event-documents expire after a certain period, since we don't have infinite resources.

Separating events collection from source collection via eventing function
// Copies every event in the document into the tgt (events) collection as a new event-document
function OnUpdate(doc, meta) {
    if (doc.events && doc.events.length > 0) {
        for (var event of doc.events) {
            tgt[event.eventId] = event;
            log("Doc created", event.eventId);
        }
    }
}
// an example event document
{
  "topic": "media_added",
  "payload": "{\"productId\":\"123\",\"url\":\"randomImage.jpg\"}",
  "key": "123"
}

Since the eventing function also consumes DCP events from the same source, it didn't work perfectly either. We have no data or way to prove it, but we believe we lost fewer events this way than with the previous approach. As mentioned above, it was a workaround to save the day, and we still use this approach in other services where events are not customized. On the other hand, this approach makes it easy to spot where a problem lies, whether on the Couchbase side or the connector side: if an event-document is present in Couchbase but the event was not published to Kafka, we can be sure the problem is on the connector side.
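That diagnostic can be automated as a simple reconciliation check. The sketch below is illustrative only (how the two ID lists are collected is an assumption); it compares the event IDs found as event-documents in Couchbase against the IDs observed on Kafka:

```go
package main

import "fmt"

// findUnpublished returns the event IDs that exist as event-documents in
// Couchbase but were never observed on Kafka. If this set is non-empty,
// the problem is on the connector side.
func findUnpublished(couchbaseIDs, kafkaIDs []string) []string {
	published := make(map[string]bool, len(kafkaIDs))
	for _, id := range kafkaIDs {
		published[id] = true
	}
	var missing []string
	for _, id := range couchbaseIDs {
		if !published[id] {
			missing = append(missing, id)
		}
	}
	return missing
}

func main() {
	// In practice these would come from the events collection and a Kafka consumer.
	couchbaseIDs := []string{"e1", "e2", "e3"}
	kafkaIDs := []string{"e1", "e3"}
	fmt.Println("unpublished events:", findUnpublished(couchbaseIDs, kafkaIDs))
}
```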

Use transactional outbox pattern

We had tried to avoid using transactions in our applications for several reasons:
— Performance Overhead
— Limited Scalability
— Increased Complexity
— Impact on Availability

Since the other approaches didn't work and atomicity is vital for our use case, we applied the transactional outbox pattern using Couchbase transactions.
In short, we save the event-documents and the product document in a single transaction. If the product is updated, it is guaranteed that the event will also be produced to Kafka.

In the Couchbase Transactional Outbox Pattern, event data is temporarily stored in a separate Couchbase bucket within the same transactional boundary as the business operation that generates the event. This ensures transactional integrity. A periodic process moves the event data to the main outbox bucket for asynchronous publication, enabling reliable event-driven communication between microservices while leveraging Couchbase’s features and scalability.
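A toy in-memory model of the pattern is sketched below. The real implementation relies on Couchbase's multi-document ACID transactions via the Go SDK; the map-based `store` here is purely illustrative and only demonstrates the all-or-nothing property we depend on:

```go
package main

import (
	"errors"
	"fmt"
)

// store is a toy stand-in for Couchbase; production code would use the
// Couchbase Go SDK's transactions API instead of a map.
type store map[string]string

// saveWithOutbox writes the product document and its outbox event-document
// atomically: if the write cannot go through, neither document is applied.
func saveWithOutbox(s store, productID, product, eventID, event string) error {
	// Validate before touching the store, so a failure changes nothing
	// (mimicking a transaction rollback).
	if _, exists := s["event::"+eventID]; exists {
		return errors.New("event already exists, rolling back")
	}
	// "Commit" both writes together.
	s["product::"+productID] = product
	s["event::"+eventID] = event
	return nil
}

func main() {
	s := store{}
	err := saveWithOutbox(s, "123", `{"stock":5}`, "e1", `{"topic":"stock_changed"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println("product:", s["product::123"])
	fmt.Println("outbox event:", s["event::e1"])
}
```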

Conclusion and Future Outlook

In conclusion, our journey of handling Couchbase DCP events has been marked by continuous adaptation and innovation. Each solution presented its own set of challenges, but through perseverance and collaboration, we have been able to overcome them. However, as our requirements evolve and our throughput increases, we continue to explore alternative solutions. Our next step involves leveraging Couchbase's change data capture (CDC) capabilities in version 7.2.2+ (released recently), allowing us to record change history for the Magma storage type. We hope to share our findings and outcomes in a future article.

Special thanks to my teammates Yasin, Vlad, and Mehmet for their valuable contributions on this journey.

About Us

Would you like to be a part of our growing company? Join us! Check out our open positions and other media pages from the links below.
