How you can reduce the time taken by updates in Elasticsearch from 6 hours to 34 mins
We had a business requirement to load ~13 million records (each with up to 250 attributes) into a datastore within a limited time window. The datastore also had to search, filter, aggregate and retrieve records within milliseconds, support pagination, and serve ~50 concurrent user requests.
Did we have other options?
Yes, we evaluated other options such as PostgreSQL. Why we chose Elasticsearch over Postgres is a post in itself.
tl;dr: Elasticsearch provides horizontal scaling and good out-of-the-box read performance on large data sets.
Why Elasticsearch?
Our Elasticsearch cluster consisted of 5 nodes. Each node had the following configuration:
- AWS Instance type: M4.2xlarge
- RAM: 32 GB, with the JVM heap size set to 16 GB (50% of RAM and under 32 GB)
- Cores: 8 cores
- Hard-Disk: 500GB
With this configuration, we were able to load the data (stored as nested datatypes) in ~50 mins.
By default, an update operation in Elasticsearch merges the new document into the old one, so key-value pairs from the old document that are absent from the update are kept.
For example, the following was my original document:
To modify the Address field of this document, the HTTP request body is the following:
The resulting document will be the following:
The attribute, “House No”, is retained, which is not a desirable result.
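The merge behaviour described above can be imitated in a few lines of Python. This is only a sketch of the recursive merge that a partial update performs, not Elasticsearch code, and the field values (`Jane`, `12`, `Old Street`, `New Street`) are made up for illustration.

```python
import copy

def partial_update_merge(old, patch):
    """Recursively merge `patch` into a copy of `old`, as a partial-document update does."""
    merged = copy.deepcopy(old)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Objects are merged key by key, so old keys survive.
            merged[key] = partial_update_merge(merged[key], value)
        else:
            # Scalars and arrays are simply replaced.
            merged[key] = copy.deepcopy(value)
    return merged

# Hypothetical documents, for illustration only.
original = {"Name": "Jane", "Address": {"House No": "12", "Street": "Old Street"}}
patch = {"Address": {"Street": "New Street"}}

result = partial_update_merge(original, patch)
print(result)
```

The update never mentions "House No", yet the key is still present in `result`, which is exactly the unwanted retention.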
In our real-world scenario, many such key-value pairs were retained on every update, which increased the document size and also showed up in the UI.
We were doing these updates using the Bulk API of Elasticsearch.
We tried a few things to solve this:
Solution 1: Scripted updates
We used scripted updates in the update query.
This ensured that an attribute was deleted if it was not present in the update payload. On the flip side, our data-load process took more than 6 hours to complete.
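As a rough illustration, a scripted update in a bulk request could be built like this. The Painless script, the index name `records` and the `Address` field are assumptions made for the example, not necessarily the script we ran; the body uses the `source` key of recent Elasticsearch versions (older versions used `inline`).

```python
import json

def scripted_update_lines(index, doc_id, address):
    """Return the two NDJSON lines of one scripted update for the _bulk endpoint."""
    action = {"update": {"_index": index, "_id": doc_id}}
    body = {
        "script": {
            "lang": "painless",
            # Replacing the whole object drops keys missing from the payload.
            "source": "ctx._source.Address = params.Address",
            "params": {"Address": address},
        }
    }
    # The Bulk API expects newline-delimited JSON, each line ending in "\n".
    return json.dumps(action) + "\n" + json.dumps(body) + "\n"

# Hypothetical index, id and payload.
bulk_payload = scripted_update_lines("records", "1", {"Street": "New Street"})
print(bulk_payload)
```

Every document in the batch runs the script, which is the likely reason this route was so much slower than plain partial updates.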
Was this acceptable?
Solution 2 — Delete the nested field and then update:
Delete the attribute using update_by_query, then perform a normal update.
This brought the time taken down from 6 hours to 2 hours 30 minutes.
Sounds good?
Solution 3: Set attributes to null
We tried setting the to-be-deleted attributes to null.
This worked with curl and Postman calls, but the Scala library we used for JSON (to_json) dropped any key whose value was null. We wrote our own JSON generator library that retained the nulls.
This brought the time taken down to ~50 minutes.
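The idea can be sketched in Python, whose standard `json` module keeps `None` values as `null`. The helper `with_explicit_nulls` and the attribute list are hypothetical, and the sketch assumes the full set of possible attributes is known up front.

```python
import json

def with_explicit_nulls(update, known_attributes):
    """Copy `update`, adding a None entry for every known attribute it lacks."""
    padded = dict(update)
    for name in known_attributes:
        padded.setdefault(name, None)
    return padded

# Hypothetical attribute list and update payload.
address_update = with_explicit_nulls({"Street": "New Street"}, ["House No", "Street"])

# json.dumps serialises None as null instead of dropping the key.
payload = json.dumps({"doc": {"Address": address_update}})
print(payload)
```

The serializer is the part that matters here: a generator that silently drops null-valued keys sends a payload identical to the plain partial update, so the old attribute stays untouched.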
Good enough?
Solution 4 — Turn off Replication in addition to using a custom JSON generator:
By setting the number of replicas to 0, we reduced the time even further, to 30 minutes. But this was a risky move: with no replica copies, we could have lost data if a node holding a primary shard had gone down.
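For reference, replication is controlled per index through the `index.number_of_replicas` setting, where 0 means no replica copies. The sketch below only builds the JSON bodies for `PUT /<index>/_settings`; restoring a single replica after the load is an assumption made for the example.

```python
import json

def replica_settings_body(replicas):
    """Build the JSON body for PUT /<index>/_settings with the given replica count."""
    if replicas < 0:
        raise ValueError("number_of_replicas cannot be negative")
    return json.dumps({"index": {"number_of_replicas": replicas}})

# Disable replicas before the bulk load, restore them afterwards (assumed value: 1).
before_load = replica_settings_body(0)
after_load = replica_settings_body(1)
print(before_load)
print(after_load)
```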
Enter my Advance-Update Elasticsearch Plugin
This is a plugin that removes the old fields and adds the new ones. With it, we can drop our slow custom JSON generator and go back to Scala's faster to_json library. Before looking at what this plugin does, let's look at what happens when you update a document.
1. The request goes to a node, say data node 1. Every node in the cluster can handle any HTTP request.
2. The node determines which shard the requested document belongs to from the document ID. This is the default routing logic in Elasticsearch: it hashes the _id field to pick the shard. The node then:
   - Processes the request and extracts all the data in it
   - Validates the request JSON body
   - Checks for all the indices that might be required for execution
   - Creates any indices that are not present
   - Loads both the old document and the new document in memory, and appends the new document's columns to the old
   - Adds this new document to the request, which is then sent to Lucene for storage
3. If everything goes fine, Elasticsearch creates requests for all the replicas and sends those requests in parallel.
4. Once the document is copied to all the replicas, the node holding the primary shard sends a success response to the node that received the request.
5. The node then sends back a success response.
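The routing step above boils down to hashing the routing value (the `_id` by default) and taking it modulo the number of primary shards. Here is a minimal sketch: Elasticsearch uses a Murmur3 hash internally, so `zlib.crc32` is only a stand-in and the shard numbers will not match a real cluster.

```python
import zlib

def pick_shard(routing, num_primary_shards):
    """Map a routing value (the _id by default) to a primary shard number."""
    # Stand-in hash: Elasticsearch itself uses Murmur3, not CRC32.
    digest = zlib.crc32(routing.encode("utf-8"))
    return digest % num_primary_shards

# Hypothetical document id and shard count.
shard = pick_shard("record-42", 5)
print(shard)
```

Because the shard depends only on the routing value and the shard count, any node can compute where a document lives without asking another node.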
In this process, new keys get added or updated in the existing document, but old keys that are not present in the new document do not get removed. This was a problem for us.
So, in the plugin, I rewrote this logic to build the new document by comparing the leaf-level fields of both documents. This ensures that old leaf-level keys that are not present in the new document are removed.
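To make the intended outcome concrete, here is a deliberately simplified sketch. It assumes that every top-level field named in the update is replaced wholesale while fields the update does not mention are left alone; the plugin's actual comparison rules may differ, and the documents are made up.

```python
import copy

def replace_update(old, new):
    """Replace each top-level field present in `new`; keep the rest of `old`."""
    result = copy.deepcopy(old)
    for key, value in new.items():
        # No merging inside the field: old leaf keys absent from `value` vanish.
        result[key] = copy.deepcopy(value)
    return result

# Hypothetical documents, for illustration only.
original = {"Name": "Jane", "Address": {"House No": "12", "Street": "Old Street"}}
update = {"Address": {"Street": "New Street"}}

updated = replace_update(original, update)
print(updated)
```

Unlike the default merge, "House No" is gone after the update, while the untouched "Name" field is still there.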
Time taken using my plugin: ~34 mins.
There’s still more that I plan to do to improve this even further. My immediate TODO list:
- Send an update request only when there is a change in the document
- Update the plugin to support controlling how data is copied to replicas
Plugin Github URL: Advance-Update