We-Code Digital Studio

Optimizing Elasticsearch pagination with IAsyncEnumerable

I’ve been using Elasticsearch in my .NET applications for a long time. As a powerful NoSQL search engine, it offers excellent performance, making it a great choice for scalable data retrieval.

The problem: paginating an unknown number of results

For a client running a large document management platform, I was tasked with retrieving documents from an Elasticsearch instance as efficiently as possible. Another key requirement was that the query be paginated, because the number of documents needed cannot be determined at request time: the caller must be able to keep retrieving documents incrementally.

Paginated requests: search_after and point in time

In Elasticsearch, two techniques are used together to paginate correctly. The first, called search_after, retrieves the next page by continuing from the sort values of the last hit of the previous page. Unfortunately, on its own this technique can produce “inconsistent” results if new documents are indexed during pagination.

To prevent these inconsistency issues during pagination, another concept is introduced: the Point in Time (PIT).

A PIT preserves a consistent view of the index across the successive search requests, so every page is served from the same state of the data.
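For reference, the same flow looks roughly like this at the REST level (a sketch; `my-index`, the one-minute keep-alive, and the sort field are placeholder values):

```
POST /my-index/_pit?keep_alive=1m

POST /_search
{
  "size": 100,
  "pit": { "id": "<pit id from the previous response>", "keep_alive": "1m" },
  "sort": [ { "id": "asc" } ],
  "search_after": [ "<sort values of the last hit>" ]
}

DELETE /_pit
{
  "id": "<pit id>"
}
```

The first search omits search_after; each subsequent request passes the sort values of the last hit of the previous page. Note that a search carrying a PIT does not target an index in the path, because the PIT already identifies it.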

Here is a simple example of using a PIT with C# and the official Elasticsearch .NET client:

public async Task<IEnumerable<MyDocument>> GetDocumentsAsync(CancellationToken cancellationToken = default)
{
    // Open a new PIT to freeze the index state for the whole pagination
    var creationPitResult = await _client.OpenPointInTimeAsync(_indexName, p => p.KeepAlive(_keepAlive), cancellationToken);

    var lastPitId = creationPitResult.Id;

    List<MyDocument> docs = [];
    ICollection<FieldValue> searchAfter = null;

    while (true)
    {
        // No .Index() here: a search that carries a PIT must not target an index,
        // because the PIT already identifies it
        var searchResponse = await _client.SearchAsync<MyDocument>(sd => sd
            .Size(_maxResult)
            .TrackTotalHits(new TrackHits(false))
            .Pit(new PointInTimeReferenceDescriptor(lastPitId))
            .SearchAfter(searchAfter)
            .Sort(s => s.Field(document => document.Id)), cancellationToken);

        if (searchResponse.Hits.Count == 0)
        {
            break;
        }

        // Accumulate the current page
        docs.AddRange(searchResponse.Documents);
        searchAfter = searchResponse.HitsMetadata.Hits.Last().Sort.ToList();

        // The open point in time request and each subsequent search request can return a different id,
        // so always use the most recently received id for the next search request
        lastPitId = searchResponse.PitId;
    }

    // Close the PIT to release its resources on the cluster
    await _client.ClosePointInTimeAsync(new ClosePointInTimeRequest { Id = lastPitId }, cancellationToken);
    return docs;
}

This paginated approach ensures that we avoid loading all the documents from the index into memory at once. Instead, we retrieve documents incrementally, which is especially useful for responding to an API that displays results in segments.

Paginated requests with IAsyncEnumerable

Since we don’t know in advance how many documents might be requested by the interface, the typical paginated search alone doesn’t fully meet our needs. This is where the IAsyncEnumerable interface in C# can be incredibly helpful.

IAsyncEnumerable is an interface introduced in C# 8.0 that represents an asynchronous enumerable sequence of items. It is used when you need to work with a collection of data that is retrieved asynchronously, such as streaming data from a database, API, or file.
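To make the mechanics concrete, here is a minimal self-contained sketch (all names are illustrative, not from the project above): an async iterator that “fetches” pages lazily and yields items one at a time, so each page is awaited only when the consumer asks for more.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var collected = new List<int>();

// The consumer pulls items one at a time; the next page is only
// fetched when the previous one is exhausted.
await foreach (var item in FetchItemsAsync())
{
    collected.Add(item);
}

Console.WriteLine(string.Join(",", collected)); // prints 1,2,3,4,5,6

// Simulates paginated retrieval: a stand-in for successive search requests.
static async IAsyncEnumerable<int> FetchItemsAsync()
{
    int[][] pages = { new[] { 1, 2, 3 }, new[] { 4, 5, 6 } };
    foreach (var page in pages)
    {
        await Task.Delay(1); // stand-in for an async search call
        foreach (var item in page)
        {
            yield return item;
        }
    }
}
```

If the consumer stops iterating after the first few items, the second “page” is never fetched; that laziness is exactly what we want for open-ended pagination.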

In this case, we can leverage Elasticsearch pagination with an asynchronous enumerable in C#:

public async IAsyncEnumerable<MyDocument> GetDocumentsAsyncEnumerableAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Open a new PIT to freeze the index state for the whole pagination
    var creationPitResult = await _client.OpenPointInTimeAsync(_indexName, p => p.KeepAlive(_keepAlive), cancellationToken);

    var lastPitId = creationPitResult.Id;
    ICollection<FieldValue> searchAfter = null;

    try
    {
        while (true)
        {
            var searchResponse = await _client.SearchAsync<MyDocument>(sd => sd
                .Size(_maxResult)
                .TrackTotalHits(new TrackHits(false))
                .Pit(new PointInTimeReferenceDescriptor(lastPitId))
                .SearchAfter(searchAfter)
                .Sort(s => s.Field(document => document.Id)), cancellationToken);

            if (searchResponse.Hits.Count == 0)
            {
                break;
            }

            searchAfter = searchResponse.HitsMetadata.Hits.Last().Sort.ToList();
            // The open point in time request and each subsequent search request can return a different id,
            // so always use the most recently received id for the next search request
            lastPitId = searchResponse.PitId;

            foreach (var doc in searchResponse.Documents)
            {
                cancellationToken.ThrowIfCancellationRequested();
                yield return doc;
            }
        }
    }
    finally
    {
        // Close the PIT even when the caller stops enumerating early or cancels;
        // CancellationToken.None ensures the cleanup request is still sent
        await _client.ClosePointInTimeAsync(new ClosePointInTimeRequest { Id = lastPitId }, CancellationToken.None);
    }
}

In this implementation, the yield return keyword is used to return documents one by one, effectively streaming the results as they’re retrieved. This method allows the calling code to process the documents asynchronously without having to load all of them into memory at once.

To consume the results, the client can simply iterate over the IAsyncEnumerable:

await foreach (var document in _myIndexRepository.GetDocumentsAsyncEnumerableAsync())
{
    // process the document
}

When the results of the first paginated search are exhausted, our method will automatically use the PIT to request a new page from Elasticsearch.

P.S.: If the client interrupts the enumeration in the foreach loop, the finally block ensures that the PIT is properly closed.
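This cleanup behavior is easy to verify with a tiny self-contained sketch (names are illustrative): breaking out of an await foreach disposes the enumerator, which resumes the iterator and runs its finally block, just as the repository method above closes the PIT.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

bool cleanedUp = false;

// Consume only the first item: `await foreach` disposes the enumerator
// when the loop exits, which runs the iterator's finally block.
await foreach (var item in ItemsWithCleanup())
{
    break;
}

Console.WriteLine(cleanedUp); // prints True

async IAsyncEnumerable<int> ItemsWithCleanup()
{
    try
    {
        await Task.Yield(); // stand-in for an async call
        yield return 1;
        yield return 2;
    }
    finally
    {
        // Analogous to closing the PIT in the repository method
        cleanedUp = true;
    }
}
```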

Conclusions

By combining Elasticsearch’s paginated search with C#’s IAsyncEnumerable, we’ve been able to efficiently control document retrieval from the UI without keeping a large number of documents in memory. This approach helps optimize resource usage and enables seamless, incremental document retrieval for a more responsive user interface.

Next steps include enhancing the API controller and leveraging NDJSON to stream documents back to the frontend, ensuring an even more responsive UI.

P.S.: This article continues with NDJSON and IAsyncEnumerable.

All the code, including integration tests, is available on the GitHub project.



Written by Diego Bonura

Software engineer with a strong sense of humor. Anyway yesterday it worked.
