Running Kafka in Kubernetes, Part 2: How we migrated our Kafka clusters to Kubernetes.

Johnson Okorie · Published in Wise Engineering · 10 min read · Sep 23, 2021

In Part 1, we talked about Wise’s motivations for moving our Kafka clusters to Kubernetes. We also outlined the challenges we faced running such a setup. In Part 2, we will talk about our approach to these challenges and showcase how we successfully migrated our Kafka clusters to Kubernetes, with no downtime experienced by our users.

Our Approach

Following Part 1, in this article, we detail our approach to handling the challenges we identified in migrating our Kafka cluster to Kubernetes:

Zero Downtime Migration

We avoided downtime during the migration by using a ‘stretch cluster’ approach. This means we added new Kafka brokers, hosted in Kubernetes, to the existing Kafka cluster hosted on AWS EC2 instances. For this to be possible, we needed to ensure proper communication between the AWS EC2 instances (for both Kafka and Zookeeper) and the brokers residing in Kubernetes.

We also needed to ensure clients could connect properly to the new brokers. We gradually migrated Kafka topic partitions from the AWS EC2 brokers to the brokers in Kubernetes. Using custom (in-house built) Cruise Control goals, we were able to control which partitions we wanted to migrate to brokers in Kubernetes. We monitored the performance of clients accessing these partitions to confirm it remained at expected levels.

Client accessing brokers in k8s and EC2

Operations

To deal with the operational challenges identified in Part 1, we did the following:

Challenge: Kafka brokers are stateful and need to be able to maintain identity. The identity and data they store are not interchangeable with other brokers.

We deployed brokers to Kubernetes as StatefulSets. This ensured every broker had a unique identity and that each broker’s persistent volume (its data) remained attached to the same pod (and was never interchanged between pods).

The unique identity provided by StatefulSets was also important for setting up configuration that needed to differ per broker (e.g. different advertised listeners for each broker, as discussed in the networking section below).
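
To make this concrete, here is a trimmed-down sketch of what such a StatefulSet can look like. The names, image, volume size and headless service are illustrative, not our production values:

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless        # headless service giving each broker a stable DNS name
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: registry.example.com/kafka:latest   # custom image with probe/config scripts baked in
          ports:
            - name: kafka
              containerPort: 9092
          volumeMounts:
            - name: data
              mountPath: /var/lib/kafka
  volumeClaimTemplates:              # one persistent volume per broker, always re-attached to the same pod
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 1Ti
```

Each pod gets a stable name (kafka-0, kafka-1, …), and our startup scripts use the pod’s ordinal to derive per-broker settings such as broker.id and its advertised listeners.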

We chose to manage Kafka with plain StatefulSets rather than any of the existing open source operators. Some of the issues we ran into when evaluating operators included:

  1. We wanted to gain more confidence running Kafka in Kubernetes. Operators are inherently complex. Running Kafka with plain manifests made it easier to understand what was going on and what Kafka requires. This will help us make better decisions when we do adopt an operator.
  2. Most operators did not cope well with brokers that were part of the Kafka cluster but not managed by the operator itself.
  3. Some operators required that an accompanying Zookeeper cluster always be created alongside the Kafka brokers they managed. This would have made it impossible to use our existing Zookeeper cluster.
  4. Some operators made it difficult to update Kafka configurations to suit our requirements, for example to freely update advertised listeners.
  5. It was difficult to integrate with our security infrastructure, as some operators did not let us define sidecars to sit alongside our Kafka brokers.

Challenge: Kubernetes liveness and readiness probes have to be configured based on Kafka health metrics. This ensures broker pods are not killed unnecessarily during operation and that rollouts are paced correctly to avoid shrinking the number of in-sync replicas of partitions, possibly to a level that might cause unavailability.

We built a custom Kafka image with scripts for readiness probes and broker configuration baked in. The readiness probe scripts on the broker were executed by Kubernetes readiness probes, and the script’s exit code was used to decide the outcome of the probe.

The readiness probe script reads JMX metrics on the broker (e.g. broker state) to determine the health of the current broker, and also queries the Kafka cluster to understand the state of the cluster using data such as the number of under-replicated, offline, under-min-ISR and at-min-ISR partitions. Based on this, it decides whether the cluster is in a stable state and returns an appropriate exit code.

We also built mechanisms into our Docker image to allow overriding readiness probes. This is useful when a StatefulSet gets into a broken state, or when a specific pod has persistent problems that need a new image rolled out to fix them (though this should always be done with care).
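
As an illustration, this is roughly how such a probe is wired into the broker container spec. The script path and timings below are hypothetical; the actual checks live in the scripts baked into our image:

```yaml
# Fragment of the Kafka container spec
readinessProbe:
  exec:
    command: ["/opt/kafka/probes/readiness.sh"]   # hypothetical path to the baked-in probe script
  initialDelaySeconds: 60
  periodSeconds: 30
  timeoutSeconds: 25
  failureThreshold: 5
# The script exits 0 only when the local broker reports a healthy state over JMX and the cluster
# reports no offline or under-min-ISR partitions; any other exit code marks the pod as not ready.
```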

Kafka cluster partitions state during rollout

We configured Kafka’s Statefulset spec.updateStrategy.type to RollingUpdate (More on RollingUpdate strategy here). This will ensure that, during a rollout, all broker pods must be in a ready state before the rollout can proceed. Since pod readiness depends on the Kafka cluster state being stable, rollouts proceed in such a way that partitions never have their number of in-sync replicas fall below an acceptable value (i.e below the min.insync.replicas count).

Challenge: In a multi-cluster setup, each Kubernetes cluster operates independently. In our case, we run two Kubernetes clusters. The Kafka cluster needs to be designed in such a way that it can sustain operations that happen simultaneously on both Kubernetes clusters, e.g broker pods being rescheduled at the same time on both Kubernetes clusters.

In our multi-cluster Kubernetes setup, we assume that we can lose either one Kubernetes cluster or one availability zone (but not both at the same time!). With the Kafka cluster’s min.insync.replicas configuration set to 3, we need to ensure that, at any point in time, all partitions have at least 3 in-sync replicas.

Broker pods in each Kubernetes cluster can be rescheduled independently of each other. This means that, even with a disruption budget of 1 per cluster (ensuring only one broker pod is unavailable per Kubernetes cluster at a time), we could have 2 replicas of a partition unavailable at once. This would bring the in-sync replica count of affected partitions down to one and make those partitions unavailable to application services.

To avoid this scenario, we increased our default replication factor to 6. This number comes from the number of logical racks/failure zones in our setup, i.e. 3 availability zones and 2 Kubernetes clusters (2 * 3 = 6). Setting the replication factor to 6 means that we have 3 replicas in each of our 2 Kubernetes clusters. In the event that one pod gets rescheduled in each Kubernetes cluster, a maximum of two replicas becomes unavailable and 4 replicas remain in-sync. This is above the min.insync.replicas setting and hence should not disrupt application services.

Challenge: The Kafka cluster needs to be resilient against both availability zone and Kubernetes cluster failures.

Using pod topology spread constraints, we configured all broker pods to be distributed evenly across availability zones. With a replication factor of 6, we have 2 replicas in each availability zone and 3 replicas in each Kubernetes cluster. We increased the min.insync.replicas setting from 2 to 3 to ensure that a produced message is always written to at least 2 availability zones. (If we had left it at 2, with 2 replicas in each availability zone, it would be possible for a message to be committed after being written to only a single availability zone.)
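
A sketch of the corresponding pod topology spread constraint, assuming the standard zone label is used as the topology key:

```yaml
# In the StatefulSet pod template spec (applied in each Kubernetes cluster)
topologySpreadConstraints:
  - maxSkew: 1                                  # broker counts per zone may differ by at most one
    topologyKey: topology.kubernetes.io/zone    # spread broker pods evenly across availability zones
    whenUnsatisfiable: DoNotSchedule
    labelSelector:
      matchLabels:
        app: kafka
```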

A failure of an availability zone will lead to the loss of 2 replicas at most, leaving 4 in-sync replicas. This means the cluster should be able to continue to operate with the loss of an availability zone and as an added bonus, still have room to lose one broker in such a disaster scenario.

A failure of a Kubernetes cluster will lead to the loss of 3 replicas at most, leaving 3 in-sync replicas. The cluster will still be operational in such a scenario, however, all partitions will be at-min-isr at that point and will not be able to tolerate another broker failure. We don’t expect this to be a scenario as common as availability zone failures.

This decision does have its drawbacks. Increasing the replication factor to 6 led to an increase in end-to-end latencies of up to 7 milliseconds and doubled storage costs. The increased scalability and reliability benefits were worth the cost in our use case.

Resilient Replica Distribution

We used Cruise Control’s self-healing capabilities to ensure even distribution of replicas between availability zones and Kubernetes clusters. We had to develop a custom Cruise Control goal to ensure even distribution. Our custom goal unlike Kafka’s rack aware assignment, could consider more than one dimension for replica distribution. In our use case, these two dimensions are availability zones and Kubernetes clusters. For a partition with a replication factor of 6, the goal ensured that we always had 2 replicas in each availability zone and 3 replicas in each Kubernetes cluster.

Cruise Control was configured to detect anomalies based on the requirements of our custom goal. This way, Cruise Control checked at regular intervals that all partitions satisfied the expected distribution. If a new topic was created that did not conform to a valid distribution, Cruise Control self-healing would be triggered and the partition’s replicas would be reassigned to produce an acceptable distribution.

Replica distribution across kubernetes clusters and availability zones

Resource Allocation

To provide enough resources for the operation of our Kafka clusters, and to avoid resource contention with other application services within our Kubernetes clusters, we decided each broker in the cluster should have its own separate machine/VM instance and utilise most of the resources on that machine. These instances should not be available to application services.

To achieve this, we provisioned a separate worker pool; no workload other than Kafka brokers is scheduled onto it. We enforced this by applying taints to the workers in the pool (with matching tolerations on the broker pods) and by using node affinity rules to ensure brokers are only scheduled onto workers in this pool. This ensured application services were never scheduled in the separate worker pool, and that Kafka brokers always were. The number of workers in the pool is scaled automatically to accommodate the number of brokers in the Kafka cluster. Pod anti-affinity rules were also used to ensure no more than one broker is scheduled onto a worker.
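
A sketch of how these pieces fit together; the taint key, node label and values are hypothetical:

```yaml
# Workers in the Kafka pool are tainted and labelled, e.g.:
#   kubectl taint nodes <node> dedicated=kafka:NoSchedule
#   kubectl label nodes <node> node-pool=kafka
# The broker pod template then tolerates the taint, targets the pool,
# and refuses to share a worker with another broker:
tolerations:
  - key: dedicated
    operator: Equal
    value: kafka
    effect: NoSchedule
affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
        - matchExpressions:
            - key: node-pool
              operator: In
              values: ["kafka"]
  podAntiAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchLabels:
            app: kafka
        topologyKey: kubernetes.io/hostname   # at most one broker per worker
```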

One disadvantage of this approach is that resources are consumed by Kubernetes for its operations on each worker, which reduces the total amount of resources available to Kafka. In future, we might consider putting more brokers into larger instances to reduce this overhead.

Kafka worker pool among other workers in our Kubernetes clusters

Networking

Our application services communicate with each other through a service mesh based on Envoy. This service mesh allows communication between services even across our two Kubernetes clusters. An in-house developed control plane aids service discovery and allows communication between our 500+ services running in Kubernetes. All services run alongside an Envoy sidecar that subscribes to service mesh updates broadcast by our control plane. These updates are used to build a local representation of the service mesh that services use to communicate with each other. We decided that communication with Kafka should be no different.

We extended our control plane to watch for changes to the Kafka cluster (e.g. brokers added or removed) through Zookeeper and build mesh updates that are sent to all Envoy sidecars. Kafka broker advertised listeners had to be updated so that application services use their local Envoy sidecars to communicate with Kafka.

Control plane mesh updates sent to the sidecars contain the current IP addresses of broker pods, allowing service requests to be proxied to these brokers. Each Kafka cluster is mapped to a unique local loopback address, and each broker is mapped to a unique TCP port on the Envoy sidecars. A service sends requests to Kafka through Envoy on these address/port pairs.

For example:

  • Kafka cluster A consists of 3 brokers; we select a loopback address of 127.0.0.5 for this cluster and assign ports 9091–9093 to its brokers. A client can hence reach the first broker of cluster A through 127.0.0.5:9091.
  • Kafka cluster B consists of 6 brokers; we select a loopback address of 127.0.0.6 for this cluster and assign ports 9091–9096 to its brokers. A client can hence reach the sixth broker of cluster B through 127.0.0.6:9096.
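
In our setup these listeners and clusters are generated by the control plane and pushed to the sidecars dynamically over Envoy’s xDS APIs, but a static Envoy configuration for a single broker of cluster A might conceptually look like the sketch below (names, IPs and ports are illustrative):

```yaml
static_resources:
  listeners:
    - name: kafka-a-broker-1
      address:
        socket_address: { address: 127.0.0.5, port_value: 9091 }   # cluster A, first broker
      filter_chains:
        - filters:
            - name: envoy.filters.network.tcp_proxy
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
                stat_prefix: kafka_a_broker_1
                cluster: kafka-a-broker-1
  clusters:
    - name: kafka-a-broker-1
      type: STATIC
      connect_timeout: 5s
      load_assignment:
        cluster_name: kafka-a-broker-1
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address: { address: 10.1.2.3, port_value: 9092 }   # current IP of the broker pod
      # In production the upstream connection is upgraded to mTLS via a transport socket.
```

The broker’s advertised listener for in-mesh clients points at its loopback/port pair (127.0.0.5:9091 in this example), so all client traffic flows through the local sidecar.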

The Envoy sidecars also upgrade plaintext connections to mTLS, providing a secure connection to our Kafka clusters.

With this setup, when a broker pod is rescheduled, and comes back up with a new IP, all services are updated accordingly and can communicate with the rescheduled broker. Adding and removing brokers also automatically updates the service mesh. This approach, most importantly, also allows for communication between clients and brokers across our Kubernetes clusters.

Client accessing brokers using control plane updates

With Kafka’s dependency on Zookeeper expected to be removed, we plan to make our control plane compatible with future versions of Kafka by reading metadata from Kafka’s new self-managed quorum.

Where we are now… and where we are going


We have been running Kafka on Kubernetes for a few months now with no major incidents and have seen the following improvements:

  1. Abstraction of cloud providers, allowing us to build a cloud agnostic platform and tooling.
  2. Time saved by removing AWS EC2 instance management toil.
  3. Faster development and Kafka broker rollouts.
  4. Easier configuration management. Managing Docker images is much easier than maintaining Ansible playbooks.

The Realtime Data Platform team has a lot more planned to reduce toil, increase automation and improve the scalability and reliability of our platform.

We hope to leverage Kubernetes operators to our advantage to automate:

  1. Replica rebalancing when a broker needs to be added or removed from the cluster,
  2. Remedial actions in disaster scenarios,
  3. Horizontal cluster scaling based on cluster performance (e.g when the cluster is under-provisioned).

P.S. Interested in joining us? We’re hiring. Check out our open Engineering roles.
