Synchronizing Data Sources: A Journey of Decomposition with Couchbase Transactions

Cahittulum · Trendyol Tech · Oct 4, 2023

In this article, we will describe our experience with transitioning data sources during the decomposition process, the methodologies we employed, and the reasons behind our choices. To begin, let’s clarify what the term “basket” signifies in the context of Dolap.

The basket is one of the most important features of the Dolap app. At its most basic level, a basket is a software feature that allows customers to browse sellers’ product offerings and add items to a basket. As the customer continues shopping, they can add or remove items from the basket and even save items for later purchases. By displaying the total cost of the order, including campaigns, shipping costs, and any fees, the basket gives customers a clear understanding of what they will be paying before they finalize their purchase.

In Dolap, the basket was part of a giant monolithic application. While a monolith can have its advantages on occasion, it becomes increasingly challenging as the team and project expand. This is why the decision was made to break it down.

The primary objectives of the decomposition process encompass principles such as reliability, availability, resiliency, scalability, and ease of development, all while striving for a more efficient project that minimizes resource usage and reduces costs.

Therefore, when decomposing the project, we needed to review the algorithms, technologies, and hardware we worked with. In terms of cost, we decided to move away from AWS RDS and Redis as a cache, which had the most significant financial impact, and transition to Couchbase.

So, we are moving our entire business logic out of the monolith and into a microservice, with refactoring of course. While making this transition, we also decided to change the database for optimization and cost-saving purposes. Switching from a relational database to a NoSQL database is a complex problem that may cause data loss.

Changing the database also brings consistency problems. You have to migrate all user data to the new database without any downtime, and you have to be sure that all data is safely migrated to the other database. Besides these concerns, we had another problem to solve: since we were decomposing all the business logic into a microservice, we needed to make sure that all business logic kept working properly.

Syncing Datasources via Transactions

Our main goal was to quickly complete the decomposition process and eliminate the resources we used previously, so we looked for the fastest way to achieve this. Unless there was a major issue, we had no intention of reverting to the old system.

We first migrated all our data from PostgreSQL to Couchbase using a job that transforms user baskets into their NoSQL model, eliminating unused fields and producing a more compact form. We then decided that bringing the system live and synchronizing user-generated transactions with domain events in the code was the fastest and most effective approach for us. Therefore, during the post-transition verification, we established a structure to keep the two databases in sync using Spring Domain Events, Couchbase Transactions, and JPA Transactions.
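For illustration, a minimal sketch of what such a one-off migration job can look like is shown below. The repository and model names (BasketJpaRepository, BasketEntity, BasketCouchbaseRepository, BasketDocument) are hypothetical, not the actual classes from our codebase.

@Component
public class BasketMigrationJob {

    private final BasketJpaRepository jpaRepository;              // hypothetical source (PostgreSQL) repository
    private final BasketCouchbaseRepository couchbaseRepository;  // hypothetical target (Couchbase) repository

    public BasketMigrationJob(BasketJpaRepository jpaRepository,
                              BasketCouchbaseRepository couchbaseRepository) {
        this.jpaRepository = jpaRepository;
        this.couchbaseRepository = couchbaseRepository;
    }

    public void migrate() {
        // Page through the relational baskets to keep memory usage bounded.
        int page = 0;
        Page<BasketEntity> batch;
        do {
            batch = jpaRepository.findAll(PageRequest.of(page++, 500));
            batch.forEach(entity -> couchbaseRepository.save(toDocument(entity)));
        } while (batch.hasNext());
    }

    private BasketDocument toDocument(BasketEntity entity) {
        // Keep only the fields the new model needs; unused relational columns are dropped here.
        return new BasketDocument(entity.getBuyerId(), entity.getItems());
    }
}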

Here is our high-level architecture for syncing two data sources:

Working with Multiple Transaction Managers

Since we have two transaction managers, we need to specify which one to use for Couchbase transactions, e.g. @Transactional(transactionManager = "couchbaseTransactionManager").

@Override
@Transactional(transactionManager = "couchbaseTransactionManager")
public AddProductToBasketResponse handle(AddProductToBasketUseCase useCase) {
    final BuyerId buyerId = useCase.buyerId();
    final ProductId productId = useCase.productId();

    ...

    final Basket basket = basketPort.findByBuyerId(buyerId);

    basket.addItemToBasket(basketItem);

    if (useCase.eventPublisherEnabled()) {
        eventPublisher.publish(basket.domainEvents());
    }

    ...
}

We use a DDD approach, so our handlers are easy to read. Handlers orchestrate the business operations but do not make any business decisions; they don’t contain any business logic. The code speaks for itself, but I will explain it anyway. We register the synchronization event in “basket.addItemToBasket(basketItem);” and save it to the database in “basketPort.save(basket);”.

public void addItemToBasket(BasketItem basketItem) {
    ...

    items.add(basketItem);

    ...

    registerEvent(new NewItemAddedToNewBasketDomainEvent(
            basketItem.getProduct().getId().getValue(),
            getBuyerId().getValue(),
            basketItem.getProduct().getSeller().id().getValue()));
}

The line “eventPublisher.publish(basket.domainEvents());” is where the domain events are published. Synchronization of the two databases could have been done within the business flow, but we chose not to do it there for the sake of readability and to avoid mixing contexts. Since we generally use “UseCases” and “UseCaseHandlers” for business flows, we preferred not to write a separate use case for synchronization. Therefore, we opted to use domain events for the synchronization of the two databases.
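The article does not show the publisher itself; assuming it is a thin wrapper around Spring’s ApplicationEventPublisher, a minimal sketch could look like this (the class name DomainEventPublisher is hypothetical):

@Component
public class DomainEventPublisher {

    private final ApplicationEventPublisher applicationEventPublisher;

    public DomainEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    public void publish(Collection<? extends DomainEvent> events) {
        // Publishing happens inside the ongoing Couchbase transaction, so the
        // synchronous @EventListener implementations below run before it commits.
        events.forEach(applicationEventPublisher::publishEvent);
    }
}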

The transaction is not committed as soon as the handle function finishes its work, and here is the reason. Our base domain event listener looks like this:

public interface RetryableSyncDomainEventListener<T extends DomainEvent> {

    @EventListener
    @Transactional
    @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 5), retryFor = {Exception.class})
    void consume(T event);

    @Recover
    void onFailure(Exception exception, T event) throws Exception;
}

As you know, @Transactional has a default propagation called Propagation.REQUIRED: it supports the current transaction and creates a new one if none exists.

Event listeners consume the domain events and retry 3 times with a delay of 5 ms. If nothing succeeds within 3 retries, the exception is thrown and the transaction finishes. The transaction waits until everything is done; only then does the service respond to the user.
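As an illustration, a concrete listener implementing this interface might look like the sketch below. NewItemAddedToNewBasketSyncListener, LegacyBasketJpaPort, and the event accessors are hypothetical names, not our actual synchronization code.

@Component
public class NewItemAddedToNewBasketSyncListener
        implements RetryableSyncDomainEventListener<NewItemAddedToNewBasketDomainEvent> {

    private static final Logger log = LoggerFactory.getLogger(NewItemAddedToNewBasketSyncListener.class);

    private final LegacyBasketJpaPort legacyBasketJpaPort; // hypothetical port to the PostgreSQL side

    public NewItemAddedToNewBasketSyncListener(LegacyBasketJpaPort legacyBasketJpaPort) {
        this.legacyBasketJpaPort = legacyBasketJpaPort;
    }

    @Override
    public void consume(NewItemAddedToNewBasketDomainEvent event) {
        // Runs in the JPA transaction joined to the surrounding Couchbase one;
        // a failure here is retried 3 times before onFailure is invoked.
        legacyBasketJpaPort.addItem(event.getBuyerId(), event.getProductId());
    }

    @Override
    public void onFailure(Exception exception, NewItemAddedToNewBasketDomainEvent event) {
        // Last resort: log the event so the missed sync can be investigated and replayed.
        log.error("Could not sync basket item {} for buyer {}", event.getProductId(), event.getBuyerId(), exception);
    }
}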

Now, let’s have a look at how the Couchbase transaction mechanism works.

Couchbase Transaction Mechanism


Since Couchbase is a distributed system, ACID transactions are an important concern that must be handled in a distributed way. Couchbase supports distributed, multi-document ACID transactions, which means a transaction can span multiple documents on multiple nodes.
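Our service drives these transactions through Spring’s couchbaseTransactionManager, but as a rough sketch of the underlying SDK API, a multi-document transaction with the plain Couchbase Java SDK looks roughly like this (connection details and document ids are illustrative):

Cluster cluster = Cluster.connect("couchbase://localhost", "user", "password");
Collection baskets = cluster.bucket("Basket").defaultCollection();

cluster.transactions().run(ctx -> {
    // Both mutations are staged together and become visible to other readers
    // only once the transaction attempt is committed.
    TransactionGetResult first = ctx.get(baskets, "basket::1");
    ctx.replace(first, JsonObject.create().put("selectedCouponId", 0));
    ctx.insert(baskets, "basket::2", JsonObject.create().put("items", JsonArray.create()));
});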

A metadata document (ATR, Active Transaction Record) storing the list of transaction attempts is created for each running transaction. Any time a transaction interacts with a document, an entry is added to that metadata. These records are stored in the default collection; however, if a custom collection is specified, they can be stored there instead.

@Override
public void configureEnvironment(ClusterEnvironment.Builder builder) {
    builder.transactionsConfig(TransactionsConfig.metadataCollection(getMetadataCollectionKeyspace()));
}

private TransactionKeyspace getMetadataCollectionKeyspace() {
    return TransactionKeyspace.create("basket", "_default", "transactions");
}

Those documents are simply a source of truth for transactions to distinguish committed attempts. They are especially important for transactions that touch multiple documents in their execution block. The ATR document of a transaction includes the attempt status, an attempt id, its expiration time, and more. Here is an example attempt document of a transaction modifying more than one document.

{
  "meta": {
    "id": "_txn:atr-814-#618",
    "rev": "19-1766147c41d900000000000000000000",
    "att_reason": "invalid_json",
    "expiration": 0,
    "flags": 0,
    "type": "base64"
  },
  "xattrs": {
    "attempts": {
      "9e3312ab-b470-464f-bd89-a6114d44856a": {
        "tid": "fd10f963-4f34-4368-8e97-a4f8e85dad69",
        "st": "PENDING",
        "tst": "0x0000d9417c146617",
        "exp": 10989,
        "d": "n"
      },
      "9e3312ab-b470-464f-bd89-a6114d44856a": {
        "tid": "a23k1542-9a12-8321-991e-wb3as88qoa91",
        "st": "PENDING",
        "tst": "0x0000d6615a86674",
        "exp": 10989,
        "d": "n"
      }
    }
  }
}

In order to avoid reading or updating uncommitted documents, the last state of the documents updated in the transaction block is stored in the document itself as an xattr (extended attribute). This acts as a lock mechanism that prevents other transactions from reading or updating the documents. The document body is replaced with this staged state when the transaction is about to commit the attempt.

After the attempt is committed successfully, the related attempt is removed from the ATR, and the document is updated with the state kept in its metadata. The following document illustrates the document metadata with all information about the transaction running on the document: the operation type, the last document body after executing the operation, and the attempt information.

{
  "meta": {
    "id": "630803700",
    "rev": "19-1766147c45a000000000000002000000",
    "expiration": 0,
    "flags": 33554432,
    "type": "json"
  },
  "xattrs": {
    "txn": {
      "id": {
        "txn": "fd10f963-4f34-4368-8e97-a4f8e85dad69",
        "atmpt": "9e3312ab-b470-464f-bd89-a6114d44856a",
        "op": "e2d0da8a-834d-479e-b63e-5f95a4e3d212"
      },
      "atr": {
        "id": "_txn:atr-814-#618",
        "bkt": "Basket",
        "scp": "_default",
        "coll": "transactions"
      },
      "op": {
        "type": "replace",
        "stgd": {
          "items": [
            {
              "addedAt": 1686057632240,
              "id": 12998262,
              "selected": true
            }
          ],
          "selectedCouponId": 0
        },
        "crc32": "0xf5cc7406"
      },
      "restore": {
        "CAS": "0x17661478469b0000",
        "exptime": 0,
        "revid": "18"
      }
    }
  }
}

Transaction Cleanup Process

Of course, transactions cannot always commit all attempts successfully. Cluster failovers or forceful shutdowns during transaction execution leave lost or incomplete attempts behind. The transaction modifying a document can also be killed before committing or rolling back its changes. Documents in such situations remain staged and cannot be read or updated by other transactions while their last state still sits in the document metadata.

Any application connecting to the Couchbase cluster runs a background cleanup thread to recover such lost attempts. The cleanup threads are responsible for finding lost transactions and documents that were modified but no longer have an active transaction running on them. They find these documents and clean up their metadata in a consistent manner.

Each application running the cleanup process, along with its host information, is recorded in a Couchbase document called _txn:client-record. Here is one such document:

{
  "meta": {
    "id": "_txn:client-record",
    "rev": "77642-1768d542af3100000000000000000000",
    "att_reason": "invalid_json",
    "expiration": 0,
    "flags": 0,
    "type": "base64"
  },
  "xattrs": {
    "records": {
      "clients": {
        "427d2d85-85a0-47a1-9de9-19eb8fc3995a": {
          "expires_ms": 80000,
          "num_atrs": 1024,
          "implementation": "java",
          "version": null,
          "host": "127.0.0.2",
          "process_id": 1,
          "heartbeat_ms": "0x00008d653ed56817"
        },
        "0681a97f-f99f-413c-a255-eb2baef28ebe": {
          "expires_ms": 80000,
          "num_atrs": 1024,
          "implementation": "java",
          "version": null,
          "host": "127.0.0.1",
          "process_id": 1,
          "heartbeat_ms": "0x000031af42d56817"
        }
      }
    }
  }
}

If you would like to list all documents modified in a transaction block, you can use the following query with your transaction id. Keep in mind that you need a query node in your cluster to run this N1QL query.

SELECT * FROM Basket._default.Basket
WHERE meta().xattrs.tid='7914f06d-cb2a-466b-829a-f88da988ab03';
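The same lookup can also be run from the Java SDK; here is a minimal sketch, assuming an already connected Cluster instance (the transaction id below is just the example value from the query above):

QueryResult result = cluster.query(
        "SELECT * FROM Basket._default.Basket WHERE meta().xattrs.tid = $tid",
        QueryOptions.queryOptions()
                .parameters(JsonObject.create().put("tid", "7914f06d-cb2a-466b-829a-f88da988ab03")));

// Print every matching document still carrying that transaction's metadata.
result.rowsAsObject().forEach(row -> System.out.println(row));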

A little note on the cleanup process:

There are different types of durability levels Couchbase provides for transactions. The durability level of Couchbase transactions refers to the guarantee that once a transaction is committed, the changes made within that transaction will be durably stored and available for future reads, even in the event of failures such as power outages or crashes.

The default setting is DurabilityLevel.MAJORITY, meaning a transaction will pause on each write until it is available in memory on a majority of the configured replicas. If you set your DurabilityLevel lower than MAJORITY, i.e. NONE, your lost transactions will not be cleaned up.

You can specify the durability level via transaction configuration.

@Override
public void configureEnvironment(ClusterEnvironment.Builder builder) {
    if (QA_ENVIRONMENT.equalsIgnoreCase(getEnvironment())) {
        builder.transactionsConfig(TransactionsConfig.durabilityLevel(DurabilityLevel.NONE));
    } else {
        builder.transactionsConfig(TransactionsConfig.durabilityLevel(DurabilityLevel.MAJORITY_AND_PERSIST_TO_ACTIVE));
    }
}

Transaction Cleanup Monitoring

We mentioned above that some DurabilityLevel configurations do not support transaction cleanup. What if the transactions cannot be cleaned up? How is this tracked, and how can it be handled?

Couchbase raises different types of events while trying to clean up lost transactions, such as:

  • TransactionCleanupEndRunEvent is raised whenever a current ‘run’ is finished and contains statistics from the run. (A run is typically around every 60 seconds, with the default configuration.)
  • A TransactionCleanupAttemptEvent event is raised when an expired transaction is found by this process, and a cleanup attempt is made. It contains whether that attempt was successful, along with any logs relevant to the attempt.

If cleanup fails to clean a lost transaction that is more than two hours past expiry, it will raise the TransactionCleanupAttemptEvent at WARN level rather than the default DEBUG. With most default configurations of the event bus, this causes the event to be logged somewhere visible to the application. If there is no good reason for the cleanup to fail (such as a downed node that has not yet been failed over), the user is encouraged to report the issue.

You can set an alert on the TransactionCleanupAttemptEvent’s default log, or you can write an interceptor and subscribe it to that event.

@Override
public CouchbaseClientFactory couchbaseClientFactory(Cluster couchbaseCluster) {
    couchbaseCluster.environment().eventBus().subscribe(event -> {
        if (event instanceof TransactionCleanupAttemptEvent && event.severity() == Event.Severity.WARN) {
            logger.error("There are transactions that could not be cleaned up!");
        }
    });
    return super.couchbaseClientFactory(couchbaseCluster);
}

The Problem

Just like all designed systems and decisions, this design using domain events and multiple transaction managers has its pros and cons. When designing a system, the decisions made are based on various parameters, and among the most important factors are cost and time.

One of the main reasons for designing the system this way, maintaining two data sources in sync, was the expectation that it would only be operational for a short period of time; in the absence of any issues, the sync process would eventually be phased out and the old resources destroyed. In systems where the sync process continues for an extended period, this approach is quite risky and doesn’t fully guarantee eventual consistency.

As shown in the image, we have two different transaction contexts, and the Couchbase transaction context encompasses the JPA one. If an exception occurs within the inner transaction context, Couchbase can perform a rollback. However, after the JPA transaction manager has completed its work, if an exception is thrown in the outer context (the Couchbase transaction manager context), it cannot roll back the already committed JPA operation.

What Can Be Done?

If you want to maintain the synchronization of multiple data sources for an extended period in a cost-effective manner, you can consider the following options:

  1. Utilize Pub-Sub Message Brokers with Design Patterns: You can use pub-sub message brokers in conjunction with design patterns. For example, you can use Kafka together with the outbox pattern and write consumers that sync the data sources (see the sketch after this list).
  2. Integrate a Change Data Capture (CDC) Platform: You can achieve synchronization between two data sources by integrating a CDC platform.
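As a rough sketch of option 1: the outbox row is inserted in the same JPA transaction as the basket change, and a scheduled relay forwards unpublished rows to Kafka, where a consumer applies them to the other data source. OutboxRepository, OutboxEvent, and the topic name below are hypothetical, not part of our actual implementation.

@Component
public class OutboxRelay {

    private final OutboxRepository outboxRepository;           // hypothetical JPA repository for outbox rows
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OutboxRelay(OutboxRepository outboxRepository, KafkaTemplate<String, String> kafkaTemplate) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void relay() {
        // Unpublished rows were written atomically with the business change;
        // forwarding them here gives eventual consistency without a second transaction manager.
        outboxRepository.findTop100ByPublishedFalseOrderByIdAsc().forEach(event -> {
            kafkaTemplate.send("basket-sync-events", event.getAggregateId(), event.getPayload());
            event.setPublished(true);
        });
    }
}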

Conclusion

This article covered the challenges we faced and the way we use transactions in our basket service. Using Couchbase transactions was crucial for ensuring data consistency. The complexity of our migration, coupled with the need to synchronize data between the two databases, required a robust transactional mechanism.

By combining Couchbase transactions with JPA transactions, we were able to maintain the integrity of our business logic. The adoption of Couchbase transactions helped us keep the system consistent.

By leveraging Couchbase’s distributed transaction capabilities, we have successfully addressed the challenges associated with migrating databases and decomposing business logic, providing a solid foundation for future scalability and maintainability. Please clap and share if you found the article interesting 🙏

Co-Author: Mert Oz

