Implementing Transactional Outbox With Couchbase

Burak Gündoğdu
Trendyol Tech
Jun 5, 2024

In this article, I will explain how we implemented the transactional outbox pattern with Couchbase in a Spring Boot application.

First, I will discuss the need for a transactional outbox and how we addressed the problem we were experiencing. Then, I will cover how to use transactions with Couchbase in a Spring Boot application.

Content
· The Problem
· Considering Options
Storing Events on Aggregate
Writing Events Transactionally To Outbox
· Implementing Transactions In Couchbase
· Using Kafka Connector
· Conclusion

The Problem

As the Localcommerce PIM team at Trendyol, our main responsibility is to manage grocery catalog products and send all changes related to these products to internal systems.

We use an event-driven architecture to communicate with these systems. We must manage the entire catalog product manipulation process and deliver change events to internal systems to ensure consistency. After changes to the catalog products are saved in the database, events are sent to Kafka in the application, as shown below:

productRepository.save(product)

// if the application crashes or Kafka is unreachable at this point,
// the events below are never delivered
publisherService.publish(events)

Kafka is a distributed streaming platform designed to handle large volumes of real-time data. Kafka’s reliability relies heavily on stable network connectivity between clients and brokers. However, in real-world scenarios, challenges such as network-related issues can lead to event loss.

In our use case, after the database save process is completed, if a Kafka network problem occurs or the application goes down for any reason, this will cause event loss. Consequently, the products used by internal systems would become inconsistent. For example, updates made by internal users to the catalog product may not be reflected in the product seen by the end user on the Trendyol Grocery Platform. In such cases, we would need to start the re-index process for all grocery catalog products.

Considering Options

We worked on several possible solutions and discussed their pros and cons. To maintain the focus of this content, I will not provide detailed information about all the solutions we considered. Instead, I will discuss systems using NoSQL databases, the commonly used outbox pattern, and the transactional outbox pattern.

Storing Events on Aggregate

One approach to implementing the outbox pattern is storing events in an aggregate model and then sending them from it. To illustrate this, consider the simple drawing below:

Send Events From Aggregate

As you can see in the simple diagram above, domain events are sent to the domain event topic via the Kafka source connector from the events array on the aggregate.
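To make this concrete, an aggregate document in such a system might look like the sketch below; the field names are illustrative assumptions, not our actual model:

// Hypothetical aggregate carrying an embedded events array;
// every change appends to events, so the document keeps growing.
data class ProductDocument(
    val id: Long,
    val name: String,
    val desi: Double,
    val events: MutableList<DomainEvent> = mutableListOf()
)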

One advantage of the system described above is that it eliminates the need for transactions: since there are no transaction boundaries to manage, command operations incur no extra latency. By storing events directly in the aggregate model and using a Kafka source connector to send them to the domain event topic, the system avoids the overhead of transactional operations and processes commands faster.

However, storing domain events in an array on the aggregate increases the document size and makes the array harder to manage. You may also want to cap the events array at a specific size; in that case, any issue on the connector or Kafka side could cause you to lose events that exceed the cap.

Another disadvantage of the above system is that while the Kafka connector ensures the latest change to a document is always delivered, it does not promise the transmission of every version of that document. This means there is no delivery guarantee for every domain event you add to the events array, which can lead to event loss.

To address these challenges, we can write events transactionally to an outbox.

Writing Events Transactionally To Outbox

Now let’s talk about the solution we chose and its pros and cons. The diagram below illustrates its architecture:

Send Events From Outbox Collection

As seen in the diagram above, we decided to write the domain events on the aggregate to an outbox collection, and we did this within a transaction scope. I will discuss Couchbase’s transaction feature and its use in a Spring Boot application in the following sections.

When a command is received by the application, we initiate a transaction. Within this transactional scope, product updates are persisted to the product collection while domain events are saved to the outbox collection with a specified Time-To-Live (TTL) value. Because Couchbase’s transactional support adheres to ACID properties, atomicity guarantees that the entire process completes within the transaction scope: domain events are reliably written to the outbox collection alongside product updates, ensuring consistency and integrity within our system.

After the transaction completes, the async process starts, as shown in the diagram above. The Kafka source connector streams outbox documents from Couchbase Server using the high-performance Database Change Protocol (DCP) and publishes them to domain event topics. Finally, integration consumers consume the domain events and convert them to integration events, which are sent to integration topics to be consumed by external contexts. In this way, we guarantee the delivery of changes made to the product to external contexts.

Persisting events in the outbox collection with a TTL provides the architecture with the capability to reprocess events should any issues arise on the Kafka or connector side. Moreover, storing events in a separate collection (outbox) helps reduce the size of the aggregate model, resulting in a cleaner and more understandable document structure. However, alongside these advantages, the use of transactions introduces latency to all write operations.
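As a sketch, an outbox document with a TTL could be modeled with Spring Data Couchbase’s expiry support as shown below; the field names and the seven-day expiry are assumptions for illustration:

import org.springframework.data.annotation.Id
import org.springframework.data.couchbase.core.mapping.Document
import java.time.Instant
import java.util.concurrent.TimeUnit

// Hypothetical outbox document; Couchbase removes it automatically after
// the expiry, which bounds the size of the outbox collection.
@Document(expiry = 7, expiryUnit = TimeUnit.DAYS)
data class OutboxDocument(
    @Id val id: String,    // unique event id
    val aggregateId: Long, // id of the product the event belongs to
    val type: String,      // e.g. "ProductUpdatedDomainEvent"
    val payload: String,   // serialized event body
    val createdAt: Instant
)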

In our use case, preventing event loss is more important than the slower processing of commands, which is why we chose this solution.

Implementing Transactions In Couchbase

We used the @Transactional annotation in a Spring Boot 3.1 application. We prefer the Kotlin programming language in our core services, together with KediatR, an open-source Mediator implementation.

We wanted all commands to be completed under transaction scope, so we extended KediatR’s CommandWithResultHandler according to our own use case as follows:

abstract class CommandWithTransactionalHandler<TCommand : CommandWithResult<TResult>, TResult : Aggregate> :
    CommandWithResultHandler<TCommand, TResult> {

    @Autowired
    private lateinit var commandBus: CommandBus

    abstract fun process(command: TCommand): TResult

    @Transactional
    override fun handle(command: TCommand): TResult {
        val aggregate = process(command)
        aggregate.events.forEach { commandBus.publishNotification(it) }
        return aggregate
    }
}

All commands implement the process method above and run their business logic within it, returning the aggregate. The overridden handle method is transactional and publishes the domain events on the returned aggregate in memory.

@Component
class DomainEventHandler(private val repository: OutBoxRepositoryAdapter) :
    NotificationHandler<DomainEvent> {

    override fun handle(notification: DomainEvent) {
        repository.save(notification)
    }
}

We capture each published domain event in the domain event handler and save it to the outbox collection within the transaction. Once all events are saved, the transaction is completed.
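The adapter itself might look roughly like the sketch below, writing to the outbox collection through the Spring Data Couchbase fluent template API; OutboxDocument.from(...) is a hypothetical mapper from a domain event to an outbox document:

@Component
class OutBoxRepositoryAdapter(private val template: CouchbaseTemplate) {

    // runs inside the surrounding @Transactional scope, so the insert
    // is committed or rolled back together with the product update
    fun save(event: DomainEvent) {
        template.insertById(OutboxDocument::class.java)
            .inCollection("outbox")
            .one(OutboxDocument.from(event))
    }
}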

To integrate Couchbase transactions with Spring’s @Transactional annotation, you need to check the versions of the dependencies being used and write a custom Couchbase transaction manager. We used Spring Data Couchbase.

My colleague explained the configurations we made and the dependencies we used in detail in this article.
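As a minimal sketch, the connection side of such a configuration can extend AbstractCouchbaseConfiguration; the connection values below are placeholders, and the exact transaction-manager wiring depends on your Spring Data Couchbase version (see the linked article for the setup we used):

import org.springframework.context.annotation.Configuration
import org.springframework.data.couchbase.config.AbstractCouchbaseConfiguration
import org.springframework.transaction.annotation.EnableTransactionManagement

// Placeholder connection values; recent Spring Data Couchbase versions also
// wire a Couchbase-aware transaction manager for @Transactional support.
@Configuration
@EnableTransactionManagement
class CouchbaseConfig : AbstractCouchbaseConfiguration() {
    override fun getConnectionString() = "couchbase://localhost"
    override fun getUserName() = "user"
    override fun getPassword() = "password"
    override fun getBucketName() = "Product"
}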

Important Note: Spring Data Couchbase transactions will not work with multiple buckets, because multiple buckets in Spring Data Couchbase require multiple connections to Couchbase and a transaction must run on a single connection. That’s why we managed it using multiple collections in a single bucket (bucket: Product; collections: product, outbox).

Let me share an example command handler from our use case below.

CommandHandler

@Component
class UpdateDesiByContentIdCommandHandler(
    private val productRepository: ProductRepository
) : CommandWithTransactionalHandler<UpdateDesiByContentIdCommand, Product>() {

    override fun process(command: UpdateDesiByContentIdCommand) = with(command) {
        val product = productRepository.loadOrThrow(contentId)
        product.changeDesi(desi)
        productRepository.save(product)
        product
    }
}
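For completeness, the command itself might be defined as below; a sketch inferred from the handler, with KediatR’s CommandWithResult marking the result type:

// Hypothetical command carrying the fields the handler reads.
data class UpdateDesiByContentIdCommand(
    val contentId: Long,
    val desi: Double
) : CommandWithResult<Product>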

Aggregate Model

abstract class Aggregate {
    abstract val id: Long
    val events: MutableList<DomainEvent> = mutableListOf()

    fun addEvent(event: DomainEvent) {
        this.events.add(event)
    }
}

Domain Model

data class Product(
    override val id: Long,
    var desi: Double,
    var name: String,
    var brandId: Long,
    var categoryId: Long,
    // other properties ...
) : Aggregate() {

    internal fun changeDesi(desi: Double) {
        if (desi <= 0) {
            throw BusinessException("product.desi.must.be.positive")
        }
        this.desi = desi
        raiseUpdatedEvent()
    }

    private fun raiseUpdatedEvent() {
        this.addEvent(ProductUpdatedDomainEvent(this))
    }
}

In this command logic, the update is performed via the domain model method, and the update event is added to the aggregate. The aggregate is passed to the repository’s save method; while saving, it is converted to a document and the events are excluded, so no events remain on the product document. After the command logic is completed, the async process starts, which I will cover in the next section.

Using Kafka Connector

The Couchbase Kafka connector is a plug-in for the Kafka Connect framework. It provides source and sink components.

The source connector streams documents from Couchbase Server using the high-performance Database Change Protocol (DCP) and publishes the latest version of each document to a Kafka topic in near real-time.

Kafka connectors can be run in standalone or distributed mode. Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data.

We used this connector, configuring it in distributed mode for the outbox collection. However, after observing the Kafka connector in production for a while, we realized that its resource consumption was high.
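For reference, a distributed-mode source connector configuration for an outbox collection would look roughly like the following; the name, addresses, credentials, and collection path are placeholders, and property names may vary by connector version:

{
  "name": "product-outbox-source",
  "config": {
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": "2",
    "couchbase.seed.nodes": "couchbase.example.com",
    "couchbase.bucket": "Product",
    "couchbase.username": "username",
    "couchbase.password": "password",
    "couchbase.collections": "_default.outbox",
    "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
    "couchbase.topic": "${bucket}.${scope}.${collection}"
  }
}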

Later, we started to use the go-dcp-kafka connector, which was developed by Trendyol Tech and made open source, due to its lower resource usage and higher efficiency. Additionally, we can easily scale up and down with the help of go-dcp-kafka’s custom membership algorithm, and its configurations are easy to manage.

The go-dcp-kafka connector exposes several metrics; for example, we can monitor the connector lag count metric as shown below:

Kafka Connector Lag Count

Important Note: When a transaction that inserts a new document completes successfully, the DCP client first receives a Deletion event for that document ID, followed by a Mutation event for the actual insertion; this is a side effect of how Couchbase stages transactional writes.

Conclusion

In this article, we discussed commonly used outbox implementation solutions and highlighted the advantages brought to us by the transactional outbox solution we chose. We also talked about implementing Couchbase transactions in a Spring Boot application.

After moving the Product Service to the transactional outbox structure, we have never experienced event loss. For various reasons, external consumers may not be able to process integration events originating from our domain. In such cases, it is very easy to re-trigger the events, since we keep them persistently in the outbox.

After experiencing the advantages of the transactional outbox implementation in production, we moved all our core services to the transactional outbox structure.

I want to extend a special thanks to my colleagues emre tiryaki, Tolga Takır, and Ramazan Demir for their help in creating this article. Their input and assistance have been incredibly valuable, and I appreciate their support throughout the writing process.

Thank you for taking the time to read through this article.

About Us

Come be a part of our growing team! Discover our available job opportunities and visit our career website using the links below.
