Eliminating the Latency: Displaying Campaigns to Thousands of Sellers in Less Than 5 Minutes
For Trendyol, an e-commerce platform built on a marketplace model, Campaigns is one of the most important discount tools: by adding their products to campaigns, sellers can reach more customers and increase their sales.
As the Seller Campaigns team, we are responsible for the campaign management process: creating these campaigns, showing them to the relevant sellers, and providing the tools sellers need to add their products to them, thereby improving seller experience and engagement.
In this article, my colleague Hasan Utku Uçar and I will discuss the latency problems we faced while displaying campaigns to sellers, especially during big events like Black Friday, and how we reduced that latency.
Problem
A campaign can be shown to a seller only if the seller has products that meet the campaign’s participation criteria, such as category, brand, and price range. To increase seller participation in campaigns, we also need to sort them by the number of products or the total stock quantity that can be added. All of this requires calculating these values by filtering the seller’s product data.
Initially, we were producing a Kafka event for each seller-campaign pair to calculate the eligible product count and total stock quantity and decide whether the seller had any eligible products. While processing these events, we were sending filter requests to the Product Team’s API and caching the eligible product counts in Couchbase.
However, with tens of thousands of sellers and hundreds of campaigns, processing these events took a significant amount of time because we were sending millions of product filter requests. As a result, new campaigns appeared to sellers with a delay that could reach hours during critical event periods such as Black Friday and 11.11, when we had to handle 15–20 million Kafka events.
The main problem was that our campaign data and the Product Team’s product data lived in separate systems, which made it difficult to sort and filter based on data owned by another team. Since we had already tried every improvement possible while using the other team’s API, and none of them were effective, we decided to keep a small part of the product data on our side.
Transferring Product Data To Our Elasticsearch
Since we had multiple and rather specific filtering needs on the product data that the Product Team kept in Couchbase, we decided to transfer it to our own Elasticsearch, using the Couchbase to Elasticsearch (CBES) Connector for this job. With this connector, all product documents, and every subsequent change to them, are written to Elasticsearch. We also kept only the fields we needed to filter on, which reduced the index size.
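As an illustration, a trimmed index like this could be created with the Elasticsearch low-level REST client as in the minimal sketch below; the index name and the field names (sellerId, categoryId, brandId, price, stockQuantity) are assumptions for the example, not our actual mapping.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class ProductIndexSetup {

    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Keep only the fields the campaign filters actually need;
            // everything else in the product document is left out to shrink the index.
            Request request = new Request("PUT", "/seller-products");
            request.setJsonEntity("""
                {
                  "mappings": {
                    "properties": {
                      "sellerId":      { "type": "long" },
                      "categoryId":    { "type": "long" },
                      "brandId":       { "type": "long" },
                      "price":         { "type": "double" },
                      "stockQuantity": { "type": "integer" }
                    }
                  }
                }
                """);
            client.performRequest(request);
        }
    }
}
```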
Elasticsearch Tuning
After transferring the product data to our side, we started sending the product filter requests to our own Elasticsearch, but, as expected, the same performance problems continued. So we focused on tuning Elasticsearch.
Index Settings
Initially, we checked the index settings, which are crucial for Elasticsearch query performance. Our index had approximately 800 million documents and a size of 1TB. Since our data was divided into shards on Elasticsearch, the size of the shards was a critical setting for both read and write performance. According to Elasticsearch documentation, there are general practices for shard sizes, but there is no one-size-fits-all solution. It is recommended that shard sizes range from 10GB to 50GB for log clusters and average around 20–25GB for search clusters. To determine the shard size, the suggested strategy is to set an initial value and test query performance to find the optimal value.
The shard sizes we were using were not in line with these recommendations, so we rearranged the index settings so that shards averaged around 20GB and performed a reindex. Testing the new index, we observed that queries ran more efficiently than on the previous index.
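A minimal sketch of the resize-and-reindex step is shown below, again using the low-level REST client; the index names and the shard count of 50 (roughly 1TB divided by ~20GB per shard) are illustrative values, not our production settings.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class ReindexWithNewShardCount {

    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // ~1TB of data / ~20GB per shard => roughly 50 primary shards (illustrative value).
            Request createIndex = new Request("PUT", "/seller-products-v2");
            createIndex.setJsonEntity("""
                { "settings": { "number_of_shards": 50, "number_of_replicas": 1 } }
                """);
            client.performRequest(createIndex);

            // Copy the documents from the old index into the resized one.
            Request reindex = new Request("POST", "/_reindex?wait_for_completion=false");
            reindex.setJsonEntity("""
                { "source": { "index": "seller-products" }, "dest": { "index": "seller-products-v2" } }
                """);
            client.performRequest(reindex);
        }
    }
}
```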
Increasing Cluster Resources
Another important point is the balanced utilization of Elasticsearch resources. In our tests, queries were stable up to a certain throughput (TP) value; at higher throughput, CPU usage climbed to 95–98% and queries stopped returning responses.
To fix this, we worked with the SRE team, who increased the number of cluster nodes and their resources.
Using Aggregations
During critical campaign event periods, we needed to send around 15–20M queries to Elasticsearch while keeping the throughput stable, and sending a separate query for each campaign-seller pair was expensive. The main cost came from the product-related filters, such as category and brand, that we sent with each query, because these filters were arrays containing a large number of elements.
For a given campaign, the product filters used in the queries do not change; only the seller information does. For this reason, we decided to filter on multiple sellers at once and use the aggregation feature to obtain the eligible product count and stock quantity of multiple sellers in a single query.
In this way, Elasticsearch traverses the matching documents once and calculates these values for several sellers at the same time. For example, where we previously sent 20 queries for 20 sellers, we now send a single aggregation query for all 20. As a result, both the response time per seller and the query load we had to sustain dropped.
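A minimal sketch of such an aggregation query with the Elasticsearch high-level REST client follows; the index, field names, and filter set are assumptions for illustration. Each by_seller bucket's doc_count gives a seller's eligible product count, and the total_stock sub-aggregation gives the total stock quantity.

```java
import java.io.IOException;
import java.util.List;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class EligibleProductAggregator {

    private final RestHighLevelClient client;

    public EligibleProductAggregator(RestHighLevelClient client) {
        this.client = client;
    }

    // Calculates eligible product count and total stock for many sellers in one query.
    public SearchResponse aggregateForSellers(List<Long> sellerIds,
                                              List<Long> campaignCategoryIds,
                                              List<Long> campaignBrandIds) throws IOException {
        // The campaign's filters stay constant; only the seller list changes per batch.
        BoolQueryBuilder filters = QueryBuilders.boolQuery()
                .filter(QueryBuilders.termsQuery("sellerId", sellerIds))
                .filter(QueryBuilders.termsQuery("categoryId", campaignCategoryIds))
                .filter(QueryBuilders.termsQuery("brandId", campaignBrandIds));

        SearchSourceBuilder source = new SearchSourceBuilder()
                .size(0) // only aggregation results are needed, not the documents
                .query(filters)
                .aggregation(AggregationBuilders.terms("by_seller")
                        .field("sellerId")
                        .size(sellerIds.size())
                        // each bucket's doc_count = eligible product count for that seller
                        .subAggregation(AggregationBuilders.sum("total_stock").field("stockQuantity")));

        return client.search(new SearchRequest("seller-products").source(source), RequestOptions.DEFAULT);
    }
}
```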
Improving Cache Usage
While researching aggregation queries, we discovered the preference parameter that enhances cache usage.
For faster responses, Elasticsearch caches the results of frequently run aggregations in the shard request cache. To get cached results, use the same preference string for each search. Elasticsearch routes searches with the same preference string to the same shards. If the shards’ data doesn’t change between searches, the shards return cached aggregation results.
The purpose of this parameter is to improve cache utilization by routing similar queries to the same nodes and shards.
To use this feature, we created a hash of the product filter values in the query and used that hash as the preference parameter. As a result, although the initial queries still ran slowly, subsequent queries with the same filters, and therefore the same preference hash, became much more efficient by utilizing the cache.
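A sketch of this idea, assuming SHA-256 over a string fingerprint of the campaign's filter values (the helper names and index are illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class PreferenceAwareSearch {

    // Hash only the campaign's filter values, so every query built from the same
    // filters carries the same preference string and hits the same shard copies.
    static String preferenceFor(String campaignFilterFingerprint) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(campaignFilterFingerprint.getBytes(StandardCharsets.UTF_8));
        return HexFormat.of().formatHex(digest);
    }

    static SearchRequest withPreference(SearchSourceBuilder source, String campaignFilterFingerprint)
            throws NoSuchAlgorithmException {
        return new SearchRequest("seller-products")
                .source(source)
                .preference(preferenceFor(campaignFilterFingerprint));
    }
}
```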
Consuming Kafka Messages in Batch
After the improvements on the Elasticsearch side, we were able to perform the calculation for multiple sellers in a single query. To take advantage of this, we also needed to process messages in batches on the Kafka side. We used the campaign id as the partition key while producing events, so that events for the same campaign are collected in the same partition.
On the consumer side, for batch consumers to perform efficiently, it was crucial to tune the number of messages fetched in a single poll, which meant adjusting the max-poll-records configuration parameter. We set this value so that a single batch creates a balanced throughput towards Elasticsearch.
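A minimal sketch of this setup, assuming Spring Kafka, is shown below; the topic name, consumer group, and the batch size of 20 are illustrative assumptions.

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class CampaignEligibilityBatchConfig {

    // Producer note: events are keyed by campaign id (new ProducerRecord<>(topic, campaignId, payload)),
    // so all events of the same campaign land in the same partition and arrive together.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory() {
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "campaign-eligibility-consumer",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                // fetch at most 20 messages per poll, matching the sellers we aggregate per query
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        factory.setBatchListener(true); // deliver the whole poll as one List to the listener
        return factory;
    }

    @KafkaListener(topics = "campaign-eligibility-events", containerFactory = "batchFactory")
    public void onMessages(List<String> messages) {
        // collect the seller ids in the batch and run a single aggregation query for them
    }
}
```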
Couchbase Bulk Writes
After all these optimizations, we had resolved the performance issues in the data calculation process. This time, however, while writing the calculated eligible product counts to Couchbase for caching, the high throughput pushed the application and database resources to their limits, and we ran into performance problems during the write operations. The reason was that we were writing millions of values to Couchbase one by one, driving the ops/sec up to roughly 50–60k, the suggested maximum that our current database can handle. We wanted to find a solution other than simply increasing resources.
As a solution, since we were already obtaining multiple sellers’ data in a single aggregation query, we decided to write all the seller data from that response in a single query as well, and started upserting these documents in bulk using N1QL. We chose N1QL because the bulk operations in the Java SDK and Spring Data Couchbase actually execute multiple queries in parallel rather than a single bulk query.
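A simplified sketch of such a bulk upsert with the Couchbase Java SDK is shown below; the bucket name, document key format, and field names are assumptions for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

import com.couchbase.client.java.Cluster;

public class EligibleProductCountWriter {

    private final Cluster cluster;

    public EligibleProductCountWriter(Cluster cluster) {
        this.cluster = cluster;
    }

    // One N1QL statement upserts every seller's calculated values from a single
    // aggregation response, instead of issuing one write per seller.
    public void upsertBatch(String campaignId, List<SellerEligibility> results) {
        String values = results.stream()
                .map(r -> String.format(
                        "(\"%s::%d\", {\"eligibleProductCount\": %d, \"totalStockQuantity\": %d})",
                        campaignId, r.sellerId(), r.eligibleProductCount(), r.totalStockQuantity()))
                .collect(Collectors.joining(", "));

        // Example statement:
        // UPSERT INTO `campaign-eligibility` (KEY, VALUE)
        //   VALUES ("c1::42", {"eligibleProductCount": 120, "totalStockQuantity": 3400}), ...
        cluster.query("UPSERT INTO `campaign-eligibility` (KEY, VALUE) VALUES " + values);
    }

    public record SellerEligibility(long sellerId, long eligibleProductCount, long totalStockQuantity) {}
}
```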
With this change, the writing operations became more efficient and we reduced the ops/sec value to around 5–10k.
Conclusion
To display campaigns to sellers, we needed to process the eligible product calculation events for millions of potential products in a timely manner. With the previous structure, however, processing these events took a significant amount of time, so sellers saw their campaigns late; during critical event periods the calculation could take hours.
Thanks to all these changes, we significantly reduced the campaign visibility latency. Today, all of our sellers can see their newly created campaigns in less than 5 minutes, instead of waiting hours during important event periods like Black Friday and 11.11. We also used to have millions of messages waiting to be processed, creating lag on our Kafka topic due to the high request volume and long processing times; this is no longer a problem.
Thanks for reading.
We’re always looking for passionate and talented individuals to join our team. Learn more and apply from the links below.