Hepsiburada Search Engine on Kubernetes
While looking for a way to make Hepsiburada's search engine scalable and highly available, our 1.5-month-old effort evolved into the idea: "Why don't we run it on Kubernetes?" I am sharing this article to explain our transition to this architecture, which took 3 months of planning, development and testing.
Our search engine used to run on-premises on two active-active Elasticsearch clusters in two datacenters. Manual scaling required a lot of effort because these environments were stateful and non-scalable, especially during special campaign periods such as Black Friday. In addition, we used to split the traffic by creating additional Elasticsearch clusters on Google Cloud compute instances during such campaigns. After all the Elasticsearch environments were scaled up for a campaign, we had to scale them down afterwards or remove the cloud instances. This became a year-round cycle of campaign periods. Preparing the cluster infrastructure and performing full and stream transitions of the indices caused a lot of toil.
As a result of our research, we found out that one way to set up a scalable Elasticsearch environment on Kubernetes is Elastic Cloud on Kubernetes (ECK); however, you have to be an enterprise Elastic customer to use its autoscaling. Another, open-source approach is the Elasticsearch operator created and maintained by Zalando.
At first, it may sound crazy to think about running a stateful environment like Elasticsearch, holding the large, distributed data of an e-commerce platform with more than 200 million monthly visitors, on Kubernetes. OK, let's talk about how we managed to do this, how we directed all of our search traffic overnight to our scalable Elasticsearch environment on Kubernetes once the tests were complete, and how it has been running flawlessly for 3 months at this point.
In the case of Elastic Cloud on Kubernetes (ECK), if you are not going to use stateful autoscaling, you can use it for free. What does stateful autoscaling mean here? ECK itself is free and open source, but scaling the Elasticsearch data nodes automatically requires an enterprise Elastic license. If you just want to move your Elasticsearch cluster to Kubernetes without autoscaling, ECK is suitable for you.
As HepsiTech, our main focus is open-source infrastructure architectures, so we turned to the Zalando operator. In this article, I will explain, step by step, the roughly 1.5-month process of learning, testing and code development, and the challenges we encountered.
1. Dockerize Elasticsearch
Actually, Elasticsearch already ships as a container image, but the challenge for us was installing and running the third-party plugins developed and used by our search teams on top of that dockerized Elasticsearch. We had to test the resulting Docker images many times, because we needed to make sure the process running inside the packaged container worked perfectly together with the plugins.
FROM docker.elastic.co/elasticsearch/elasticsearch-oss:7.8.0

# Change elasticsearch config if necessary
COPY elasticsearch.yaml /usr/share/elasticsearch/config/

# Add plugins for elasticsearch
COPY plugins /tmp/plugins

# Install plugins
RUN /usr/share/elasticsearch/bin/elasticsearch-plugin install file:///tmp/plugins/xxxx.zip --batch

# Add permissions for elasticsearch user in plugins dir
RUN chown -R elasticsearch:root /usr/share/elasticsearch/plugins/xxx
As we can see in the Dockerfile, we used an image with the oss tag, which stands for Open-Source Software. You should go with this tag if you do not hold an enterprise Elastic license.
2. Kubernetes Cluster Infrastructure
We created GKE clusters in two different zones on Google Cloud to work active-active. We pinned the clusters to a stable Kubernetes version and added three node pools with the following properties:
* The first one uses the e2-standard machine series and runs the Elasticsearch operator pod and other Kubernetes pods.
* The second one uses the t2d-standard series to host the Elasticsearch master pods.
* Finally, the last one uses t2d-standard node pools to host the Elasticsearch data pods. Here, for cost efficiency, we used the new-generation Tau T2D instances, which offer higher performance at an affordable price.
We configured node affinity and taints on Kubernetes so that the master and data pods are scheduled onto their respective node pools. By running the Elasticsearch master and data pods in isolated node pools, we could make configuration changes such as scaling, firewall rules and metadata on one pool without affecting the others.
We used pod anti-affinity so that each master and data pod runs on its own worker node, and we used the pd-ssd storage class to persist the Elasticsearch data of the pods. A minimal sketch of these settings is shown below.
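As a rough illustration of this scheduling and storage setup, the sketch below shows how the data pods could be pinned to their node pool and kept one per worker. The node pool label, taint key, pod label and storage class name (pool: es-data, dedicated=es-data, app: es-data, pd-ssd) are placeholders for this example, not our exact production values.

# Pod spec fragment for Elasticsearch data pods (illustrative)
spec:
  nodeSelector:
    pool: es-data                  # label applied to the dedicated data node pool
  tolerations:
    - key: dedicated               # taint applied to the data node pool
      operator: Equal
      value: es-data
      effect: NoSchedule
  affinity:
    podAntiAffinity:               # schedule at most one data pod per worker node
      requiredDuringSchedulingIgnoredDuringExecution:
        - labelSelector:
            matchLabels:
              app: es-data
          topologyKey: kubernetes.io/hostname
---
# SSD-backed storage class used by the data pods' volume claims
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: pd-ssd
provisioner: kubernetes.io/gce-pd   # in-tree GCE PD provisioner; the GCE CSI driver is an alternative
parameters:
  type: pd-ssd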
3. Elasticsearch Kubernetes Operator
I mentioned that we opted for the Zalando operator. However, we forked the project and added quite a few features, since during our tests we concluded that the open-source version did not fully meet our needs. We originally wanted to contribute upstream, but after a while we decided we had to fork the project and modify it according to our needs.
The working logic of the Zalando operator is based on the median CPU utilization as its scaling metric. When the median CPU reaches the threshold we configure, it calculates a shards-per-node ratio from the following inputs: max index replicas, min index replicas, min shards per node and max shards per node. The operator then decides on the desired number of data node replicas and starts the scale-up or scale-down process.
Let's walk through how the open-source Zalando operator manages scaling according to its configs and algorithm using an example, and then show the differences in the version we forked.
- Let's say we have 4 indices, each configured with 6 primary shards and 3 replicas.
- We configure the operator as follows (a sample manifest with these values is shown after this walkthrough). While simulating the scaling process, I will explain what each config actually means:
* minReplicas = 10, maxReplicas=80
* minIndexReplicas = 2, maxIndexReplicas=3
* minShardsPerNode=2, maxShardsPerNode=4
* scaleUpCPUBoundary: 40%, scaleDownCPUBoundary: 20%
* scaleUpThresholdDurationSeconds: 900
* scaleDownThresholdDurationSeconds: 1800
* scaleUpCooldownSeconds: 1800
* scaleDownCooldownSeconds: 2700
- Initially, the desired replica count is calculated by dividing the total number of shards by the maxShardsPerNode value: 4 indices x 3 replicas x 6 shards => 72 shards / 4 (maxShardsPerNode) => 18 nodes.
- If the calculated median CPU reaches the scaleUpCPUBoundary value and remains there for scaleUpThresholdDurationSeconds, the scale-up process starts. The first thing the operator does is decrease the shards-per-node ratio from maxShardsPerNode towards minShardsPerNode. The ratio is first reduced from 4 to 3, and the new desired replica count is calculated as 72 shards / 3 => 24 nodes.
- The scale-up process starts by changing the replica count of the StatefulSet that holds the Elasticsearch data nodes, and the new pods, because of the one-pod-per-node rule mentioned before, cause new worker nodes to be provisioned.
- When the data nodes are scaled, Elasticsearch's rebalancing kicks in and spreads shards onto the empty nodes.
- If the load keeps coming, the operator waits for scaleUpCooldownSeconds before performing a new scaling operation, then recalculates the shards-per-node ratio and repeats the same steps until it hits the minShardsPerNode value. From that point on it keeps the ratio constant and starts increasing the index replicas instead. The indices are increased from 3 replicas to 4, and a new desired node count is calculated from 6 shards x 4 replicas x 4 indices => 96 shards / 2 => 48 nodes. This is how the scale-up operation works.
- On the other hand, if the median CPU falls to the scaleDownCPUBoundary and stays there for scaleDownThresholdDurationSeconds, the scale-down operation follows a similar workflow in reverse: it first increases the ratio back towards the maxShardsPerNode value. The ratio is increased from 2 to 3, and the new desired replica count becomes 96 shards / 3 => 32 nodes.
- Once the new desired replica count is calculated, the scale-down operation drains each surplus node one by one, relocating the shards on it, until that count is reached.
- If the median CPU remains at or below the scaleDownCPUBoundary, the operator waits for scaleDownCooldownSeconds and calculates the ratio again. Once the ratio hits the maxShardsPerNode value, the operator starts changing index replicas, this time down towards minIndexReplicas.
The index replica count, which had been increased to 4 during scale-up, is reduced back to 3, and a new desired node count is calculated from 6 shards x 3 replicas x 4 indices => 72 shards / 4 => 18 nodes. Note that the operator had already hit the maxShardsPerNode value (4) a few times during scale-down before it started reducing index replicas.
To summarize: based on the specified thresholds and scaling configuration, the operator first moves the shards-per-node ratio within the range between minShardsPerNode and maxShardsPerNode, and when the ratio hits either bound it starts changing index replicas within the range between minIndexReplicas and maxIndexReplicas, deriving the desired number of data nodes at each step. Scale-down is performed by draining nodes one by one, and the same calculation is applied across all indices.
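For reference, the configuration values from the walkthrough above map onto the operator's ElasticsearchDataSet custom resource roughly as in the sketch below. This is based on the Zalando operator's CRD as we used it, and exact field names may differ between operator versions; the resource name, namespace, pod labels and image are placeholders.

apiVersion: zalando.org/v1
kind: ElasticsearchDataSet
metadata:
  name: es-data-set              # placeholder name
  namespace: es                  # placeholder namespace
spec:
  replicas: 18                   # initial desired replicas from the walkthrough
  scaling:
    enabled: true
    minReplicas: 10
    maxReplicas: 80
    minIndexReplicas: 2
    maxIndexReplicas: 3
    minShardsPerNode: 2
    maxShardsPerNode: 4
    scaleUpCPUBoundary: 40                   # percent
    scaleDownCPUBoundary: 20                 # percent
    scaleUpThresholdDurationSeconds: 900
    scaleDownThresholdDurationSeconds: 1800
    scaleUpCooldownSeconds: 1800
    scaleDownCooldownSeconds: 2700
  template:
    metadata:
      labels:
        app: es-data
    spec:
      containers:
        - name: elasticsearch
          image: my-registry/elasticsearch-with-plugins:7.8.0   # image built in step 1 (placeholder)
          resources:
            requests:
              cpu: "4"
              memory: 16Gi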
What did we change?
- First of all, based on the experience gained from our previous setup and the structure of our search indices, we host one shard on each Elasticsearch data node. The obvious way to do this is to set minIndexReplicas and maxIndexReplicas to 1, but that was not enough on its own; we had to add an if block to the code to handle the case where minIndexReplicas is less than 2.
- When scaling operations were calculated, the desired node count was derived over all indices. We added a config called mainIndexAlias to the operator CRD for our main index, which receives the most traffic and must keep one shard per node.
- In our tests, we saw that this process takes a very long time because scale-down drains the data nodes one by one. Given our structure, we expected the cluster to scale down by the number of shards at a time, so we made the necessary improvements and made the nodes scale up and down by the number of shards; the main index referenced here comes from the mainIndexAlias config mentioned above.
- Elasticsearch's automatic rebalancing stayed active during scale-up, which caused unnecessary shard relocations while nodes were being scaled. We now disable it for the duration of the scaling operations.
Finally, with the changes we made, if there is a scaling operation in progress in the cluster, we expose it through an API so that we can shape our indexing streams accordingly. In the opposite direction, the calculation also takes into account the duplicate indices that form during full and stream transitions.
You can find the Zalando operator version that we forked on Hepsiburada's GitHub page.
4. Observability
We deployed Filebeat and Logstash to the Kubernetes cluster to collect the operator logs. To monitor the status of the Elasticsearch clusters and to generate alerts based on certain conditions, we deployed the Elasticsearch exporter and built a dashboard like the one below with the Prometheus and Grafana duo. You can find the Grafana export here.
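As an illustration, a minimal sketch of how the Elasticsearch exporter could be deployed next to the cluster is shown below. The image tag, flags, namespace, labels and the Elasticsearch service URL are assumptions for this example (based on the community elasticsearch_exporter), not our exact manifests.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: elasticsearch-exporter       # placeholder name
  namespace: monitoring              # placeholder namespace
spec:
  replicas: 1
  selector:
    matchLabels:
      app: elasticsearch-exporter
  template:
    metadata:
      labels:
        app: elasticsearch-exporter
      annotations:
        # assumes Prometheus is configured to scrape annotated pods
        prometheus.io/scrape: "true"
        prometheus.io/port: "9114"
    spec:
      containers:
        - name: exporter
          image: quay.io/prometheuscommunity/elasticsearch-exporter:v1.5.0   # version is illustrative
          args:
            # point the exporter at the Elasticsearch HTTP service (placeholder URL)
            - --es.uri=http://elasticsearch-data.es.svc.cluster.local:9200
            - --es.all
          ports:
            - name: metrics
              containerPort: 9114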
Result
We have transformed the Hepsiburada search engine from a structure that used to consist of a fixed number of VM instances across two active-active datacenters into a modern structure that scales automatically on multi-zone Google Cloud Kubernetes clusters. With this work, we have achieved cost savings of up to 60% and drastically reduced the toil of setting up and administering Elasticsearch. It has been a great 4.5-month journey for us, covering 1.5 months of development and testing and 3 months of production usage, and we wanted to share it with you.
Thanks
First of all, to my colleague Ziya Özçelik, who was with me throughout this whole process.
To Murat Babacan and Kemal Erişen, who made the environment for this project possible for us.
And thanks to Yağız Demirsoy for his unwavering support and determined stance during the weeks-long testing and development process.