How to migrate from Elasticsearch 1.7 to 6.8 with zero downtime

Dor Sever
BigPanda Engineering
9 min readDec 25, 2019

My last task in BigPanda Engineering was to upgrade an existing service from using Elasticsearch version 1.7 to a newer Elastic version 6.8.1.

In this post, I will share how did we migrate from elastic 1.6 to elastic 6.8 with harsh constraints like zero downtime, no data loss, zero bugs and provide you with a script that does the migration for you.

This post contains 6 chapters(one is optional):

  • What’s in it for me— What were the new features that led us to upgrade our version?
  • The Constraints — What were our business requirements
  • How can we Address the constraints
  • The Plan
  • [Optional chapter] — How did we handle the infamous mapping explosion problem?
  • How to do data migration between clusters

Chapter 1 — What’s in it for me?

What benefits are we expecting to solve by upgrading our data store?

There were a couple of reasons:

  1. Performance and stability issues — We were experiencing a huge amount of outages with long MTTR that caused us a lot of headaches. This reflected in frequent high latencies, high CPU usage and more
  2. Non-existing support in old Elasticsearch versions — We were missing some operative knowledge in Elasticsearch, and when we searched for outside consulting we were encouraged to migrate forward to receive support
  3. Dynamic mappings in our schema — Our current schema in Elasticsearch 1.7 used a feature called dynamic mappings that made our cluster explode multiple times and we wanted to address this issue
  4. Poor visibility on our existing cluster — We wanted a better view under the hood and saw that later versions had great metrics exporting tools

Chapter 2 — The Constraints

  • ZERO downtime migration — We have active users on our system, and we cannot afford the system to be down while we are migrating
  • Recovery plan — We cannot afford to “lose” or or “corrupt” data, no matter what is the cost. So we must prepare a recovery plan in case our migration fails
  • Zero bugs — We must not change existing search functionality for end-users

Chapter 3 — Thinking of the plan

Let’s tackle the constraints from the simples to the hardest:

Zero bugs

In order to address this requirement, I studied all the possible requests the service receives and what were its outputs and added unit-tests where needed.

In addition, I added multiple metrics (to the Elasticsearch Indexer and the new Elasticsearch Indexer ) to track latency, throughput, and performance, which allowed me to validate that we only improved them.

Recovery plan

  • This means, that I need to address the following situation: I deploy the new code to production and stuff is not working as expected. What can I do about it then? Since I was working in a service that used event-sourcing, I could add another listener (Diagram attached below) and start writing to a new Elasticsearch cluster without affecting production status

Zero downtime migration

The current service is in live mode, and cannot be “deactivated” for periods longer than 5–10 minutes. The trick in getting this right is this:

  • Store a log of all the actions your service is handling (We use Kafka in production)
  • Start the migration process offline (and keeping track of the offset before you started the migration).
  • When the migration ends, start the new service against the log with the recorded offset and catch up the lag
  • When the lag finishes, change your frontend side to query against the new service and you are done

Chapter 4 — The Plan

Our current service uses the following architecture(based on message passing in Kafka):

  1. Event topic contains events produced by other applications (for example, UserId 3 created)
  2. Command topic contains the translation of these events into specific commands used by this application (for example: Add userId 3)
  3. Elasticsearch 1.7 — The datastore of the command Topic read by the Elasticsearch Indexer.

We planned to add another consumer(new Elasticsearch Indexer) to the command topic, which will read the same exact messages and write them in parallel to Elasticsearch 6.8

Where should I Start

To be honest, I considered myself a newbie Elasticsearch user, and to feel confident to perform this task, I had to think about what’s the best way to approach this topic and learn it. A few things that helped were:

  1. Documentation — It’s an insanely useful resource to everything Elasticsearch. Take the time to read it and take notes (don’t miss: Mapping and QueryDsl).
  2. HTTP API — everything under CAT API. This was super useful to debug things locally and see how Elastic response. (don’t miss: cluster health, cat indices, search, delete index)
  3. Metrics (❤️) — From the first day, we configured a new and shining dashboard with lots of cool metrics (taken from elasticsearch-exporter-for-Prometheus) that helped and pushed us to understand more about Elastic

The code

Our codebase was using a library called elastic4s and was using the oldest release available in the library — A really good reason to migrate on!
So the first thing was, to just migrate the versions, and see what breaks

There are a few tactics on how to do this code migration, and the tactic we choose was to try and restore existing functionality first in the new Elastic version without re-writing the all code from the start. AKA, reach existing functionality but on a newer version of Elasticsearch

Luckily for us, the code already contained almost full testing coverage so our task was much much simpler, and that took around 2 weeks of development time

