Implementing CQRS With CBES
Hello everyone! Emrah Kurman and I want to share the story of how we implemented CQRS with CBES (the Couchbase Elasticsearch Connector). We will also talk about what CBES is, why we use it, and how we implemented and customized it. If you’re ready, let’s start.
Before explaining what CBES is and how we use it, let’s talk about our problem. As we all know, the best solutions are found while solving difficult problems, and we found CBES while dealing with exactly such a problem.
As the Trendyol OMS team, we manage our customers’ and sellers’ orders, packages, and claims. A while ago, we decided to re-platform one of our microservices to reduce its response time.
As we all know, to scale you usually need to change the architecture rather than just a tool or technology. At this point, we decided to separate our read and write APIs with CQRS. For those who don’t know, CQRS (Command Query Responsibility Segregation) is an architectural design pattern that isolates read and write operations from each other, giving us the opportunity to scale the read and write sides independently and to choose a different data source for each.
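To make the separation concrete, here is a minimal sketch of the idea in Java; the interface names below are our own illustration, not the actual OMS code:

```
// Hypothetical sketch of CQRS separation; OrderCommandService and
// OrderQueryService are illustrative names, not the real OMS interfaces.

// Write side: commands mutate state and return nothing.
interface OrderCommandService {
    void createOrder(String orderId, String customerId);
    void cancelOrder(String orderId);
}

// Read side: queries return data and never mutate state, so each side
// can scale and pick its own data store independently.
interface OrderQueryService {
    OrderView findById(String orderId);
}

record OrderView(String orderId, String customerId, String status) {}
```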
We decided to use a NoSQL database on the read side, and our choice was Couchbase. However, for complex queries or deep searches within a document, the index nodes were overloaded, and unfortunately we could not get the results we wanted. At this point, we decided to use the search and query power of Elasticsearch: we would send the queries to Elasticsearch and fetch the documents from Couchbase using the document IDs returned from the query. But there was a big problem here: how would Couchbase and Elasticsearch stay in sync? We could provide synchronization by connecting a connector from Couchbase to Kafka and another from Kafka to Elasticsearch (a source and a sink connector). However, since the data would travel through two different stages, latency would build up and we would have to intervene.
This is where CBES comes into play. CBES was already used by many teams at Trendyol, but as the OMS team, we were adding it to our tech stack for the first time.
After implementing CBES, our architecture turned into the structure seen in the image below.
As seen in the figure, our listener consumes domain events from the Write API and populates our Couchbase. Then CBES comes into play and indexes the data written to Couchbase into Elasticsearch. After that, when the Read API processes a request, it first searches Elasticsearch, then fetches the documents from Couchbase with key-value gets, using the IDs it finds in Elasticsearch.
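A rough sketch of that read path, assuming the Elasticsearch 7.x high-level REST client and the Couchbase Java SDK 3.x (the index, field, and status values are made up for illustration):

```
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.json.JsonObject;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.ArrayList;
import java.util.List;

public class ReadApiSketch {

    // Search Elasticsearch for matching document ids, then fetch the full
    // documents from Couchbase with cheap key-value gets.
    static List<JsonObject> findShippedOrders(RestHighLevelClient es,
                                              Collection couchbase) throws Exception {
        SearchSourceBuilder source = new SearchSourceBuilder()
                .query(QueryBuilders.termQuery("status", "SHIPPED"))
                .fetchSource(false); // we only need the ids, not the _source

        SearchResponse response =
                es.search(new SearchRequest("orders").source(source), RequestOptions.DEFAULT);

        List<JsonObject> orders = new ArrayList<>();
        for (SearchHit hit : response.getHits()) {
            // Document ids are shared between Couchbase and Elasticsearch,
            // so each hit id maps directly to a Couchbase key.
            orders.add(couchbase.get(hit.getId()).contentAsObject());
        }
        return orders;
    }
}
```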
What Is CBES & How Does It Work?
Let’s take a deep dive into CBES, which stands for Couchbase Elasticsearch Connector. According to its official documentation, the Couchbase Elasticsearch Connector (CBES) replicates your documents from Couchbase Server to Elasticsearch in near real time. The connector uses the high-performance Database Change Protocol (DCP) to receive notifications when documents change in Couchbase.
Now let’s take a deep dive into the DCP protocol that CBES uses in the background. As most of us know, databases are not just repositories that store data. Especially in a distributed cluster, there is a continuous data stream for synchronization between nodes, so a cluster can be viewed as an ongoing stream of changes. When Couchbase is asked to change a document, it doesn’t modify it directly on disk; instead, it appends the change to a log and keeps moving, so write operations happen quickly. DCP’s approach is somewhat similar to the write-ahead logging you see in other software. Couchbase already offers many connectors built on this protocol that you can easily use in production. With the Elasticsearch connector, you can sync an Elasticsearch cluster with your Couchbase data and keep the index up to date, with DCP feeding the connector.
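CBES consumes DCP through Couchbase’s client libraries, so the sketch below is not connector code; it is just a toy Java illustration of the append-then-stream idea behind a change feed: writes are appended to a log, and a consumer such as a connector replays the log from its last checkpoint:

```
import java.util.ArrayList;
import java.util.List;

// Toy append-only change log illustrating the idea behind DCP:
// mutations are appended, never edited in place, and a consumer keeps
// a cursor so it can resume streaming from where it left off.
class ChangeLog {
    record Mutation(long seqNo, String key, String value) {}

    private final List<Mutation> log = new ArrayList<>();

    synchronized void append(String key, String value) {
        log.add(new Mutation(log.size() + 1, key, value));
    }

    // Stream every mutation after the given sequence number; a connector
    // would persist the last seqNo it handled as its checkpoint.
    synchronized List<Mutation> readFrom(long afterSeqNo) {
        return log.stream().filter(m -> m.seqNo() > afterSeqNo).toList();
    }
}
```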
How Did We Use & Customize CBES?
While Couchbase was the database keeping our documents, we wanted to benefit from the search capabilities of Elasticsearch. When we learned of the existence of CBES, the first hurdle was overcome. The second hurdle was whether it was really necessary to keep the same data in two different places. If we indexed all the fields of our documents in Elasticsearch, we would keep the same data in two places, and our number of operations would double. Our documents had around 100 fields, but we performed all of our queries using only 15 of them; indexing everything meant detecting and applying mutations on 100 fields just to be able to query 15. For this reason, we decided to index in Elasticsearch only the fields we would use in queries.

CBES provided this, but some of the fields we needed were inside a nested object, while others were at the top level of the document. We had previously used Elasticsearch as our main data source, and there the size of the documents and the presence of nested objects hurt our query performance, because Elasticsearch evaluates the parent and nested documents separately when searching nested documents. Based on that experience, we knew that flattening (denormalizing) the data brings performance gains and reduces data size. However, CBES could not denormalize nested object fields, so we customized it to fit our needs. As a bonus, making the data smaller also shortened CBES’s indexing time.
CBES offered a filterFieldesDocumentTransformer to index only the specified fields of a document. But, as mentioned above, we also needed nested fields like the wandMaker’s id. So we added a new field called denormalized fields to the example.toml file.
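To give an idea of the shape, here is a hypothetical snippet of what that part of example.toml might look like; the section and key names below are our illustration of the customization, not part of the stock CBES configuration:

```
# Illustrative only: the section and key names are hypothetical.
# filterFields keeps the listed top-level fields, while
# denormalizedFields pulls nested values such as wandMaker.id up to
# the top level of the indexed Elasticsearch document.
[elasticsearch.documentTransformer]
filterFields = ["orderNumber", "status", "customerId"]
denormalizedFields = ["wandMaker.id", "wandMaker.name"]
```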
To extract and map the denormalized fields, we changed the CBES source code a little bit, thanks to our teammate Coşkun Bıdıcı.
Three different transformers extend the TransformerChain class (sketched after this list):
- EmptyTransformer: used for indexing whole documents.
- FilterTransformer: used for indexing filter fields.
- DenormalizedTransformer: used for indexing inner fields.
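Below is a minimal sketch of that design; the signatures are simplified assumptions built on Jackson’s ObjectNode, not the exact CBES internals:

```
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Set;

// Simplified sketch of the transformer chain; real CBES classes and
// signatures differ, this only illustrates the design.
abstract class TransformerChain {
    abstract ObjectNode transform(ObjectNode document);
}

// Indexes the whole document unchanged.
class EmptyTransformer extends TransformerChain {
    ObjectNode transform(ObjectNode document) {
        return document;
    }
}

// Keeps only the configured top-level filter fields.
class FilterTransformer extends TransformerChain {
    private final Set<String> filterFields;

    FilterTransformer(Set<String> filterFields) {
        this.filterFields = filterFields;
    }

    ObjectNode transform(ObjectNode document) {
        return document.deepCopy().retain(filterFields);
    }
}

// Copies nested fields such as wandMaker.id up to the top level.
class DenormalizedTransformer extends TransformerChain {
    ObjectNode transform(ObjectNode document) {
        ObjectNode result = document.deepCopy();
        if (document.path("wandMaker").has("id")) {
            result.set("wandMakerId", document.path("wandMaker").get("id"));
        }
        return result;
    }
}
```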
We added transformerChain as a new field to DefaultDocumentTransformer, initialized it in the constructor, and then called the transformerChain to do its mapping before indexing the document. After the customization, we dockerized the CBES project and pushed the image.
To deploy a CBES project to Kubernetes, you need YAML files containing Service, StatefulSet, ConfigMap, and Secret resources. We created two different YAML files for these.
- elasticsearch-connector.yml contains the Service and StatefulSet resources; the customized CBES image is used as the StatefulSet’s image (a trimmed sketch follows this list).
- elasticsearch-connector-configuration.yml, on the other hand, contains the Secret and ConfigMap resources.
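As a rough idea of the first file, here is a trimmed, hypothetical version of elasticsearch-connector.yml; the names, labels, registry, and resource values are placeholders, with the image pointing at the customized CBES build:

```
# Trimmed, illustrative StatefulSet; the real manifests also carry the
# Service, volumes, probes, and references to the config and secrets.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: elasticsearch-connector
spec:
  serviceName: elasticsearch-connector
  replicas: 1
  selector:
    matchLabels:
      app: elasticsearch-connector
  template:
    metadata:
      labels:
        app: elasticsearch-connector
    spec:
      containers:
        - name: cbes
          image: registry.example.com/oms/custom-cbes:1.0.0  # customized image
          resources:
            requests: { cpu: "2", memory: 4Gi }
            limits: { cpu: "4", memory: 8Gi }
```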
Since we use GitLab as our repository, we created a .gitlab-ci.yml file and, in the deployment stage, ran kubectl apply --record.
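A hypothetical deploy stage along those lines (the kubectl image, file paths, and branch name are placeholders):

```
# Illustrative .gitlab-ci.yml deploy stage; cluster credentials and
# file paths are placeholders.
deploy:
  stage: deploy
  image: bitnami/kubectl:latest
  script:
    - kubectl apply --record -f k8s/elasticsearch-connector-configuration.yml
    - kubectl apply --record -f k8s/elasticsearch-connector.yml
  only:
    - master
```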
Our First Migration Adventure
We weren’t sure whether the migration would start from the beginning when we deployed CBES. Right after the deployment, CBES started to migrate all the data from the beginning, and it was marvellous to watch how fast the migration was. Not much later, we realized that the CPU and memory limits were not enough for the migration, so we increased the limits whenever the migration speed slowed. CBES migrated more than 300 million documents in a day.
After our first CBES trial, we noticed the difference between our service’s response times before and after CBES: there was a dramatic decrease. So, we decided to use CBES for our other services as well. Unfortunately, CBES consumes a lot of resources. To overcome this, we distributed the CBES services across three different clusters instead of deploying them all to one cluster. If you are going to set up CBES for several different buckets as we do, we recommend making sure you have enough resources.
In conclusion, CBES:
- is customizable since it is open source.
- migrated more than 300 million documents in a day.
- pushed the limits of both CPU and memory in our setup, so don’t forget that.
- uses a lot of resources, so distribute CBES services across different clusters if you can.
PS: Don’t hesitate to contact our beloved HR team for tempting career opportunities.