Migrating to a Multi-Cluster Managed Kafka with 0 Downtime
In 2021, my team worked on migrating Wix’s 2000 microservices from self-hosted Kafka clusters to a multi-cluster Managed cloud platform (Confluent Cloud) in a seamless way that did not need to involve the owners of these services. The migration was done during regular traffic without any downtime.
This was the most challenging project I have ever led, so in this post I want to share with you key design decisions we took, best practices and tips for this kind of migration.
Split Overloaded Clusters
During the last couple of years, usage of Kafka by business OLTP services at Wix has increased dramatically, as more and more services incorporated event driven architecture.
Our self-hosted cluster scale increased from 5K topics with >45K partitions in 2019 to 20K topics with >200K partitions.
Traffic also increased from ~450M records produced per day to >2.5B produced per day per cluster.
This puts a big strain on the cluster controller startup time, as it has to load all this metadata on partitions, which makes the leader election duration greatly increase. It also has a big effect on startup time of individual brokers if they happen to go down causing more and more incidents of under-replicated partitions.
In order to avoid this instability in our Kafka clusters in production, we decided to migrate our self hosted Kafka clusters to Confluent Cloud and split the single-cluster-per-data-center to multiple clusters.
Why Managed Kafka
Self-managing a Kafka cluster is no easy feat, especially when performing tasks such as rebalancing partitions among brokers, or upgrading broker versions. These can be quite scary and require careful planning and execution. That is especially true when the clusters become overloaded with metadata as was our case.
Below are 4 benefits of using a managed Kafka cloud platform and Confluent Cloud in particular:
- Better Cluster performance & flexibility
Brokers Partition rebalance is done for you, so brokers don’t become performance bottlenecks. You can also easily scale the cluster capacity up or down in order to achieve cost effectiveness.
- Transparent version upgrade
The Kafka code base is being improved all of the time, especially focused on KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum. Which, when completed will dramatically increase the amount of metadata each cluster and broker can hold. By going managed, you get automatic version upgrades which improve performance and fix bugs.
- Easy to add a new Cluster
In case you require a new cluster. Setting it up is dead simple.
- Tiered Storage
Confluent Platform offers Tiered Storage, which allows to dramatically increase the retention period for Kafka records without paying for expensive disk space, by shifting older records to much cheaper S3 storage, without adding latency to latest-offset record consumption.
Switching 2000 Microservice to Multi-Cluster Kafka architecture
At Wix we have a standard JVM library and Proxy service for interacting with Kafka called Greyhound. It’s an open source high level SDK for Apache Kafka which offers additional features such as concurrent message processing, batch processing, retries, and more. You can read more about it in this article or you can watch this talk.
Greyhound is the event-driven backbone of Wix ~2200 microservices, so introducing the multi-cluster concept only had to be done in a few places (including the library and the proxy service). Only new topics’ producers and consumers have to explicitly specify the cluster, as old ones are automatically mapped.
All that is needed by developers is to define the logical name of the cluster when constructing the greyhound consumer or producer, as can be seen below:
How to Split
We decided to split the Kafka clusters according to various SLAs. For example, CI/CD pipelines and data migration use cases have different SLA than production services. Important to note that not all data centers have the same amount of Kafka clusters, as each data center serves a different purpose.
Greyhound (Wix’s own Kafka SDK) knows how to address this issue and avoid crashing in case a cluster is not available in the data center where the service instance is currently running.
Migrate Traffic Drained Data Center?
In order to make it easier to migrate producers and consumers of 2000 microservices to multiple managed Kafka clusters, the initial design relied on first completely draining each data center (DC) from traffic.
This design meant that all that was required was to switch the connection details of producers and consumers to their new Kafka clusters. As Wix microservices used Greyhound layer to connect to Kafka clusters, changing connection was required in only one place — Greyhound production configuration (while making sure only 1 DC is affected).
As simple as this design was, it was unfortunately not possible to implement and execute.
There were several reasons:
- There are services that are only deployed in one of the data centers and it is difficult to move them.
- This design means all-or-nothing and is very risky for the moment traffic returns. In case of edge cases that were not switched, they may suffer data loss.
- Traffic cannot be completely drained from a data center for a long period of time, because that could greatly increase the risk of downtime to some service.
Instead, a new design was planned involving migration during live traffic.
Migrate With Traffic
Performing migration during live traffic meant that meticulous planning and execution had to take place.
The process had to be gradual (only affecting some microservices each time in order to reduce blast radius in case there is an issue) and completely automated, in order to reduce chance of manual errors, including automation of a rollback process.
Migrating producers first (before consumers) was not an option, as it meant a considerable amount of time was needed to make sure all consumers have finished processing all records found on the self-hosted cluster and can safely switch to the new cluster topic. This would have caused considerable delays in processing which could harm certain OLTP business flows and users. Also, rolling back consumers due to some unexpected issue would have been impossible without data loss.
Active Kafka consumers had to be switched first, while making sure no message is lost and there is minimal re-processing of records. The only way to do this would be to first replicate the records of all the consumed topics from self host cluster to destination managed cluster.
In order to make sure no message processing is lost during migration, we created a dedicated replication service. Once all consumer topics were identified, the replicator service was ordered to create the topics in the appropriate cloud clusters, and start consuming records from the self hosted cluster and produce them to the target cluster.
In order to facilitate consumer migration, the Replicator also persisted the offset mapping for each partition such that the Greyhound consumer can start processing records in the cloud cluster from the correct offset — which was mapped from the first non committed offset in the self-hosted cluster.
In addition to the Replicator, there was a Migration Orchestrator that made sure a topic was replicated, all offsets were mapped, and then proceeded to request the Greyhound consumer to unsubscribe from the self-hosted cluster. After verifying this was successful, the Orchestrator requested the consumer to subscribe to the cloud cluster, while first seeking the correct mapped offset.
In case of failure, the Orchestrator was able to request the consumer to revert back to the self-hosted cluster.
All this Orchestrator to consumer communication was itself done via dedicated Kafka migration topics. Greyhound consumers started listening to them when they were started.
Once all consumers of a certain topic were migrated, it was possible to migrate its producer.
The initial migration design entailed requesting the producer to switch cluster connection while still accepting incoming produce requests. This meant buffering the requests in-memory and was deemed quite risky.
We later came up with a simpler design which relied on Wix’s gradual Kubernetes deployment process. Each new pod starts to accept incoming requests only once all its health checks are OK, including connection to Kafka. As this process is gradual, there are always “older” pods that are running such that the service as a whole is always available for incoming requests.
On pod startup, the Greyhound producers simply checked a database to ascertain to which cluster they should connect. Much simpler then dynamic cluster switching and records buffering. This meant the migration was safe, no requests could be lost and the service was kept at high availability.
Only once a producer was migrated, was it acceptable to stop a topic’s replication. But in order to migrate a producer, first all its topic consumers had to be migrated.
It turned out that many topics had multiple consumers from different services, which meant the Replicator consumer had more and more traffic to process and replicate.
This caused an issue when a limit was reached to the amount of topics the consumer could handle due to a technical limitation with our relatively old version of self-hosted Kafka brokers. After several attempts to increase message.max.bytes backfired (see KAFKA-9254 bug) and caused a serious issue, we decided to simply add more Replicator consumers and shard the topics-to-replicate among them.
Beyond Migration — External Consumer Control
This “with traffic” migration design opened many new possibilities to dynamically change Greyhound consumers configuration or state without needing to GA a new version to production.
As now we have the infrastructure in place for Greyhound consumers to listen to incoming commands to change state or configuration. Such commands can include:
- Switch cluster — unsubscribe from current cluster and subscribe to another one
- Skip records — skip records that can’t be processed
- Change processing rate — dynamically increase or decrease amount of parallel processing or add delay for throttling and back-pressure
- Redistribute records — In case there is a growing lag on a single partition, be able to redistribute records among all partitions (and skip the old ones).
Currently these are all theoretical, but can be much more easily implemented using the migration infrastructure already in place.
Best Practices and Tips
Below is a list of best practices and tips for a successful Kafka clusters migration:
- Create a script that checks state by itself and stops if expected state is not reached
Making the migration process as automatic as possible is key, so having the script check by itself if it can move to the next phase greatly expedite the process. Un the other hand auto-rollback and self-healing is very difficult to get right, so best to leave that for manual intervention.
- Have a rollback readily available
No matter how well you tested your migration code, a production environment is the realm of uncertainty. Having a readily available rollback option for each phase is very important. Make sure to prepare it in advance and to test it as much as possible before you start running the migration.
- Start with test/relay topics and no impact topics
Migration can be very dangerous, as records can potentially be lost or recovery can be painful. Make sure to start testing your migration code with test topics. In order to have a realistic examination. Use test topics that actually mimic production topics by replicating real production records to the test topics in a special test app. This way, failure in the consumers migration process won’t have any production affect, but it will provide you with a more realistic production simulation.
- Create custom metrics dashboards that show current and evolving state
Even if you create a fully unattended automatic migration process, you still need to be able to monitor what is going on and have tools to investigate in case there is an issue. Make sure to prepare dedicated monitoring dashboards in advance that clearly show the current and historical state of consumers and producers you are migrating.
In the graph below we can see how a producer successfully switched from self hosted cluster to managed cluster (throughput goes down as more and more pods are restarted and read the new configuration)
- Make sure to be on latest patch version of your self-hosted Kafka broker
As our self-hosted Kafka brokers were not on latest patch version, we ended up with a production incident when trying to increase the value of message.max.bytes several times (see more in Replication bottleneck section of this post). My recommendation is to first increase your self-hosted cluster Kafka brokers versions. If not to latest, then at least to latest patch version.
We used Greyhound & dedicated orchestration services and scripts in order to achieve an automatic, safe, and gradual migration in a seamless manner during live traffic.
This was not an easy task. In case you can completely drain traffic from your data-centers and/or afford processing down time, switching producers and consumers to their new clusters without replicating the data first is highly recommended. It’s a much simpler design that will take you much less time to get done.
Otherwise, when migration under traffic, you need to keep careful attention to the order in which you perform the migration (Consumers before/after Producers) and make sure you understand the consequences of this decision (ability to rollback, chance of losing data).
Below you will find a handy flowchart to make it easy for you to understand the various options.
Thank you for reading!
If you’d like to get updates on my future software engineering blog posts, follow me on Twitter and Medium.
You can also visit my website, where you will find my previous blog posts, talks I gave in conferences and open-source projects I’m involved with.
If anything is unclear or you want to point out something, please comment down below.