How to reindex over 120M documents in one hour at Compass

Cui Xiao
Compass True North
Published in
6 min readJan 22, 2020

At Compass, it has become essential to reindex our listings catalog (storage abstraction for real estate data at Compass) from scratch from day one due to certain data level complexities — normalization, merge logic updates, etc. — as well as new search features developed for our clients. As our listings catalog vastly expanded, reindexing could take over 8 hours, becoming the bottleneck for us to move fast with new feature rollouts. It was also a significant burden on our on-call engineers to make sure everything was rolled out correctly.

In this post, I would like to share the journey we have gone through addressing bottlenecks in our indexing stack with AWS MSK and AWS ElasticSearch.

Improving Indexing Infrastructure

Indexing Stack Diagram

We use MSK (AWS Managed Kafka) topic between Indexer and Catalog services for listing updates. Every time there’s an update on the data side, the producer is responsible for fetching the well normalized/merged listing to index the topic for the indexer to consume. There are three main benefits of changing from indexer pulling from Catalog service directly:

  1. Minimize the time wasted on I/O between indexer and catalog service.
  2. Reduce the load on the catalog service-side. Since Kafka is also being used to back document storage, we can reindex a new cluster from scratch by simply resetting consumer offset to replay all the messages in our indexing topic.
  3. Easy to scale on the indexer side. Based on the simplicity of the configuration of partitions that come along with Kafka, it becomes straightforward to scale out the indexer cluster.

Indexing infrastructure upgrade unleashed the shackle on the internal side to publish updates to ElasticSearch, with abilities to add more parallelism on indexer side as well as scale out indexer servers.

Improve Parallelism on indexer

There will be always bottlenecks in the system, applies for the one mentioned above. It is true that we are in a better position to increase partitions in our Kafka topic so that we can scale out of indexing cluster to achieve better overall indexing performance, on the other hand, we also want to achieve better throughput on a single consumer without limited by the nature of partition numbers.

Indexing document itself is now our new bottleneck in each indexer, due to nature of applying business logic for each document plus external network call to ElasticSearch. In order to maintain the nice feature Kafka provides:

Each partition is an ordered, immutable sequence of records that is continually appended to — a structured commit log.

and at the same time unleash the computation power to index, we went with following approach:

Internal Indexer Job Design

With applying similar hashing mechanism defined in Kafka to N indexing threads, we are able to improve the throughput from indexing threads and at the same time, with messages order guaranteed.

Tuning ElasticSearch

Now let’s move to how we tune ElasticSearch to achieve even faster indexing speed. There are three main aspects to achieve fast indexing with ElasticSearch: cluster-level settings,

client-side changes, and hardware upgrades. Here, we will focus on the first two, where you should expect to see a decent reindexing performance boost without additional a hardware upgrade.

Cluster level settings

Translog

Translog are changes to Lucene that are only persisted to disk during a Lucene commit, which is a relatively expensive operation and so cannot be performed after every index or delete operation. Changes that happen after one commit and before another will be removed from the index by Lucene in the event of process exit or hardware failure.

https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules-translog.html

Note: The data in the translog is only persisted to disk when the translog is fsynced and committed. In the event of a hardware failure, an operating system crash, a JVM crash, or a shard failure, any data written since the previous translog commit will be lost.

For our use case, during the reindexing, we usually don’t need this guarantee and could simply restart the process in the case of a hardware failure. Therefore, by setting index.translog.durability to async, both the fsync and the Lucene commit will now occur in the background, thereby making the reindexing a lot faster. Remember to set the durability back to the default request once the bulk indexing is done. You can also enhance the flush threshold by changing translog.flush_threshold_size. In our setup, we’re using 2GB. The default value is 512MB.

Refresh Interval

The operation that consists of making changes visible to search — called a refresh — is costly, and calling it often while there is ongoing indexing activity can hurt indexing speed. By default, Elasticsearch periodically refreshes indices every second, but only on indices that have received one search request or more in the last 30 seconds.

This is the optimal configuration if you have no or very little search traffic (e.g. less than one search request every 5 minutes) and want to optimize for indexing speed. This behavior aims to automatically optimize bulk indexing in the default case when no searches are performed. In order to opt-out of this behavior, set the refresh interval explicitly.

In our case, there’s no traffic to the cluster during reindexing. This allowed us to turn off refresh by setting it to -1 and once the reindex is finished, we change it back to the default value.

Replicas

Replicas are useful for improving search speed; on the other hand, it brings additional burden for indexing since every indexing request needs to be synced on each replica. For a reindexing scenario, it’s fine to set num_of_replicas to 0 and change it back once reindexing finished.

Tuning Refresh internal and Replicas during heavy indexing time is relative standard base on following ElasticSearch recommendation:

https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-indexing-speed.html

Translog tuning, on the other hand, was completed on our side recently with significant indexing rate boost, from 40K AVG to 80K AVG (Indexing rate: Number of indexing operation per min)

Client level changes

Bulk API

We use the bulk API to send concurrent requests to the ElasticSearch cluster and achieve high throughput. Since our client library does not support bulk by size (MB, KB etc.), we have to spend a decent amount of time to find out our sweet spot in terms of the number of documents, as long as the cluster can handle them properly.

Monitoring

Indexing rate metric in AWS ElasticSearch is the key metric for us to see if we are getting better indexing performance or not. For example:

Indexing rate without Translog Tuning
Indexing rate with Translog Tuning

CloudWatch EBS metric is another crucial metric to understand your cluster usage with disk operations; it is the key metric for you to decide what kind of EBS you want to go with (gp2/io1) and how much PIOPs you want to get.

Summary

With the improvements mentioned above, we were able to bring down our reindexing time from 8 hours to 1 hour, but our journey doesn’t stop here. We will keep improving our indexing stack to make indexing as fast as possible, to power more search features with low latency.

Bonus material

Our CTO, Joseph Sirosh, recently gave a talk at AWS re:Invent on how we use AWS ElasticSearch to power Compass Search with more details on our mission and challenges in our search stack!

https://www.youtube.com/watch?time_continue=2871&v=V8yaocQZpr0&feature=emb_title

--

--