Kafka from zero to hero: zero-downtime Kafka migration
Half a year ago I started working at Blockport — a cryptocurrency exchange that tries to combine trading with social networking, where experienced traders can share their knowledge with newcomers and be rewarded for that in BPT (Blockport token).
I’ve always wanted to work at a startup. Being an expat in the Netherlands, I have some limitations on the type of companies I can work for, and at the same time, I am quite picky about technologies used for building a product. Blockport, luckily, was the right fit for me — they have successfully launched ICO, so had enough money and could hire expats and their applications were running on Kubernetes on Google cloud, just what I like!
Working at a startup, there are always many tasks that have to be done sooner than later. This often comes with compromises like delivering something fast but not necessarily with the best quality or sound design for long-term usage.
One of such decisions was the deployment of a Kafka cluster. Kafka is used as a message bus for our microservices and gives us delivery guarantees: if some service goes down, messages are preserved in Kafka so another application instance can consume them later when needed. To deliver a Kafka cluster quickly, the first attempt was to run it on a Kubernetes cluster, even though stateful applications are generally considered challenging on Kubernetes. As I wasn’t employed at Blockport at that time, I can’t touch this case in depth, but it wasn’t stable enough. After running Kafka on Kubernetes, we moved it to virtual machines on Google Compute Engine.
Again, trying to avoid the need to battle the sometimes complex Kafka internals, Kafka cluster was deployed using a managed solution from Google Cloud marketplace. Eventually, we did the whole Kafka setup from scratch, and this story is about the reasons for that decision and the path we took.
In the beginning, using a pre-configured Kafka felt good overall, even though my teammate had to patch it a bit, to make sure it was running securely and entirely on our private network at Google.
Being part of 2,5-people DevOps/SRE team (our CTO helps out from time to time with security/automation), we aim to keep our infrastructure available and secure, and Kafka is a vital piece of our environment.
To address high availability concerns, we started doing disaster recovery tests with our staging Kafka cluster. During one of these tests, our developers reported that some Kafka messages were lost. Investigating what happened, we noticed that our cluster had
auto.create.topics.enable=true which allows posting messages to the topic even if it didn't exist before. With that setting, all automatically created topics would get a single replica for each partition - the result of using a pre-configured and non-optimized solution without vetting every setting (we enabled auto-topic creation for quick development, but availability settings with single replica were part of the default setup).
Apart from that, some topics were pre-created from microservices themselves but often had only a single replica configured even though we had five Kafka brokers available. This was a big red flag, as any broker failure could result in lost messages. So, the first and most obvious solution we came up with was increasing the number of replicas across all topics and setting sane cluster availability settings for the future.
Kafka high availability settings
Here are relevant settings we agreed on:
You may wonder why we put the number of replicas to four when official docs say three? Five node Kafka cluster can sustain two node failures. When we created a topic with three replicas and shutdown single server, we still had two replicas available and applications could read and write to a given topic (Kafka doesn’t automatically move unreachable or dead replicas to other available servers). Reads and writes were still possible with one server down due to
min.insync.replicas=2 setting. Nevertheless, after shutting down the second server, while we still had one replica left (so, no data loss) applications couldn't write to the topic anymore as we had only one synchronized replica out of two required. So, setting the number of replicas to four allows us to have the maximum number of failures while preserving topics in a good state for reads and writes.
You may also ask how likely is that both servers go down? It happened to us not so long time ago. One of the best practices in distributed setups is to spread your servers across multiple zones or data centers. Google cloud has three zones, so we have two Kafka brokers in zone A, two in zone B and one in zone C. Zone A had an outage, so our cluster experienced the worst scenario it can currently handle by having two brokers partitioned from the rest of the cluster, but good for us — we were prepared.
One more setting which relates to multiple data centers and can increase the availability of data is rack awareness. We could have applied it, but considering we have four replicas, that feature wouldn’t give us any more benefits. The worst case distribution of four replicas across five servers in three zones is to have two replicas in zone A and two in zone B (one server can’t host multiple replicas of the same topic partition). If the whole zone goes down as it happened, we are still good with two other replicas in another zone. However, if at least one more server goes down — we are doomed anyway, no matter if the last two replicas located in a single zone or two different zones.
Apart from settings replica settings on the server side, we asked all developers to make sure they set
ack=all in Kafka producer settings to make sure enough replicas get updated on writes, and we don't lose any messages if Kafka broker or producer fails during the write. Another good producer setting is to have
retry=3 to avoid delivery failures on intermittent network issues, but most Kafka client libraries have such setup by default.
Fixing the replication factor
In Kafka, you can increase or decrease the number of replicas for a given topic following this approach. While that works, it’s not so fun doing it for more than a hundred topics. So, I had to write a script which does that for all topics on a given cluster. It’s not the best bash scripting for sure, but it does the job very well and has a check mode which can be useful in finding under replicated topics. There is native Kafka command
kafka-topics.sh --zookeeper zookeeper:2181 --describe --under-replicated-partitions
but it shows partitions which have problems with replication, while this script compares the current replication factor versus the desired one.
If you are lazy to read the code (or my code is very bad), the script generates JSON files needed for increasing or decreasing replication factor with a randomized list of brokers to make sure the load is spread evenly across the cluster, so we don’t make single broker a leader for all partitions. Distributing partition leadership is essential because all writes and reads are going to the partition leader, not to replicas, so making a single machine a leader for all partitions would overload that machine. After JSON file generation, the script executes partition reassignment and waits for the successful completion as this process can take time depending on the number of messages in the partition and performance of your cluster.
Official Kafka docs also describe the problem which I was solving with this script:
An ideal partition distribution would ensure even data load and partition sizes across all brokers. The partition reassignment tool does not have the capability to automatically study the data distribution in a Kafka cluster and move partitions around to attain an even load distribution. As such, the admin has to figure out which topics or partitions should be moved around.
Having this script, we started increasing the replication factor on all topics. While testing on a new cluster and staging environment went smooth, our pre-live cluster had issues which took many hours to debug. While I brought the pre-live cluster back to a healthy state, I never figured out the root cause of cluster issues, and it was a red flag that procedure on a live cluster might not go so well. Perhaps some Kafka guru would be able to find root cause easily, but most of my fellow engineers I talked to had similar experiences debugging Kafka clusters. In the meantime, our devs were also reporting some Kafka bugs not related to replicas. As we were running Kafka version 1.1, we thought why not upgrade the cluster before trying to fight problems which may be fixed in the newer version, perhaps it would also fix our replication issues (Kafka 2.1.0 was already available at that time).
Kafka setup from scratch
So having an idea to update the cluster to a newer version, we decided to look a bit more into our pre-configured setup. While that solution provides some reasonable configuration defaults, it turned out to be painful to introduce some changes to the cluster or upgrade it. For instance, every server has specific systemd service, which is an overall service to control any underlying applications that our solution provider offers on the cloud marketplace. It means that you won’t easily find Kafka service when you log in to the machine. However, after some digging, we found out Kafka process was managed by gonit, so a few times I ended up in the situation when that specific service I mentioned above was running fine, but Kafka itself was down which is in my view a drawback of having several layers of service management. Another potential problem for us was that
google deployment manager was used to install Kafka. One of the worst things about deployment manager is that it doesn't understand if some VM gets deleted - it doesn't reconcile state as Terraform does. So, any manipulations you may decide to do on expanding or shrinking the cluster can result in painful modifications of jinja or YAML files which you have to copy from the deployment manager page. During our short time of using the deployment manager, we also experienced issues with updates to Google APIs. Older API version (our initial deployment from cloud marketplace) would require network and subnetwork field for static IP resource:
While the newer one, would not create the resource before you comment one field or the other. So, it was a lot of fun combining two versions of jinja templates when we had issues with the pre-live cluster. All in all, we decided that creating Kafka cluster from scratch with Terraform and Ansible would give us everything we needed for future cluster updates: proper integration with Google APIs including good state management, and full control over Kafka processes and configs on virtual machines.
Ignoring any issues with initial setup, it was still useful as a reference when we were writing our automation scripts. For instance, we also set up Kafka data disks separately from boot disks as official Kafka docs recommended it (as we figured out later). While Terraform scripts are pretty standard, I thought it might be useful to share Kafka startup script which mounts attached data disk and sets up some disk performance settings recommended by Kafka admin guide:
From other potentially useful things to share about our setup, we added node exporter, kafka exporter and jmx exporter to get important Kafka metrics into Prometheus as well as google stackdriver agent where we separate different Kafka logs for easier debugging:
Prometheus metrics allow us to have these helpful alerts when Kafka goes down or some app is lagging behind on Kafka topic:
So, having our new and shiny scripts ready, we were hoping to startup new clusters, change DNS to new machines and probably restart Kubernetes pods if they wouldn’t handle address changes properly by themselves and be done with the migration. That idea seemed ok as before starting with automation for new Kafka clusters, we discussed new cluster setup with developers and because no important data was stored inside Kafka, we were aiming for lift and shift approach. But, as it often happens, the initial discussion was high-level, we didn’t think through every use case and turned out that we couldn’t just migrate to new clusters without preserving data. The data under question wasn’t important from the business perspective meaning we wouldn’t have lost any trades or whatsoever, but it would break user experience for a day or two as we needed some historical currency prices for nice badges about price changes in the UI. It’s also very likely that we didn’t have that feature when we started with automation. Bottom line — circumstances changed again, so we needed to come up with a way to preserve data.
A note on DNS — Kafka brokers have to be reachable from clients and producers directly due to reads and writes going to partition leaders. So, while we have a single DNS address for the whole cluster, it is used only as a discovery mechanism by clients. The client issues metadata request to figure out where to read data from (or write if it is a producer) and then talks directly to that server for data processing. As metadata request can hit any server, you don’t need to maintain a list of brokers across your applications — applications discover brokers via DNS. And in case metadata request hits a dead server, that’s where
retry=3 setting becomes useful. Meanwhile, a successful metadata request returns a healthy broker address to which your application can send data, so cluster DNS address is not used anymore.
If you check the standard procedure of adding a new node to Kafka cluster, you have to start new Kafka server and point it to the Zookeeper cluster (zookeeper is another component for Kafka clusters which keeps state about partition leaders and available brokers among few others). Recalling initial setup which had good defaults for security, that would mean that we needed to set up new Kafka servers with the same authentication credentials as older ones. In practice that translates to some manual steps of copying Kafka credentials and adding them to Ansible scripts which set up new servers. Otherwise, we wouldn’t be able to join our old cluster.
On the zookeeper node, check
zoo_jaas.conf and copy value for
Then, use that value for new Kafka servers in a file
The second problem to think about was that we planned to restart apps pointing them to a new cluster (version 2.1.0) and that’s it. However, with the data preservation requirement, we had to join nodes with new Kafka version to the cluster with old Kafka version. Luckily, Kafka allows doing that by specifying
inter.broker.protocol.version as described here. Again, not a big deal, but each extra step adds complexity to the whole process.
Another critical point is you need to make sure broker IDs of new servers don’t overlap with the old ones, so we disabled automatic broker ID generation (like it was done in the managed cluster) and assigned static IDs from 0 to 4.
Finally, as we have already written automation for Zookeeper setup, we couldn’t use it anymore because we would rely on old Zookeeper cluster. Migrating Zookeeper clusters is whole another story, and we decided to skip that for the time being — Zookeeper was running the latest version anyway. So, that automation was just some wasted time from the one side, but from the other — without doing that automation, we wouldn’t have figured out Kafka and Zookeeper authentication details which were necessary as I described above.
Analyzing all potential problems with integration, we decided on the following plan for getting data on new Kafka clusters:
- we add new Kafka servers (version 2.1.0) to the old cluster by using
inter.broker.protocol.version=1.1setting and existing zookeeper authentication credentials
- add new servers to DNS so all Kafka clients can talk to new servers even though they have no data yet
- start copying data to new servers by executing replication script (described above) in non-intrusive way: all topics are available for read/write operations during copying because we always specify two old servers in the broker list to preserve minimum synchronized replicas and avoid too much movement of data (even though data movement can be controlled by throttling feature as well)
- stop old servers, remove them from DNS and leave cluster for a few days for testing, so we are sure everything is good
- restart Kafka servers with
inter.broker.protocol.version=2.1one at a time and keep an eye on logs. If two servers go well, we proceed with rest; if not, we can still rollback as the majority of servers is on the old protocol. Protocol change is an irreversible setting, so going back would mean deleting some machines and reprovisioning them from scratch with Terraform and Ansible.
Having everything ready, we started testing the procedure on staging and pre-live clusters. Having five new nodes with IDs 0–4 and five old ones with IDs 1001–1005, we performed the migration of data in the following way:
- migrate all topics to servers [0, 1005, 1004, 1003, 1002] > confirm initial set up is good with one new server
- migrate all topics to servers [0, 1, 2, 1005, 1004] > continue process making sure
min.insync.replicasnumber of replicas are still on old machines. In case we overlooked something at this point, old servers are still functional for reads/writes
- migrate all topics to servers [0, 1, 2, 3, 4] > no more data on old servers
Each migration step took around 30 minutes to complete as we have around 150 topics and not much data. During the script execution, I used zookeeper CLI by running
zkCli.sh on the zookeeper server to monitor the migration process. A critical thing to check during the data replication is the status of replication itself by running
get /admin/reassign_partitions, which reports which partitions are still in the process of migration for a given topic. While on staging and pre-live the whole process went without a single hiccup, during the live migration, one of the topics was taking too much time to replicate. We started checking our monitoring dashboards, and we could see that input/output spikes which happen during data movement finished twenty minutes ago while the CPU of one of the servers was still high. Looking into logs, we found the following message repeating:
Replica(s) 0 indicated which server had problems with replication and it was the same server as the one where CPU was high. So, we restarted that Kafka server and process continued without any issues after that. If something goes wrong, it's also good to check which server has a controller role by running
get /controller from zookeeper CLI or checking logs. There are some circumstances when it can hang, so deleting it from zookeeper via
delete /controller makes Kafka elect a new leader, and that may resume the replication process.
Looking back, I would be very happy if we analyzed everything from scratch and maybe didn’t go for fast solutions at all, but that’s an ideal world type of situation. In reality, you are always constrained by time or resources. In our case, we had to go through several problem-solution cycles before getting a good grip on our Kafka clusters. You may experience the same if you don’t have in-house Kafka experts. This approach worked for us, and I hope it will give you some good ideas about Kafka management and help overcome migration struggles if you ever have to do it. I am pretty sure that on bigger clusters, you would have to consider more things like data throttling or rack awareness which I mentioned before and probably some others. Let me know if you are interested in any other parts of our set up, and hopefully, I’ll be able to share them with you.