It’s important to notice, that if that wasn’t the case, we would have had to invest some time in filling that coverage up and only then migrating to a new version (since one of our constraints was: do not break existing functionality)

Chapter 5 — The mapping explosion problem

Let’s describe our use-case in more detail, this is our model

class InsertMessageCommand(tags: Map[String,String])

And for example, an instance of this message would be:

new InsertMessageCommand(Map("name"->"dor","lastName"->"sever"))

And given this model, we needed to support the following query requirements:

  1. query by value
  2. query by tag name and value

The way this was modeled in our Elasticsearch 1.7 schema was using a dynamic template schema (since the tag keys are dynamic, and cannot be modeled in advanced)

The dynamic template caused us multiple outages due to Mapping explosion problem, and the schema looked like this:

Nested documents solution

Our first instinct in solving the Mapping explosion problem was to use nested documents.

We read the nested data type tutorial in Elastic docs and defined the following schema and queries

And this solution worked. However, when we tried to insert real customer data we saw that the number of documents in our index increased by around X 500

We thoughts about the following problems and went on to find a better solution:

  1. Amount of documents we had in our cluster was around 500 million documents, and this meant that in the new schema we were going to reach two hundred fifty billion documents (that’s 250,000,000,000 documents 😱)
  2. We read this really good blog post — https://blog.gojekengineering.com/elasticsearch-the-trouble-with-nested-documents-e97b33b46194 which highlights that nested documents can cause high latency in queries, and heap usage problems
  3. Testing — Since we were converting 1 document in the old cluster, to an unknown number of documents in the new cluster, it will be much harder to track if the migration process works without any data loss (if our conversion was 1:1, we could assert that the count in the old cluster equals the count in the new cluster)

Avoiding nested documents

The real trick in this was to focus on what were the supported queries we were running: search by tag value, and search by tag key and value

The first query does not require nested documents since it works on a single field, and for the latter, we did the following trick.
We created a field that contains the combination of the key and the value, and whenever a user queries on key, value match, we translate his request to the corresponding text and query against that field

Example:

Chapter 6 — The migration process

We planned to migrate about 500 million documents with zero downtime. And to do that we needed:

  1. A strategy on how to transfer data from the old Elastic to the new Elasticsearch
  2. A strategy on how to close the lag between the start of the migration, and the end of it

And our two options in closing the lag:

  1. Our messaging system is Kafka based, and we could just take the current offset before the migration start, and after the migration ended, start consuming from that specific offset. This solution requires some manual tweaking of offsets and some other stuff but will work
  2. Another approach to solving this issue was to start consuming messages from the beginning of the topic in Kafka and make our actions on Elasticsearch idempotent — meaning, if the change was “applied” already, nothing will change in Elastic store

The requests made by our service against Elastic were already idempotent so we choose option 2 because it requires zero manual work (no need to take specific offsets, and then set them afterward in a new consumer group)

How can we migrate the data?

These were the options we thought of:

  1. If our Kafka contained all messages from the beginning of time we could just play from the start and the end state would be equal — But since we apply retention to out topics, this was not an option
  2. Dump messages to disk — and then ingest them to Elastic directly — this solution looked kind of weird, why store them in disk instead of just writing them directly to Elastic?
  3. Transfer messages between old elastic to new Elastic — This meant, writing some sort of “script” (anyone said python? 😃) that will connect to the old Elasticsearch cluster, query for items, transform them to the new schema, and index them in the cluster

We choose the last option and these were the design choices we had in mind

  1. Let’s not try to think about error handling unless we need to. Let’s try to write something super simple, and if errors occur, let’s try to address them. In the end, we did not need to address this issue since no errors occurred during the migration
  2. It’s a one-off operation, so whatever works first / KISS
  3. Metrics — Since migration processes can take long hours to days, we wanted the ability from day 1 to be able to monitor error count and to track the current progress and copy rate of the script

We thought long and hard and choose python as our weapon of choice. The final version of the code is attached below

dictor==0.1.2 - to copy and transform our Elasticsearch documents
elasticsearch==1.9.0 - to connect to "old" Elasticsearch
elasticsearch6==6.4.2 - to connect to the "new" Elasticsearch
statsd==3.3.0 - to report metrics

Conclusion

Migrating Data in a live production system is a complicated task that requires a lot of attention and careful planning. I recommend taking the time and working through the steps listed above and figuring out what works best for your needs

As a rule of thumb, always try to reduce your requirements as much as possible. For example, Is zero downtime migration required? Can you afford data-loss?

Upgrading data stores is usually a marathon and not a sprint, so take a deep breath and try to enjoy the ride

  • The whole process listed above took me around 4 months of work.
  • All of the Elasticsearch examples that appear in this post have been tested against version 6.8.1.

--

--