Elasticsearch: How Reads, Writes and Search work in the Cluster

Taras Ivantsiv
Published in Wix Engineering
9 min read · Aug 17, 2021
Photo by Dan-Cristian Pădureț on Unsplash

Elasticsearch is a distributed search and analytics engine built on top of Apache Lucene. It provides a simple REST API for indexing and searching that hides the complexity of both the search engine and its distributed nature.

Our team recently started using Elasticsearch in our project, so I decided to examine how the basic document operations and search actually work under the hood. In this post I want to give you an overview of this topic that, I hope, will help you design better applications on top of Elasticsearch and avoid some subtle bugs and mistakes when using this engine.

Cluster

Elasticsearch uses sharding (also called partitioning) to achieve scalability: the data is split into parts that are stored separately. We can therefore put the shards on multiple machines that handle requests simultaneously, which improves overall performance. We can also add machines when the load increases and remove them when it decreases. In Elasticsearch the data is stored as schema-less JSON documents, and a collection of such documents is called an index. The data in an index is split into primary shards in such a way that every document belongs to exactly one primary shard.

Another technique that Elasticsearch uses is replication: we store two or more copies of the same data separately. The copies should live on different machines, so that we always have a backup if something goes wrong with one machine. The copies can also serve read requests, which helps spread the load on the system. In Elasticsearch these copies are called replica shards, and each one is a copy of a primary shard.

The shards are distributed across the nodes of the cluster in such a way that a primary shard and a replica shard storing the same data are never placed on the same node.

PUT /films
Content-Type: application/json

{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
}
}

Creates an index called films with 3 primary shards and 2 replicas of each.

This configuration means that the index’s data will be split across 3 primary shards, and each primary shard will have 2 copies in replica shards. A cluster with 3 nodes can look like this:

Figure: the shards of the films index spread over 3 nodes (PS: primary shard, RS: replica shard).
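As a rough illustration of such a layout, here is a minimal Python sketch. The allocate function and its round-robin placement are assumptions made for this example and are not how Elasticsearch actually decides where shards go; the only rule it models is that two copies of the same shard never share a node.

```python
def allocate(num_primaries, num_replicas, num_nodes):
    """Place every shard copy so that no node holds two copies of the same shard."""
    copies = num_replicas + 1  # the primary plus its replicas
    if copies > num_nodes:
        # Simplification: this sketch refuses; a real cluster would leave
        # the extra replicas unassigned instead.
        raise ValueError("not enough nodes to keep all copies of a shard apart")
    nodes = {node: [] for node in range(num_nodes)}
    for shard in range(num_primaries):
        for copy in range(copies):
            node = (shard + copy) % num_nodes  # hypothetical round-robin placement
            label = f"PS{shard}" if copy == 0 else f"RS{shard}"
            nodes[node].append(label)
    return nodes


layout = allocate(num_primaries=3, num_replicas=2, num_nodes=3)
for node, shards in layout.items():
    print(f"node {node}: {' '.join(shards)}")
```

With 3 primaries and 2 replicas on 3 nodes, every node ends up holding one copy of each shard, so any single node can be lost without losing data.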

CRUD operations

Retrieving a document

GET /films/_doc/1

Retrieves a document with an id of 1.

Let’s try to understand what’s happening here behind the scenes. Each document belongs to a single primary shard, so we can find it there or on its replicas. First of all, Elasticsearch needs to determine the primary shard from the document’s id. Because the number of primary shards of an index can’t be changed, the following formula can be used:

primary_shard = hash(id) % number_of_primary_shards

In the general case, the primary shard is determined by the routing value passed with the request, which is equal to the document’s id by default.

So, the algorithm for retrieving a document by its id is:

  1. The client sends a request to a node.
  2. The node accepts the request (any node in the cluster can accept requests), determines the primary shard, and forwards the request to a node that holds the primary shard or one of its replicas.
  3. The node waits for the response and returns it to the client.
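The routing step of the algorithm above can be sketched in a few lines of Python. This is only an illustration: pick_shard is a hypothetical helper, and zlib.crc32 is a stand-in hash chosen for the example, not the hash function Elasticsearch uses.

```python
import zlib


def pick_shard(routing, number_of_primary_shards):
    """Map a routing value (the document id by default) to a primary shard number."""
    # zlib.crc32 is a stand-in hash for this sketch, not Elasticsearch's own.
    return zlib.crc32(routing.encode("utf-8")) % number_of_primary_shards


for doc_id in ["1", "2", "3", "4"]:
    print(doc_id, "->", pick_shard(doc_id, 3))

# The same id can map to a different shard once the shard count changes,
# which is why the formula relies on a fixed number of primary shards.
moved = [i for i in map(str, range(100)) if pick_shard(i, 3) != pick_shard(i, 4)]
print(len(moved), "of 100 ids would land on a different shard with 4 primaries")
```

Any node can evaluate this formula on its own, which is what lets every node in the cluster act as the entry point for a request.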

Write operations

PUT /films/_doc/1
Content-Type: application/json

{
"title": "Mulholland Drive",
"director": "David Lynch"
}

Creates or updates a document with id 1.

DELETE /films/_doc/1

Deletes the document with id 1.

Also, the Elasticsearch API allows you to make ‘create if not exists’ requests, partial update requests, and more (check the details in the reference).

All those requests are write operations, so they must be executed on the primary shard first. This ensures that the order of operations is preserved for each document. Once the primary shard has executed a request successfully, the request is replicated to the replica shards.

The algorithm here is as follows:

  1. The client sends a request to a node.
  2. The node accepts the request, determines the primary shard, and forwards the request to a corresponding node.
  3. The primary shard validates and executes the request. If it’s successful, the request is sent to all active replica shards in parallel.
  4. When the replica shards have successfully executed the request, the response acknowledging the completion is sent to the client.

There’s one subtle thing to be aware of in the algorithm above. The request is replicated only to the active replica shards (the primary shard maintains the list), and it can happen that no shard is active except the primary one. To protect yourself from this scenario, you can use the wait_for_active_shards request parameter (1 by default, which means only the primary). It makes the operation wait until the required number of shard copies, counting the primary, is active before proceeding. This still doesn’t guarantee that the operation will be replicated, because the active replica shards can fail after the check, but it lowers the probability of such a situation.
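To make the write path and the effect of wait_for_active_shards more concrete, here is a toy Python model. ShardCopy and index_document are names invented for this sketch, and the model simplifies a lot: it fails immediately instead of waiting for a timeout, and it ignores replica failures during replication.

```python
class ShardCopy:
    """A shard copy (primary or replica) that just keeps documents in a dict."""

    def __init__(self, name, active=True):
        self.name = name
        self.active = active
        self.docs = {}


def index_document(primary, replicas, doc_id, source, wait_for_active_shards=1):
    """Check active copies, write to the primary, then replicate to active replicas."""
    active_replicas = [replica for replica in replicas if replica.active]
    active_copies = 1 + len(active_replicas)  # the primary counts as one copy
    if active_copies < wait_for_active_shards:
        # Simplification: real Elasticsearch waits for a timeout before giving up.
        raise RuntimeError(
            f"only {active_copies} active shard copies, need {wait_for_active_shards}"
        )
    primary.docs[doc_id] = source
    for replica in active_replicas:  # done in parallel in a real cluster
        replica.docs[doc_id] = source
    return {"total": 1 + len(replicas), "successful": active_copies, "failed": 0}


primary = ShardCopy("PS0")
replicas = [ShardCopy("RS0-a", active=False), ShardCopy("RS0-b", active=False)]

# With the default of 1 the write is acknowledged although no replica received it.
print(index_document(primary, replicas, "1", {"title": "Mulholland Drive"}))

try:
    index_document(primary, replicas, "1", {"title": "Mulholland Drive"},
                   wait_for_active_shards=2)
except RuntimeError as error:
    print("rejected:", error)
```

The first call succeeds with only the primary holding the document, which is exactly the scenario the parameter is meant to guard against.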

Optimistic concurrency control

To ensure that the older version of a document never overwrites a newer version in case of concurrent updates to the document, Elasticsearch provides an optimistic concurrency control mechanism that uses the _seq_no and _primary_term system fields. They are returned in every response to create, update, and delete requests:

PUT /films/_doc/1

HTTP/1.1 201 Created
Location: /films/_doc/1
content-type: application/json; charset=UTF-8

{
"_index": "films",
"_type": "_doc",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 3,
"successful": 3,
"failed": 0
},
"_seq_no": 3,
"_primary_term": 1
}

When sending a subsequent write request, you should include the sequence number and primary term as parameters (if_seq_no and if_primary_term). Elasticsearch uses those values to check that you aren’t overwriting a newer version of the document.

PUT /films/_doc/1?if_seq_no=3&if_primary_term=1
Content-Type: application/json

{
"title": "Mulholland Drive 2",
"director": "David Lynch"
}

You will get an HTTP/1.1 409 Conflict response if there were conflicting concurrent updates.
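The check behind if_seq_no and if_primary_term is a compare-and-set. The following Python sketch models it with an in-memory class; ToyPrimaryShard and VersionConflict are hypothetical names for this illustration, and the sequence numbering is simplified to a single counter.

```python
class VersionConflict(Exception):
    """Stands in for the HTTP 409 Conflict response."""


class ToyPrimaryShard:
    def __init__(self):
        self.primary_term = 1
        self.next_seq_no = 0
        self.docs = {}  # id -> (source, seq_no, primary_term)

    def index(self, doc_id, source, if_seq_no=None, if_primary_term=None):
        current = self.docs.get(doc_id)
        if if_seq_no is not None:
            expected = (if_seq_no, if_primary_term)
            actual = current[1:] if current else None
            if actual != expected:
                # Someone else changed the document since the caller read it.
                raise VersionConflict(f"expected {expected}, found {actual}")
        seq_no = self.next_seq_no
        self.next_seq_no += 1
        self.docs[doc_id] = (source, seq_no, self.primary_term)
        return {"_seq_no": seq_no, "_primary_term": self.primary_term}


shard = ToyPrimaryShard()
created = shard.index("1", {"title": "Mulholland Drive"})

# Two clients read the same version and both try a conditional update.
shard.index("1", {"title": "Mulholland Drive 2"},
            created["_seq_no"], created["_primary_term"])
try:
    shard.index("1", {"title": "Mulholland Dr."},
                created["_seq_no"], created["_primary_term"])
except VersionConflict as conflict:
    print("409 Conflict:", conflict)
```

The second client loses because the sequence number it read is no longer the current one; it would have to re-read the document and retry.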

Searching

POST /films/_search
Content-Type: application/json

{
"query": {
"match": {
"title": "Drive"
}
},
"from": 0,
"size": 10
}

Searches for documents with the title field matching ‘Drive’ in the films index. The from and size parameters are used for pagination.

I’m not going to go into the details of the Query DSL, analyzers, and mapping, as that’s a completely different topic that requires a separate post. Instead, I want to explain how a search is executed in the distributed cluster.

When retrieving a document by its id, we know beforehand which shard the document belongs to. Searching is more complicated, because the matching documents can be stored in any of the shards. To execute a search request, Elasticsearch needs to query all the shards and combine their results into a sorted list, then return a page of results from it.

The execution is divided into two phases: a query phase and a fetch phase. During the query phase Elasticsearch works only with the minimum required fields (ids and the fields needed for sorting) to reduce the amount of data transferred between the nodes:

  1. A client sends a search request to a node.
  2. The node accepts the request and forwards it to one copy (the primary or a replica) of every shard of the index.
  3. Each shard executes the request locally and forms a sorted list of results that has the size of from + size.
  4. The coordinating node (the one that accepted the request) waits for the results from the shards and merges them into a single list.

The query phase identifies only the ids of the matching documents. The next step (called the fetch phase) actually retrieves the data and returns it to the client. It works similarly to the retrieval of a document by its id that was described earlier.
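The two phases can be sketched in Python as follows. This is a simplified model with made-up data: shards are plain dicts, documents are ranked by a rating field rather than by a relevance score, and query_phase and search are hypothetical helpers.

```python
import heapq
from itertools import islice


def query_phase(shard_docs, from_, size):
    """Each shard returns only (sort key, id) pairs for its local top from + size hits."""
    ranked = sorted((-doc["rating"], doc_id) for doc_id, doc in shard_docs.items())
    return ranked[: from_ + size]


def search(shards, from_, size):
    per_shard = [query_phase(docs, from_, size) for docs in shards]
    merged = heapq.merge(*per_shard)  # coordinating node merges the sorted lists
    page = list(islice(merged, from_, from_ + size))
    # Fetch phase: only the documents on the requested page are retrieved in full.
    return [next(docs[doc_id] for docs in shards if doc_id in docs)
            for _, doc_id in page]


# Hypothetical documents spread over three shards.
shards = [
    {"a": {"title": "A", "rating": 9.3}, "d": {"title": "D", "rating": 8.1}},
    {"b": {"title": "B", "rating": 9.2}},
    {"c": {"title": "C", "rating": 9.0}, "e": {"title": "E", "rating": 7.5}},
]
print(search(shards, from_=1, size=2))
```

Note that every shard has to return from + size entries, because the coordinating node cannot know in advance which shard holds the hits for the requested page.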

Deep pagination

Usually, when searching, users are interested only in the first few pages of the results. The algorithm above works well in that use case, but when you need to list all the results or get the results from a ‘deep’ page (when from + size is a large number), it starts to consume a lot of resources. Each shard needs to keep a list of matching documents of size from + size in memory, then send it over the network to the coordinating node, which merges the lists into one. So, if you need to page through a large number of results, it’s better to use the search_after parameter.

To demonstrate how to use this parameter, let’s try to search the films sorted by their rating:

POST /films/_search
Content-Type: application/json

{
"query": {
"match_all": {}
},
"sort": [
{"rating": {"order": "desc"}}
],
"size": 5
}

The response is going to contain the top 5 films:

{
...
"hits": {
...
"hits": [
{
"_index": "films",
"_type": "_doc",
"_id": "6f8816f4-4277-4719-94d1-5fb5a9d66bd8",
"_score": null,
"_source": {
"title": "The Shawshank Redemption",
"director": "Frank Darabont",
"rating": 9.3
},
"sort": [
9.3
]
},
...
{
"_index": "films",
"_type": "_doc",
"_id": "947218a5-f609-414c-8928-58d72614ffe3",
"_score": null,
"_source": {
"title": "The Godfather: Part II",
"director": "Francis Ford Coppola",
"rating": 9.0
},
"sort": [
9.0
]
}
]
}
}

We can take the rating from the last film from the results (9.0) and search for films with a lower rating, using the search_after parameter, to get the next page of results:

POST /films/_search
Content-Type: application/json

{
"query": {
"match_all": {}
},
"sort": [
{"rating": {"order": "desc"}}
],
"search_after": [
9.0
],
"size": 5
}

This is going to be more efficient as each shard will need to store only a list of matching documents of size equal to size (instead of from + size).

But there’s one issue in this example: there can be many films with a rating of 9.0, and the search_after query skips all of them, which means that you will miss some films. To solve this, you need to include a unique field in the sort as a tiebreaker (e.g., _id, or _shard_doc, which was introduced recently, is more performant, and is available when searching with a point in time):

"sort": [
{"rating": {"order": "desc"}},
{"_id": {"order": "asc"}}
]

And pass the unique field’s value to the search_after parameter along with the rating:

"search_after": [
9.0,
"947218a5-f609-414c-8928-58d72614ffe3"
]
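The effect of the tiebreaker is easy to reproduce with a small in-memory simulation. The films list and the helper functions below are made up for this sketch; they only imitate the "strictly after the last sort values" rule that search_after applies.

```python
films = [
    {"_id": "a", "rating": 9.3},
    {"_id": "b", "rating": 9.0},
    {"_id": "c", "rating": 9.0},
    {"_id": "d", "rating": 9.0},
    {"_id": "e", "rating": 8.9},
]


def sort_key(film, tiebreak):
    # rating descending, then _id ascending when a tiebreaker is used
    return (-film["rating"], film["_id"]) if tiebreak else (-film["rating"],)


def page(after, size, tiebreak):
    ordered = sorted(films, key=lambda film: sort_key(film, tiebreak))
    # Keep only the hits that sort strictly after the last hit of the previous page.
    rest = [f for f in ordered if after is None or sort_key(f, tiebreak) > after]
    return rest[:size]


def all_ids(size, tiebreak):
    seen, after = [], None
    while True:
        hits = page(after, size, tiebreak)
        if not hits:
            return seen
        seen += [film["_id"] for film in hits]
        after = sort_key(hits[-1], tiebreak)


print(all_ids(size=2, tiebreak=False))  # films c and d are skipped
print(all_ids(size=2, tiebreak=True))   # every film is returned once
```

Without the tiebreaker, the second page starts after every film rated 9.0, so the two films that did not fit on the first page are never returned.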

Translog

Building a Lucene index is an expensive operation, and that’s why Elasticsearch uses a write-ahead log called translog to make the indexing fast and durable.

  1. Every write operation is appended to the translog on disk, and the new document or document version is added to an in-memory buffer.
  2. Periodically (every second by default) the index is refreshed: a Lucene segment is built from the in-memory buffer and stored in the filesystem cache (not yet persisted on disk), and the buffer is cleared. After the refresh, the data from the buffer becomes searchable.
  3. Also periodically, the index is flushed: the data from the filesystem cache is actually written to disk. A new translog is started after the flush.

After a failure, the node is restarted and all the unflushed operations are replayed from the translog. This means that the data won’t get lost even if the segments haven’t been written to disk yet.

By default the index is refreshed every second, which means that search requests can lag slightly behind the actual state of the documents. Retrieving a document by id, however, checks the translog first, so you will get the up-to-date version.
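The interplay of the translog, refresh, and flush can be modelled with a toy class. ToyShard is a name invented for this sketch, and plain Python lists and dicts stand in for the on-disk log, the in-memory buffer, the filesystem cache, and the persisted segments; real Lucene segments and commits are far more involved.

```python
class ToyShard:
    def __init__(self):
        self.translog = []  # stands in for the on-disk write-ahead log
        self.buffer = {}    # in-memory indexing buffer, not searchable
        self.cached = {}    # segments in the filesystem cache, searchable
        self.on_disk = {}   # segments persisted by a flush

    def index(self, doc_id, source):
        self.translog.append((doc_id, source))
        self.buffer[doc_id] = source

    def refresh(self):
        self.cached.update(self.buffer)
        self.buffer.clear()

    def flush(self):
        self.refresh()  # simplification: flushing also empties the buffer
        self.on_disk.update(self.cached)
        self.cached.clear()
        self.translog = []  # a new translog starts after the flush

    def search_ids(self):
        return sorted({**self.on_disk, **self.cached})

    def get(self, doc_id):
        # Look at the most recent unflushed operation first.
        for logged_id, source in reversed(self.translog):
            if logged_id == doc_id:
                return source
        return self.on_disk.get(doc_id)

    def crash_and_recover(self):
        # Memory and filesystem cache are lost; the translog is replayed.
        self.buffer, self.cached = {}, {}
        for doc_id, source in self.translog:
            self.buffer[doc_id] = source


shard = ToyShard()
shard.index("1", {"title": "Mulholland Drive"})
print(shard.search_ids(), shard.get("1"))  # not searchable yet, but get sees it
shard.refresh()
print(shard.search_ids())
shard.crash_and_recover()
shard.refresh()
print(shard.search_ids())
```

In this model the document survives the simulated crash only because its operation is still in the translog, which is the durability property described above.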

Conclusions

In this post I tried to explain how Elasticsearch executes read, write and search requests in the cluster. I also covered optimistic concurrency control, described the translog briefly, and showed how to avoid problems like deep pagination. If you are interested in distributed systems and Elasticsearch, there are many more topics to learn about: leader election, node discovery, failure handling, and more. There are also many interesting and complex topics related to full-text search and analytics that I didn’t even touch.

It’s a good idea to check the official reference yourself, because Elasticsearch is evolving really fast and the details might change in the future.
