Evolution of Our Invalidation System

Arif Yılmaz
Published in Trendyol Tech
6 min read · Jan 18, 2022

As the Trendyol Indexing Team, we are mainly responsible for applying any change, including price, stock, and promotion updates, to the content as soon as possible. We call these changes invalidations, and we described the invalidation process here. In this article, I will dive into the problems of the Content Invalidation Api and how we solved them, but before that I strongly recommend reading that article if you have not already.

Timeout Problem

Content Invalidation Api (CIA) basically receives an invalidation request and generates an Elasticsearch query from it. This query returns the contents that should be updated. Running an Elasticsearch query may take a long time, and this may cause timeout errors in the APIs that send invalidation requests to CIA. We had to prevent these errors somehow. We solved the problem by enqueuing each request in memory and returning HTTP OK immediately.
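The idea fits in a few lines. Below is a minimal sketch of it, not CIA's actual code: the request type, the queue capacity, and the worker wiring are assumptions for illustration only.

// Minimal sketch of "enqueue in memory and return immediately".
// InvalidationRequest and the worker wiring are illustrative, not CIA's actual code.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class InvalidationRequest {
    String field;
    String value;
}

class InvalidationEndpoint {
    // Requests wait here instead of blocking the HTTP thread.
    private final BlockingQueue<InvalidationRequest> queue = new LinkedBlockingQueue<>(10_000);

    // Called by the HTTP layer; returns immediately so the caller never times out.
    int handle(InvalidationRequest request) {
        queue.offer(request);
        return 200; // HTTP OK; the slow Elasticsearch work happens later on a worker thread
    }

    // Background worker drains the queue and runs the slow Elasticsearch query.
    void workerLoop() throws InterruptedException {
        while (true) {
            InvalidationRequest request = queue.take();
            // buildElasticsearchQuery(request); runQueryAndUpdateContents(...);
        }
    }
}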

Terminating Process Problem

This approach solved the timeout problem, but it introduced a new one. When a CIA process terminates, the requests still queued in that process are never handled; they are lost forever. This was also an important problem, because we could end up displaying wrong information about a content and misleading our users.

This time the solution was not tricky. We decided to separate storing requests from processing them. In this scenario, CIA is only responsible for sending each incoming invalidation request to a Kafka topic. We introduced a new service, Content Invalidation Dispatcher (CID), that is responsible for reading invalidation requests from this Kafka topic and processing them. This approach solved the terminating process problem, because a terminating process does not acknowledge (commit) its messages to Kafka, so the requests assigned to it are read again by one of the other processes and handled properly.
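A rough sketch of this flow with the plain Kafka Java client is shown below; the topic name, serializers, and group configuration are assumptions, and the real services contain much more than this.

// Rough sketch of the CIA -> Kafka -> CID flow with the plain Kafka Java client.
// The topic name is hypothetical; real configuration and error handling are omitted.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

class InvalidationPipeline {
    static final String TOPIC = "content-invalidation"; // hypothetical topic name

    // CIA side: just publish the incoming request and return.
    static void publish(KafkaProducer<String, String> producer, String requestJson) {
        producer.send(new ProducerRecord<>(TOPIC, requestJson));
    }

    // CID side: consume, build the Elasticsearch query and apply the updates.
    static void consumeLoop(Properties consumerProps) { // assumes enable.auto.commit=false
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    processInvalidation(record.value()); // runs the Elasticsearch query
                }
                // If the process dies before this point, the offsets are not committed
                // and another consumer in the group re-reads the same requests.
                consumer.commitSync();
            }
        }
    }

    static void processInvalidation(String requestJson) { /* build and run the ES query */ }
}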

Content Invalidation Dispatcher with Kafka topic

Kafka Poll Interval Problem

One of the most important Kafka consumer configs for our application is max.poll.interval.ms. The Kafka documentation explains this config as follows:

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms. This mirrors the behavior of a static consumer which has shutdown.

The default value of this config is 300000 ms (5 min), and we used the default. Some invalidation queries may result in millions of contents to be updated. This means we need to fetch millions of documents from Elasticsearch, which may take longer than our poll interval (5 minutes) and trigger a rebalance in our consumer group. Unfortunately, rebalancing causes an even bigger problem: the heavy invalidation request is redelivered and processed again and again, and we cannot process the remaining requests in the same Kafka partition.
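For reference, this is roughly what the relevant part of the consumer configuration looks like; the bootstrap address and group id are placeholders, while max.poll.interval.ms is the real Kafka property discussed above (shown here explicitly at its default value).

// Sketch of the relevant consumer configuration; address and group id are placeholders.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

class DispatcherConsumerConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");              // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "content-invalidation-dispatcher");  // hypothetical
        // Default is 300000 ms (5 min). If one heavy invalidation keeps the consumer away
        // from poll() longer than this, the group rebalances, the message is redelivered,
        // and the rest of the partition is stuck behind it.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
        return props;
    }
}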

As a quick fix, we decided to switch our queueing technology to RabbitMQ. It was a temporary solution, but it let us fight our main problem. RabbitMQ does not work like Kafka: it has no partitions, so a RabbitMQ consumer can spend a long time on a single message while the other consumers continue to process other messages. We replaced the Kafka topics with RabbitMQ queues, and this really did solve the problem.
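The sketch below illustrates why this works: with a prefetch of one and manual acknowledgements, a consumer can spend as long as it needs on a heavy invalidation while the other consumers keep draining the queue. The queue name and host are placeholders, not our actual configuration.

// Sketch of a RabbitMQ consumer with prefetch 1 and manual acks; names are placeholders.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

class InvalidationQueueConsumer {
    static void start() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("rabbitmq"); // placeholder host
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queue = "content-invalidation"; // hypothetical queue name
        channel.queueDeclare(queue, true, false, false, null);
        channel.basicQos(1); // take one message at a time; there is no deadline for finishing it

        DeliverCallback onMessage = (consumerTag, delivery) -> {
            String requestJson = new String(delivery.getBody(), "UTF-8");
            // run the (possibly very slow) Elasticsearch query and apply the updates
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(queue, false, onMessage, consumerTag -> {});
    }
}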

Long Running Elasticsearch Queries

The Content Invalidation Dispatcher with RabbitMQ queues solved most of our problems. However, we still had long running queries. These queries indirectly affect other invalidation requests, and we solved this programmatically. Initially, we used the Elasticsearch scroll api to implement pagination in CID. However, the Elasticsearch documentation does not recommend the scroll api when paging through more than 10.000 hits. As I said before, we occasionally scroll through millions of hits, so the scroll api is not suitable for these cases. Additionally, deep pagination increases Elasticsearch search latency, and this indirectly affects other invalidation requests.

In order to solve this problem, we needed to split queries so that each resulting query returns approximately 10.000 hits. We added a new field, searchPartition, to our Elasticsearch mapping and set its value with a modulo over the content ID (ID % 1000), which spreads our contents evenly. This assures that each document has a searchPartition value between 0 and 999. We use the searchPartition field to split queries. Our split algorithm can be explained briefly as follows:

  1. If the total hit count of the query is greater than 10.000, do not continue with the normal invalidation process.
  2. Run a terms aggregation over the searchPartition field with size 1000, ordered by key.
  3. Group the aggregation buckets so that each group has approximately 10.000 documents (a sketch of this step is given after the toy example below).
  4. For each group, add a range filter on the searchPartition field to the original query and send it to our partition RabbitMQ queue.
  5. Read these partitioned requests and apply them properly.

Let's walk through a toy example.

Assume that we have the following query:

{
  "query": {
    "match": {
      "category.id": "100"
    }
  }
}

The total hit count is 20.000 for this query, so we run the following aggregation query:

{
  "query": {
    "match": {
      "category.id": "100"
    }
  },
  "aggs": {
    "searchPartitionAggs": {
      "terms": {
        "field": "searchPartition",
        "size": 1000,
        "order": {
          "_term": "asc"
        }
      }
    }
  }
}

Let's say the aggregation result is as follows:

"buckets" : [
{
"key" : 0,
"doc_count" : 2600
},
{
"key" : 1,
"doc_count" : 7000
},
{
"key" : 2,
"doc_count" : 500
},
{
"key" : 3,
"doc_count" : 1000
},
{
"key" : 4,
"doc_count" : 8900
}
]

Now, we group these buckets:

group1 => keys: 0, 1, 2 and total count: 10.100
group2 => keys: 3, 4 and total count: 9.900

The partitioned queries are now as follows:

{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "category.id": "100"
          }
        },
        {
          "range": {
            "searchPartition": {
              "gte": 0,
              "lte": 2
            }
          }
        }
      ]
    }
  }
}

{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "category.id": "100"
          }
        },
        {
          "range": {
            "searchPartition": {
              "gte": 3,
              "lte": 4
            }
          }
        }
      ]
    }
  }
}

As you can see, the original query is split into two queries, each of which hits roughly 10.000 documents, which is the recommended limit for the scroll api.
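For clarity, here is a minimal sketch of the grouping step (step 3 above) that reproduces the toy example; the Bucket and PartitionRange types and the exact threshold handling are illustrative, not the actual CID implementation.

// Minimal sketch of the bucket-grouping step (Java 16+ records); types are illustrative.
import java.util.ArrayList;
import java.util.List;

class PartitionGrouper {
    record Bucket(int key, long docCount) {}
    record PartitionRange(int fromKey, int toKey, long totalCount) {}

    static final long TARGET_HITS = 10_000;

    static List<PartitionRange> group(List<Bucket> buckets) {
        List<PartitionRange> groups = new ArrayList<>();
        int fromKey = -1;
        int lastKey = -1;
        long runningCount = 0;
        for (Bucket bucket : buckets) {
            if (fromKey == -1) fromKey = bucket.key();
            runningCount += bucket.docCount();
            lastKey = bucket.key();
            if (runningCount >= TARGET_HITS) { // close the group once it reaches ~10.000 docs
                groups.add(new PartitionRange(fromKey, lastKey, runningCount));
                fromKey = -1;
                runningCount = 0;
            }
        }
        if (runningCount > 0) { // flush the last partial group
            groups.add(new PartitionRange(fromKey, lastKey, runningCount));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Bucket> toy = List.of(new Bucket(0, 2600), new Bucket(1, 7000),
                new Bucket(2, 500), new Bucket(3, 1000), new Bucket(4, 8900));
        // Prints 0..2 -> 10100 and 3..4 -> 9900, matching the groups above; each range
        // then becomes a searchPartition range filter added to the original query.
        group(toy).forEach(g ->
                System.out.println(g.fromKey() + ".." + g.toKey() + " -> " + g.totalCount()));
    }
}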

What is Next?

At this point, we have actually solved all of these problems. However, we have other considerations. As the Trendyol tech team, we prefer Kafka topics over RabbitMQ queues because of our future multi data center scenarios: we are planning to use active-active Kafka clusters in different data centers. First, we will give the CID-with-Kafka-topics solution another chance, since we have solved the long running Elasticsearch query problem with the searchPartition strategy.

The Elasticsearch documentation recommends the search_after parameter together with the point in time api (PIT) for deep pagination. We will use this parameter and api in the implementation of CID. Currently, we are using Elasticsearch version 6.8.6, which does not support the PIT feature, so we will also upgrade our Elasticsearch clusters to version 7.
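To give an idea of what this will look like, below is a rough sketch of search_after pagination with a PIT using the Elasticsearch low-level REST client and raw JSON bodies; the index name, the JSON helpers, and the exact request syntax (which depends on the 7.x minor version) are assumptions rather than our final implementation.

// Rough sketch of PIT + search_after pagination; index name and JSON helpers are placeholders.
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

class PitPagination {
    static void fetchAllHits(RestClient client, String queryJson) throws Exception {
        // 1. Open a PIT so every page sees a consistent view of the index.
        Response pitResponse = client.performRequest(
                new Request("POST", "/contents/_pit?keep_alive=1m")); // "contents" is a placeholder index
        String pitId = extractPitId(pitResponse);

        String searchAfter = null; // sort values of the last hit of the previous page
        while (true) {
            Request search = new Request("POST", "/_search"); // PIT searches omit the index in the path
            search.setJsonEntity("{"
                    + "\"size\": 1000,"
                    + "\"query\": " + queryJson + ","
                    + "\"pit\": {\"id\": \"" + pitId + "\", \"keep_alive\": \"1m\"},"
                    + "\"sort\": [{\"_shard_doc\": \"asc\"}]"
                    + (searchAfter == null ? "" : ",\"search_after\": " + searchAfter)
                    + "}");
            Response page = client.performRequest(search);
            // process the hits of this page, then continue from the last hit's sort values
            searchAfter = extractLastSortValues(page); // null when there are no more hits
            if (searchAfter == null) break;
        }
    }

    // Stand-ins for a real JSON parser; a production version would parse the responses properly.
    static String extractPitId(Response response) { return null; }
    static String extractLastSortValues(Response response) { return null; }
}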

Conclusion

In this article, I tried to explain the problems we faced in our invalidation environment and our step by step solutions. Our company grows fast, and we constantly face new problems. Sometimes we solve them by writing code, sometimes by changing the system. Each problem gives us new experience, and we want to share that experience with the community.

Thanks for reading :)
