Elasticsearch Bulk Insert Case in Go

Yiğit At
Published in Trendyol Tech · 5 min read · Mar 25, 2020

In this article, I will detail how we performed bulk insert operations for Elasticsearch with Go while replatforming our suggestion service. Before the development phase, we searched for Elasticsearch clients for Go and found two that were widely used: olivere/elastic, the well-known community client, and elastic/go-elasticsearch, the official client. We then reviewed some benchmark results. The shared benchmark results[1–2] comparing bulk insert performance between olivere/elastic and elastic/go-elasticsearch are as follows:

olivere/elastic:

name                                        ns/op     delta
BulkIndexRequestSerialization-4             5710      -0.76%
BulkEstimatedSizeInBytesWith100Requests-4   1213027   -8.72%

name                                        new allocs  delta
BulkIndexRequestSerialization-4             43          +16.22%
BulkEstimatedSizeInBytesWith100Requests-4   12212       +15.08%

name                                        new bytes  delta
BulkIndexRequestSerialization-4             1528       -35.91%
BulkEstimatedSizeInBytesWith100Requests-4   371193     -41.18%

elastic/go-elasticsearch (official client):
go run default.go -count=100000 -batch=25000

# Bulk: documents [100,000] batch size [25,000]
# ▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
# → Generated 100,000 articles
# → Sending batch [1/4] [2/4] [3/4] [4/4]
# ▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔
# Successfully indexed [100,000] documents in 3.423s (29,214 docs/sec)
go run indexer.go -count=100000 -flush=1000000

# BulkIndexer: documents[100,000] workers[8] flush[1.0 MB] in parallel
# ▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
# → Generated 100,000 articles
# ▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔
# Successfully indexed [100,000] documents in 1.909s (52,383 docs/sec)

The official client was seemingly more efficient. On the other hand, olivere/elastic has a larger community than the official client and provides a simple API for Elasticsearch operations such as queries, aggregations, inserts, and so forth. Unfortunately, we found an open issue for olivere/elastic on GitHub[3], so we concluded that the official client would be the better option.

Development

Now we had a Go client and were ready for development. Our goal was to insert data in bulk, i.e. chunk by chunk rather than one document at a time, and to trigger a scheduler every minute that inserts whatever data is waiting in the buffered queue.

First, two settings, buffer size and percentage, were required to decide when to execute the bulk insert operation. Buffer size is the total capacity of our queue; percentage is the fill ratio of our collection at which we flush.

Definition of buffer size and percentage
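
As a minimal sketch, the two settings could be modeled like this; the package name queue, the identifiers bufferSize and percentage, and the concrete values are assumptions for illustration, not our exact production code:

package queue

// Assumed configuration values; in our service they come from config.yaml.
const (
	bufferSize = 1000 // total capacity of the buffered queue
	percentage = 80   // fill ratio (percent) that triggers a bulk insert
)

// flushThreshold is the queue length at which a bulk insert is executed.
func flushThreshold() int {
	return bufferSize * percentage / 100
}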

We had two functions: one responsible for triggering the scheduler, and one for bulk inserting the buffered data in the queue.

Definition of scheduler function
Definition of buffered data bulk insert
Calling functions in service
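
A minimal sketch of the scheduler side, assuming a flush callback that performs the actual bulk insert (startScheduler and flush are illustrative names, not the original code):

package main

import (
	"fmt"
	"time"
)

// startScheduler triggers flush on a fixed interval (every 1 minute in our
// case), so buffered data is inserted even if the queue never fills up.
func startScheduler(interval time.Duration, flush func()) *time.Ticker {
	ticker := time.NewTicker(interval)
	go func() {
		for range ticker.C {
			flush()
		}
	}()
	return ticker
}

func main() {
	ticker := startScheduler(1*time.Minute, func() {
		fmt.Println("bulk inserting buffered documents into Elasticsearch")
	})
	defer ticker.Stop()
	select {} // a real service would block on a shutdown signal instead
}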

Second, we had two challenges during development: the culture breakdown, and the I/O (read-write) operations on the queue, such as removing data from the list or inserting data into it.

TR and DE are the cultures currently used at Trendyol, which means we need two different Elasticsearch indexes. We decided to read the cultures from the configuration file and create a map holding an entry for each available culture.

Definition of available cultures in config.yaml
CreateCultureMap function
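
Assuming the cultures arrive as a string slice parsed from config.yaml (e.g. a cultures: ["tr-TR", "de-DE"] entry), CreateCultureMap could look roughly like this; the element type *list.List anticipates the queue described below:

package queue

import "container/list"

// CreateCultureMap builds one buffered queue per culture. The cultures slice
// is assumed to be parsed from config.yaml, e.g. cultures: ["tr-TR", "de-DE"].
func CreateCultureMap(cultures []string) map[string]*list.List {
	cultureMap := make(map[string]*list.List, len(cultures))
	for _, culture := range cultures {
		cultureMap[culture] = list.New() // one doubly-linked list per index
	}
	return cultureMap
}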

Perhaps the I/O operations were the greatest challenge during development, because our list was dynamically growing and shrinking. I would like to mention how we stored the data dynamically. We used the built-in List type from the container/list package[4]. List is implemented as a doubly-linked list and has plenty of methods such as Remove, InsertAfter, PushBack, PushFront, and so forth. It would have covered our operations; however, data streaming and read-write operations were happening at the same time. The main drawback was concurrency: an element being inserted into the list might be removed in the meantime. For instance, a target element that was about to be removed could end up with no previous or next node because its neighbouring nodes had already been removed, and eventually the application crashed.

The term for the situation we were dealing with is the critical section. By definition[5], when a program runs concurrently, the parts of the code which modify shared resources should not be accessed by multiple goroutines at the same time. We therefore implemented a lock/unlock mechanism for the read-write operations. We defined a struct type called SafeQueue consisting of merely two properties: CultureMap was responsible for holding the available cultures, and Mux kept the lock and unlock states whenever a read-write operation was about to occur.

Definition of safe queue struct
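
A minimal sketch of such a struct; the field names follow the article, while the concrete types are assumptions:

package queue

import (
	"container/list"
	"sync"
)

// SafeQueue guards one buffered queue per culture behind a single mutex.
type SafeQueue struct {
	CultureMap map[string]*list.List // one doubly-linked list per culture (TR, DE)
	Mux        sync.Mutex            // held around every read-write on CultureMap
}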

The SafeQueue struct was initialized before the bulk insert operation.

Initialization of SafeQueue struct
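
Building on the SafeQueue and CreateCultureMap sketches above, the initialization is then a thin constructor (NewSafeQueue is an assumed name):

package queue

// NewSafeQueue wires the culture map into a ready-to-use SafeQueue.
func NewSafeQueue(cultures []string) *SafeQueue {
	return &SafeQueue{
		CultureMap: CreateCultureMap(cultures), // cultures parsed from config.yaml
	}
}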

As you can see, the SafeQueue was filled and ready to be used. The bulk insert operation is executed as soon as our list reaches 80% capacity, based on our threshold function. As a result, we need to lock and unlock during read-write operations to avoid the critical section issue mentioned above.

First lock and unlock states
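
As a sketch of the guarded insert, reusing the flushThreshold helper assumed earlier (the Push method itself is an illustrative name):

package queue

// Push appends a document to the culture's queue under the lock and reports
// whether the queue has reached the flush threshold (80% of buffer size).
func (q *SafeQueue) Push(culture string, doc interface{}) bool {
	q.Mux.Lock()
	defer q.Mux.Unlock()

	l := q.CultureMap[culture]
	l.PushBack(doc)
	return l.Len() >= flushThreshold()
}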

If our list reaches its capacity, we proceed to the bulk insert operation; otherwise data keeps being inserted into the list. We have a service that is responsible for all Elasticsearch operations, such as opening a connection and performing the bulk insert. Thanks to the mutex, we made sure that insert and remove operations were performed safely: while one element was being inserted, another was being correctly removed from the list. I would like to share the metrics we obtained during the bulk insert operation.
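
To give an idea of the Elasticsearch side, here is a sketch of draining one culture's queue into the official client's bulk indexer. It assumes go-elasticsearch v7 with the esutil helper; the Flush name and error handling are illustrative, not our exact service code:

package queue

import (
	"bytes"
	"context"
	"encoding/json"

	"github.com/elastic/go-elasticsearch/v7/esutil"
)

// Flush drains the culture's queue into the bulk indexer under the lock,
// removing each element from the list as it is handed over.
func (q *SafeQueue) Flush(ctx context.Context, bi esutil.BulkIndexer, culture string) error {
	q.Mux.Lock()
	defer q.Mux.Unlock()

	l := q.CultureMap[culture]
	for e := l.Front(); e != nil; {
		next := e.Next() // capture before Remove invalidates the element
		payload, err := json.Marshal(e.Value)
		if err != nil {
			return err
		}
		if err := bi.Add(ctx, esutil.BulkIndexerItem{
			Action: "index",
			Body:   bytes.NewReader(payload),
		}); err != nil {
			return err
		}
		l.Remove(e)
		e = next
	}
	return nil
}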

Indexing rate during bulk insert operation
Instant search rate during bulk insert operation
Instant data sharing amongst shards during bulk insert operation

Conclusion

This replatforming process had a great effect on the search team: goroutines, channels, general Go practice, Elasticsearch performance, thread management, mutexes, and so forth. Ultimately, when we tested our application, 3.3M documents were seamlessly indexed in 20 minutes across both the TR and DE indexes.

Time interval during bulk insert operation
