Why Should You Use the Bulk API to Import Big Data to Elasticsearch?

Gökhan Gürgeç
Published in cloudnesil
5 min read · Jun 9, 2024

In the era of LLMs, writing a technical story may seem illogical, but as Flaubert says, “You must write for yourself, above all.” This story is my way of remembering the lessons I learned at work.

In today’s data-driven world, efficient data storage and retrieval are critical for any application’s success. Elasticsearch, a powerful search and analytics engine, is widely used for its scalability and speed. However, when it comes to importing large datasets into Elasticsearch, many developers encounter performance bottlenecks and memory issues. The solution? Elasticsearch’s Bulk API (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html).

In this article, we’ll explore the benefits of using the Bulk API for importing large datasets into Elasticsearch and walk through a practical example of how to implement it. We’ll also compare it with the traditional method of using the Index API (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html).

The Challenge of Importing Large Data

Imagine you have a large dataset containing millions of records, such as a database of addresses. Indexing each document with its own request is slow and inefficient, because every document pays the overhead of a separate HTTP request. This approach also risks overloading your Elasticsearch cluster and running into memory issues.

In my case, we have millions of address records, and I want to import them into Elasticsearch.

Traditional Method: Index API

Using the Index API, you would typically index each document with a separate request. Here’s a simplified example:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.List;

public class ElasticsearchIndexApiImporter {
    // Sends one HTTP request per document: simple, but slow for large datasets.
    public static void indexData(RestHighLevelClient client, List<String> jsonDataList) throws IOException {
        for (String jsonData : jsonDataList) {
            IndexRequest request = new IndexRequest("address")
                    .source(jsonData, XContentType.JSON);
            client.index(request, RequestOptions.DEFAULT);
        }
    }
}

Drawbacks of the Index API

  1. High Network Overhead: Each document requires a separate HTTP request, leading to significant network overhead.
  2. Slow Performance: Every document pays for a full request round trip, so total indexing time grows with the number of documents at a high per-document cost, making this approach unsuitable for large datasets.
  3. Resource Intensive: Frequent network calls and processing on Elasticsearch nodes can lead to resource exhaustion, affecting cluster performance.

Enter the Bulk API

The Bulk API in Elasticsearch allows you to perform multiple index, create, update, or delete operations in a single API call, significantly reducing the overhead and improving performance. By batching multiple documents into a single request, you can optimize the use of network resources and reduce the load on your Elasticsearch cluster.
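
At the REST level, a bulk request body is newline-delimited JSON: each index operation is an action line followed by the document source on the next line, and the body must end with a newline. The sketch below builds such a body with plain JDK string handling. It assumes every document is already serialized as a single-line JSON object and that the index name needs no escaping; `BulkBodySketch` and `buildIndexBody` are hypothetical names, not part of any Elasticsearch client.

```java
import java.util.List;

// Hypothetical helper for illustration; not part of any Elasticsearch client library.
final class BulkBodySketch {

    // Builds an NDJSON body for POST /_bulk: one action line plus one source line per document.
    // Assumption: each element of docs is a complete JSON object on a single line.
    static String buildIndexBody(String index, List<String> docs) {
        StringBuilder body = new StringBuilder();
        for (String doc : docs) {
            body.append("{\"index\":{\"_index\":\"").append(index).append("\"}}\n");
            body.append(doc).append('\n'); // the last line must also end with a newline
        }
        return body.toString();
    }

    public static void main(String[] args) {
        String body = buildIndexBody("address", List.of(
                "{\"latitude\":37.039067,\"longitude\":35.30697}",
                "{\"latitude\":37.03906480046042,\"longitude\":35.27391716837883}"));
        System.out.print(body);
    }
}
```

The Java client used later in this article assembles this body for you, so a sketch like this is mainly useful for understanding what goes over the wire or for testing with a plain HTTP tool.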

Benefits of Using the Bulk API

  1. Improved Performance: By batching documents, the Bulk API reduces the number of HTTP requests, lowering the network overhead and improving indexing speed.
  2. Resource Efficiency: Bulk operations use fewer resources on the Elasticsearch cluster, allowing it to handle more operations simultaneously.
  3. Reduced Latency: Fewer round trips between the client and server lead to reduced latency, enhancing overall performance.

Practical Example: Address Import with Bulk API

Let’s dive into a practical example. Suppose we have one or more JSON files, each containing a large array of addresses that we need to import into an Elasticsearch index. We’ll use Java to implement this, leveraging the Bulk API for efficient data import.

Input: JSON array of addresses

[
  {
    "raw_address" : "Huzurevleri Mh. Türkmenbaşı Blv. No: 46 Kat: 5 Daire: 5 Çukurova Adana",
    "latitude" : 37.03906480046042,
    "longitude" : 35.27391716837883
  },
  {
    "raw_address" : "Mahfesığmaz, 79077. Sk. 9 B Adana",
    "latitude" : 37.039067,
    "longitude" : 35.30697
  }
]

Step-by-Step Implementation

1. Setup Elasticsearch Client

First, we create a client to connect to our Elasticsearch instance.

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

public class ElasticsearchClient {
    public static RestHighLevelClient createClient() {
        RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
        return new RestHighLevelClient(builder);
    }
}

2. Create the Index

Next, we create the index with the required mappings and settings.

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;

public class ElasticsearchIndex {
    private static final String INDEX_NAME = "address";

    public static void createIndex(RestHighLevelClient client) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME);
        // Shard and replica counts for the new index
        request.settings(Settings.builder()
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 2)
        );
        // Explicit mapping for the three fields of each address document
        String mapping = "{\n" +
                " \"properties\": {\n" +
                " \"latitude\": { \"type\": \"double\" },\n" +
                " \"longitude\": { \"type\": \"double\" },\n" +
                " \"raw_address\": { \"type\": \"text\" }\n" +
                " }\n" +
                "}";
        request.mapping(mapping, XContentType.JSON);
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println("Index created: " + createIndexResponse.index());
    }
}

3. Process and Import Data Using Bulk API

We then process the JSON file and use the Bulk API to import the data.

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ElasticsearchImporter {
    private static final String INDEX_NAME = "address";
    private static final int BULK_SIZE = 1000;

    public static void main(String[] args) throws IOException, InterruptedException {
        RestHighLevelClient client = ElasticsearchClient.createClient();
        // Step 1: Create the index with the provided configuration
        ElasticsearchIndex.createIndex(client);
        // Step 2: Read JSON files and bulk index data into Elasticsearch
        File folder = new File("path/to/json/files");
        File[] listOfFiles = folder.listFiles((dir, name) -> name.toLowerCase().endsWith(".json"));
        // Use a virtual thread executor (requires Java 21 or later)
        ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
        ObjectMapper objectMapper = new ObjectMapper();
        if (listOfFiles != null) {
            for (File file : listOfFiles) {
                // One task per file, so files are processed concurrently
                executorService.submit(() -> {
                    try {
                        processJsonFile(client, file, objectMapper);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
        // Wait for all submitted files to finish before closing the client
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
        client.close();
    }

    private static void processJsonFile(RestHighLevelClient client, File file, ObjectMapper objectMapper) throws IOException {
        JsonFactory factory = new JsonFactory();
        // Stream the file token by token instead of loading the whole array into memory
        try (JsonParser parser = factory.createParser(file)) {
            List<IndexRequest> requests = new ArrayList<>();
            if (parser.nextToken() == JsonToken.START_ARRAY) {
                while (parser.nextToken() != JsonToken.END_ARRAY) {
                    // Read exactly one array element (one address object)
                    JsonNode jsonNode = objectMapper.readTree(parser);
                    IndexRequest request = new IndexRequest(INDEX_NAME)
                            .source(jsonNode.toString(), XContentType.JSON);
                    requests.add(request);
                    // Send a bulk request once the batch is full
                    if (requests.size() >= BULK_SIZE) {
                        bulkIndexData(client, requests);
                        requests.clear();
                    }
                }
                // Index remaining requests
                if (!requests.isEmpty()) {
                    bulkIndexData(client, requests);
                }
            }
        }
    }

    private static void bulkIndexData(RestHighLevelClient client, List<IndexRequest> requests) {
        try {
            BulkRequest bulkRequest = new BulkRequest();
            for (IndexRequest request : requests) {
                bulkRequest.add(request);
            }
            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
            // A bulk call can succeed as a whole while individual items fail
            if (bulkResponse.hasFailures()) {
                System.err.println("Bulk indexing had failures: " + bulkResponse.buildFailureMessage());
            } else {
                System.out.println("Bulk indexing succeeded.");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Key Points

  1. Memory Efficiency: By processing the JSON file in chunks and using the Bulk API, we avoid loading all records into memory at once, preventing out-of-memory errors.
  2. Parallel Processing: Using virtual threads allows us to process multiple files concurrently, further improving performance.
  3. Scalability: The Bulk API enables Elasticsearch to handle a higher throughput of indexing operations, making your application more scalable.
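
The chunking pattern behind the first key point can be shown without any Elasticsearch dependency. This is a minimal sketch, assuming a caller-supplied `flush` callback stands in for the actual bulk call; `BatchingSketch` and `forEachBatch` are hypothetical names and not part of the importer above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper illustrating the chunking pattern; not part of the article's importer.
final class BatchingSketch {

    // Streams items from the iterator and hands them to flush in batches of at most batchSize,
    // so no more than batchSize items are buffered at any time. Returns the number of batches.
    static <T> int forEachBatch(Iterator<T> items, int batchSize, Consumer<List<T>> flush) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> buffer = new ArrayList<>(batchSize);
        int batches = 0;
        while (items.hasNext()) {
            buffer.add(items.next());
            if (buffer.size() == batchSize) {
                flush.accept(new ArrayList<>(buffer)); // pass a copy so the buffer can be reused
                buffer.clear();
                batches++;
            }
        }
        if (!buffer.isEmpty()) { // flush the final, partially filled batch
            flush.accept(new ArrayList<>(buffer));
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> docs = List.of(1, 2, 3, 4, 5);
        int batches = forEachBatch(docs.iterator(), 2,
                batch -> System.out.println("flushing " + batch));
        System.out.println(batches + " batches");
    }
}
```

Because the helper only ever holds one batch, memory use depends on the batch size rather than on the size of the input file, which is the property the importer relies on.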

Comparison: Index API vs. Bulk API

Performance Test

To illustrate the performance difference, consider importing a dataset of 1 million address records:

  • Index API:
      • Each document is indexed individually.
      • High network overhead and slower performance.
      • Increased load on Elasticsearch cluster.
  • Bulk API:
      • Documents are batched into requests of 1,000 documents each.
      • Reduced network overhead and faster performance.
      • Lower resource consumption on Elasticsearch cluster.
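
The difference in request volume for this test is simple arithmetic: one request per document versus one request per batch of 1,000. A small sketch of that calculation follows; the class and method names are hypothetical.

```java
// Hypothetical helper: counts the HTTP requests needed to index a dataset.
final class RequestCountSketch {

    // Number of requests when documents are sent in batches of batchSize (ceiling division).
    static long requestCount(long documents, int batchSize) {
        if (documents < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("documents must be >= 0 and batchSize > 0");
        }
        return (documents + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long docs = 1_000_000L;
        System.out.println("Index API requests: " + requestCount(docs, 1));
        System.out.println("Bulk API requests:  " + requestCount(docs, 1_000));
    }
}
```

For 1 million records, the Index API approach issues 1,000,000 requests, while batches of 1,000 need only 1,000.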

Test Results

  • Index API: Took approximately 2 hours to index around 1 million records.
  • Bulk API: Took approximately 15 minutes to index around 1 million records.

The Bulk API provided roughly an 8x performance improvement (about 2 hours versus about 15 minutes), demonstrating its efficiency in handling large data imports.
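
The 8x figure follows directly from the two durations, and the same numbers give an approximate throughput for each approach. The sketch below only redoes that arithmetic, taking the reported durations and record count at face value; the class and method names are hypothetical.

```java
// Hypothetical helper that redoes the arithmetic behind the reported test results.
final class ThroughputSketch {

    // Ratio of the baseline duration to the improved duration.
    static double speedup(double baselineMinutes, double improvedMinutes) {
        return baselineMinutes / improvedMinutes;
    }

    // Average number of documents indexed per second over the given duration.
    static double docsPerSecond(long documents, double minutes) {
        return documents / (minutes * 60.0);
    }

    public static void main(String[] args) {
        long docs = 1_000_000L;        // approximate record count from the test
        double indexApiMinutes = 120;  // approximately 2 hours
        double bulkApiMinutes = 15;    // approximately 15 minutes
        System.out.printf("Speedup: %.1fx%n", speedup(indexApiMinutes, bulkApiMinutes));
        System.out.printf("Index API: %.0f docs/s%n", docsPerSecond(docs, indexApiMinutes));
        System.out.printf("Bulk API:  %.0f docs/s%n", docsPerSecond(docs, bulkApiMinutes));
    }
}
```

With these inputs the Index API run averages about 139 documents per second and the Bulk API run about 1,111. Since both durations are approximate, these are rough estimates.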

Conclusion

Importing large datasets into Elasticsearch can be challenging, but the Bulk API offers a robust solution. By batching documents into single requests, you can significantly improve indexing performance and resource efficiency. The practical example provided demonstrates how to implement this approach in Java, making it easier to manage large data imports and maintain the performance of your Elasticsearch cluster.

Switching from the traditional Index API to the Bulk API can result in substantial performance gains and resource savings. For any large-scale data import tasks, leveraging the Bulk API is the way to go.

Happy coding, and may your data always be efficiently indexed!

Note: This story was written by ChatGPT, with several prompt corrections and some edits by me. :) Thanks to OpenAI for the great ChatGPT and to humanity for accumulating so much information, providing valuable input for large language models.
