Optimizing Elasticsearch pagination with IAsyncEnumerable
I’ve been using Elasticsearch for a long time in my .NET applications. As a distributed search engine that is often used as a NoSQL document store, Elasticsearch answers queries quickly even over large datasets, which makes it a good choice for scalable data retrieval.
For a client building a document management platform, I was asked to retrieve documents from an Elasticsearch instance as efficiently as possible. A key requirement was that the query be paginated, because the number of documents needed is not known when the request is made: the requester must be able to keep retrieving documents incrementally.
Paginated requests: search after and point in time
In Elasticsearch, two techniques work together to perform paginated searches correctly. The first is called search_after: it uses the sort values of the last hit on one page to retrieve the next page. On its own, this technique can return inconsistent results (skipped or duplicated documents) if the index changes while you are paginating.
To prevent these inconsistencies, a second concept is introduced: Point in Time (PIT).
A PIT preserves the state of the index as it was when the PIT was opened, so every page request sees the same data.
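To make the cursor idea concrete, here is a minimal in-memory sketch of what search_after does. It does not call Elasticsearch at all: the `index` list, the `SearchPage` helper and the page size are hypothetical stand-ins, and the sort key is simply the id.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for an index sorted by id (hypothetical sample data).
var index = new List<int> { 1, 2, 3, 4, 5, 6, 7 };
const int pageSize = 3;

// Hypothetical helper that mimics search_after: return up to `size` ids
// strictly greater than the last sort value of the previous page.
List<int> SearchPage(int? searchAfter, int size) =>
    index.Where(id => searchAfter is null || id > searchAfter)
         .OrderBy(id => id)
         .Take(size)
         .ToList();

var pages = new List<List<int>>();
int? cursor = null;
while (true)
{
    var page = SearchPage(cursor, pageSize);
    if (page.Count == 0) break;   // no more hits: stop paging
    pages.Add(page);
    cursor = page[^1];            // the last sort value becomes the next cursor
}

foreach (var page in pages)
    Console.WriteLine(string.Join(", ", page));
```

The loop produces the pages 1, 2, 3 then 4, 5, 6 then 7. In this toy version the list never changes between calls, which is the guarantee a PIT gives you on a real index.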
Here is a simple example of using a PIT with C# and the Elasticsearch .NET client:
public async Task<IEnumerable<MyDocument>> GetDocumentsAsync(CancellationToken cancellationToken = default)
{
    // Open a new point in time (PIT)
    var creationPitResult = await this._client.OpenPointInTimeAsync(_indexName, p => p.KeepAlive(_keepAlive), cancellationToken);
    var lastPitId = creationPitResult.Id;
    List<MyDocument> docs = [];
    ICollection<FieldValue> searchAfter = null;
    try
    {
        while (true)
        {
            var searchResponse = await _client.SearchAsync<MyDocument>(sd => sd.Index(_indexName)
                .Size(_maxResult)
                .TrackTotalHits(new TrackHits(false))
                .Pit(new PointInTimeReferenceDescriptor(lastPitId))
                .SearchAfter(searchAfter)
                .Sort(sd => sd.Field(document => document.Id)), cancellationToken);
            if (searchResponse.Hits.Count == 0)
            {
                // No more hits: every page has been read
                break;
            }
            // Accumulate this page into the full result list
            docs.AddRange(searchResponse.Documents);
            // The sort values of the last hit are the cursor for the next page
            searchAfter = searchResponse.HitsMetadata.Hits.Last().Sort.ToList();
            // The open point in time request and each subsequent search request can return a different id;
            // thus always use the most recently received id for the next search request.
            lastPitId = searchResponse.PitId;
        }
    }
    finally
    {
        // Close the PIT even if a search fails or is cancelled;
        // CancellationToken.None keeps the cleanup itself from being cancelled.
        await _client.ClosePointInTimeAsync(new ClosePointInTimeRequest { Id = lastPitId }, CancellationToken.None);
    }
    return docs;
}
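This paginated approach ensures that no single request has to return every document in the index. Note, however, that this method still accumulates all the pages in a list before returning, so the caller receives nothing until the last page has been read. Retrieving documents incrementally is especially useful when responding to an API that displays results in segments.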
Paginated requests with IAsyncEnumerable
Since we don’t know in advance how many documents might be requested by the interface, the typical paginated search alone doesn’t fully meet our needs. This is where the IAsyncEnumerable interface in C# can be incredibly helpful.
IAsyncEnumerable is an interface introduced in C# 8.0 that represents an asynchronous enumerable sequence of items. It is used when you need to work with a collection of data that is retrieved asynchronously, such as streaming data from a database, API, or file.
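Before applying this to Elasticsearch, a minimal sketch of the mechanics may help. The `CountAsync` producer below is hypothetical, and `Task.Delay` only stands in for real asynchronous work such as a network call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical producer: yields numbers one at a time, awaiting between items
// to stand in for an asynchronous call such as a network request.
async IAsyncEnumerable<int> CountAsync(int count)
{
    for (var i = 1; i <= count; i++)
    {
        await Task.Delay(10);   // simulated asynchronous work
        yield return i;         // hand one item to the consumer, then pause
    }
}

var received = new List<int>();
await foreach (var n in CountAsync(3))
{
    received.Add(n);            // each item arrives as soon as it is produced
    Console.WriteLine(n);
}
```

The consumer sees 1, 2 and 3 in order, and the producer only runs far enough to supply the item currently being asked for.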
In this case, we can leverage Elasticsearch pagination with an asynchronous enumerable in C#:
public async IAsyncEnumerable<MyDocument> GetDocumentsAsyncEnumerableAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Open a new point in time (PIT)
    var creationPitResult = await this._client.OpenPointInTimeAsync(_indexName, p => p.KeepAlive(_keepAlive), cancellationToken);
    var lastPitId = creationPitResult.Id;
    ICollection<FieldValue> searchAfter = null;
    try
    {
        while (true)
        {
            var searchResponse = await _client.SearchAsync<MyDocument>(sd => sd.Index(_indexName)
                .Size(_maxResult)
                .TrackTotalHits(new TrackHits(false))
                .Pit(new PointInTimeReferenceDescriptor(lastPitId))
                .SearchAfter(searchAfter)
                .Sort(sd => sd.Field(document => document.Id)), cancellationToken);
            if (searchResponse.Hits.Count == 0)
            {
                // No more hits: every page has been read
                break;
            }
            // The sort values of the last hit are the cursor for the next page
            searchAfter = searchResponse.HitsMetadata.Hits.Last().Sort.ToList();
            // The open point in time request and each subsequent search request can return a different id;
            // thus always use the most recently received id for the next search request.
            lastPitId = searchResponse.PitId;
            foreach (var doc in searchResponse.Documents)
            {
                cancellationToken.ThrowIfCancellationRequested();
                // Hand one document to the caller; execution resumes here on the next iteration
                yield return doc;
            }
        }
    }
    finally
    {
        // Close the PIT, including when the caller stops early or cancels;
        // CancellationToken.None keeps the cleanup itself from being cancelled.
        await _client.ClosePointInTimeAsync(new ClosePointInTimeRequest { Id = lastPitId }, CancellationToken.None);
    }
}
In this implementation, the yield return statement returns documents one by one, effectively streaming the results as they’re retrieved. This allows the calling code to process the documents asynchronously without having to load all of them into memory at once.
To consume the results, the client can simply iterate over the IAsyncEnumerable:
await foreach (var document in _myIndexRepository.GetDocumentsAsyncEnumerableAsync())
{
    .....
}
When the results of the current page are exhausted, our method automatically uses search_after and the PIT to request the next page from Elasticsearch.
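The on-demand behaviour is easy to check with a small simulation. This is only a sketch: `FetchPageAsync` and `StreamIdsAsync` are hypothetical stand-ins for the search request and the repository method, and a counter records how many pages were actually requested.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var pageRequests = 0;   // how many times the fake backend was queried

// Hypothetical stand-in for one search request: returns `size` ids following `after`.
async Task<List<int>> FetchPageAsync(int after, int size)
{
    pageRequests++;
    await Task.Yield();   // simulated I/O
    return Enumerable.Range(after + 1, size).ToList();
}

// Streams ids page by page; the next page is requested only when it is needed.
async IAsyncEnumerable<int> StreamIdsAsync(int pageSize)
{
    var after = 0;
    while (true)
    {
        var page = await FetchPageAsync(after, pageSize);
        after = page[^1];          // the last id is the cursor for the next page
        foreach (var id in page)
            yield return id;
    }
}

var taken = new List<int>();
await foreach (var id in StreamIdsAsync(pageSize: 2))
{
    taken.Add(id);
    if (taken.Count == 3) break;   // the consumer stops after three items
}

Console.WriteLine($"items: {taken.Count}, page requests: {pageRequests}");
```

With a page size of 2, reading three items triggers exactly two page requests; a third page is never fetched because nobody asked for a fifth item.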
P.S.: If the client interrupts the enumeration in the await foreach loop, the enumerator is disposed and the finally block ensures that the PIT is properly closed.
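A small sketch can confirm this behaviour without Elasticsearch. Here the hypothetical `resourceClosed` flag stands in for the PIT being closed, and `StreamWithCleanupAsync` is an illustrative endless producer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var resourceClosed = false;   // stands in for "the PIT has been closed"

// Hypothetical endless producer that owns a resource for its whole lifetime.
async IAsyncEnumerable<int> StreamWithCleanupAsync()
{
    try
    {
        for (var i = 1; ; i++)
        {
            await Task.Yield();   // simulated asynchronous fetch
            yield return i;
        }
    }
    finally
    {
        // Runs when the enumerator is disposed, including on an early break.
        resourceClosed = true;
    }
}

await foreach (var item in StreamWithCleanupAsync())
{
    if (item == 2) break;   // stop early, as a UI might
}

Console.WriteLine($"closed: {resourceClosed}");   // prints "closed: True"
```

Leaving the loop makes await foreach dispose the enumerator, and disposal is what runs the finally block. A caller that drives the enumerator by hand has to dispose it to get the same cleanup.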
Conclusions
By combining Elasticsearch’s paginated search with C#’s IAsyncEnumerable, we’ve been able to efficiently control document retrieval from the UI without keeping a large number of documents in memory. This approach helps optimize resource usage and enables seamless, incremental document retrieval for a more responsive user interface.
The next steps include enhancing the API controller and leveraging NDJSON to stream documents back to the frontend, ensuring an even more responsive UI.
P.S.: This article continues with NDJson and IAsyncEnumerable.
All the code, including integration tests, is available on the GitHub project.