Migration from Self-managed Kafka Cluster to Amazon MSK

Yingying Tang
Brex Tech Blog
Published in
11 min readSep 14, 2021

TL;DR

Our backend microservices at Brex use synchronous and asynchronous means for communicating with each other. The synchronous communications are fulfilled by gRPC calls, and the asynchronous communications are supported by an Event Infrastructure we designed and implemented. The Event Infrastructure, which is backed by an Apache Kafka cluster, provides the library for services to easily publish and consume events to/from Kafka, and supports retries and deadlettering on top.

In our initial implementation we used a self-maintained Apache Kafka cluster on Kubernetes. However, with the growth of Brex, it was taking increasing engineering effort to provision, configure, scale, and orchestrate the Kafka brokers and ZooKeeper. To reduce infrastructure management and focus more on our growing business, we decided to migrate from our self-managed Kafka cluster to Amazon Managed Streaming for Apache Kafka (Amazon MSK). Amazon MSK manages the Kafka clusters and ZooKeeper nodes for us, and was a drop-In replacement for our existing services.

This article illustrates our migration process from self-managed Kafka Cluster to Amazon MSK, with no downtime publishing and no message loss introduced for all backend services across multiple languages.

Background

In order to decouple services, we’ve implemented an Event Infrastructure, in which one Kafka cluster is shared by all backend services to produce and consume event streams. We have all backend services send the event messages (defined in Protobuf) to a centralized Event service through gRPC requests, and the Event service publishes the event to the Kafka cluster. We have ~70 consumers written in 3 different languages:

  • The majority of our backend code has been written in Elixir, as explained by our co-founder Pedro in this post. We are using KafkaEx as the Elixir client for Kafka.
  • All new backend services are now written in Kotlin, as illustrated in this post. We use Micronaut Kafka to build the Kotlin consumers.
  • Our Infrastructure services are written in Golang. Kafka-go is used as our Go client libraries for Kafka.

Currently in the Production Kafka cluster, we have ~940 topics, each topic has 24 partitions and total data stored within a 7 day retention is around 136GB. We process ~2 GB of event data daily, with 30–100 messages/second on average, and a spike of ~300 messages/second published and consumed. We have built monitors and SLOs in Datadog, such as event publishing success/failure rate, consumer lags per topic partition, and end to end duration that measures the time taken from publishing an event to consuming the event.

Migration Options

For the migration to be successful, we need to meet these requirements:

  • Zero downtime for Publishing.
  • No event data loss.
  • Minimize duplicate messages consumed.

We explored several options, as described below.

  1. MirrorMaker (MM) 1.0, which is part of Apache Kafka. MM 1.0 is capable of data replication from a source Kafka cluster to a destination Kafka cluster. However, MM 1.0 does not support consumer migration between clusters, and cannot map message offsets across clusters. This results in consumers beginning from the first offset in the log, instead of resuming from their last committed offset. Although our consumers are typically designed to be idempotent, we wanted to avoid unnecessarily replaying all the messages.
  2. MirrorMaker 2.0, which leverages the Kafka Connect framework and ecosystem. MM 2.0 overcomes the disadvantages of MM 1.0, and is capable of data replication as well as consumer group offset sync between clusters. MM 2.0 inherits Kafka Connect’s metrics, and provides its own metrics, which can be used to monitor the migration process.
  3. Implement our own replication tool. One option would be to dual write the event messages to both the self-managed Kafka cluster and the MSK cluster with versioning, and consume messages with specific versions from the two clusters. We would need to consume the remaining events in the self-managed cluster that are not published to the MSK cluster, and also avoid consuming duplicate messages. This option was discarded due to the complicated steps involved, and it also wouldn’t allow us to retain historical data from the source cluster.

In the end, we chose to use MM 2.0 for the migration process. Before making the decision and starting the migration process, we performed benchmark testing in our Staging environment, where we found and resolved several issues that came up. One of the main issues was the Elixir consumer group offsets, which we will talk about in the next section.

Elixir consumer group offset migration from Zookeeper to Kafka

At Brex, we have Kubernetes clusters in Production, Staging, and Development. To verify if MM 2.0 works for data replication as well as consumer group offset sync, we set up an AWS MSK cluster for testing purposes. We deployed a Kafka Connect Docker image to a Kubernetes pod and ran MM 2.0 to migrate some topics from the Kafka cluster in Staging to the MSK testing cluster. The data replication process went well, however to our surprise, there were no offsets for the Elixir consumer groups in the MSK cluster! What could be wrong here?

Let us look into how MM 2.0 works first. MM 2.0 is composed of several connectors:

  • MirrorSourceConnector replicates topics from the source cluster into the target cluster.
  • MirrorCheckpointConnector emits internal topic checkpoints.internal containing offsets for each consumer group in the source cluster, stores cluster-to-cluster offset mappings in internal topic offset-syncs.internal. In addition, as introduced in KIP-545, if automated consumer offset sync is enabled, it will translate and synchronize the __consumer_offsets topic from the source clutter to the target cluster.
  • MirrorHeartbeatConnector emits heartbeats to confirm connectivity through the connectors.

We had been using the latest version of Kafka that contains the KIP-545 fix, but MirrorCheckpointConnector did not successfully synchronize the Elixir consumer offset information between the clusters. In addition, when we tested topic migrations for other Kotlin or Golang consumer groups, the __consumer_offset information was created in the target cluster correctly. This suggested that the issue must be in the Elixir Kafka client (KafkaEx).

It turned out that we had been using an older api version of the Elixir Kafka client KafkaEx, which stored the consumer group offset information in Zookeeper instead of Kafka, therefore MirrorCheckpointConnector was not able to retrieve the offsets from the Kafka brokers in the source cluster. A newer version of KafkaEx called Kayrock with a higher api version supports storing offsets in Kafka. However, the consumer offset information would have gotten lost if we had upgraded to a new version without migrating the existing offsets from Zookeeper to Kafka. In order to smoothly migrate to the new version, we needed to write a migration script to fetch the offsets from Zookeeper for each consumer group and topic using the old api version and commit them to Kafka using the newer api version.

In the Event Infrastructure, each Elixir consumer group is implemented as an individual application, and is managed by Kubernetes Deployments. When each consumer application starts, it starts up a pool of KafkaEx.GenConsumer workers to consume the assigned topic partitions. For a smooth transition, the migration script needed to be executed before starting each consumer application. The flowchart below shows how we implemented the offset migration script.

  1. We started a single consumer group worker at the beginning of each migration;
  2. Since the single worker was not supervised by the KafkaEx ConsumerGroup, we managed this consumer worker ourselves, strictly following the Kafka Rebalance Protocol:
  • Join_Group. The consumer sent out the JoinGroup request to the broker coordinator, and received the consumer group information back, including the consumer id, the group’s leader id, group’s current generation id, etc.
  • Sync_Group. The consumer sent the SyncGroup request after receiving the JoinGroup response. The consumer received the partition assignment back.
  • Heartbeat. The consumer periodically sent a Heartbeat request, containing the group generation id and consumer id, to the broker coordinator to keep its session alive.
  • Leave_Group. Once we finished fetching and setting the offsets for all subscribed topics, the consumer sent out a LeaveGroup request, which initiated a rebalance.

3. With the worker joining the consumer group, we looped through each subscribed topic by this consumer group, fetched the topic partition offsets stored in Zookeeper using the old KafkaEx api version, and used the value to set the offsets in Kafka with the newer api version;

4. At the end of the migration script, we ran a verification script to compare the offset values stored in Zookeeper and Kafka, in case we skipped updating or incorrectly set the offsets.

During the time of this migration, we had ~55 Elixir consumer groups. We categorized the consumer groups by teams, and gradually migrated the consumer group offsets for each team, first in Staging environment, then in Production.

After the offset migrations, we started to see the consumer group information get created and updated in the target MSK cluster when we run MM 2.0!

Kafka MirrorMaker 2.0 configuration, deployment, and testing

We now have MM 2.0 working for our preliminary test cases, but how does it work for all existing topics across all consumer groups? We continued benchmark testing in Staging using MM 2.0, and improved our MM 2.0 setups during this process.

MM 2.0 properties

Following the MM 2.0 documentation, we defined properties for MM 2.0 connectors. We only enabled one replication flow from the upstream source Kafka cluster to downstream target MSK cluster. Since we did not want to rename the topics so that the consumer groups could subscribe to the same topics, we specified an empty replication.policy.separator and cluster alias (by default MM modifies the topic names in the destination cluster to include the source cluster name as a prefix).

MM 2.0 monitoring

Kafka Connect reports a variety of metrics through Java Management Extensions (JMX). Since MM 2.0 is built on the Connect framework, it inherits all of Connect’s metrics. In addition, MM 2.0 produces its own metrics under the kafka.connect.mirror metric group. To use MM metrics to track the mirroring progress, we exposed the metrics to a Prometheus endpoint using JMX Exporter, and used Datadog Agent to collect the Prometheus metrics. The JMX exporter is configured and executed in a separate Kubernetes container, which sits in the same pod as the Kafka Connect container. We defined the metrics rules and patterns in a ConfigMap that can be read by the JMX exporter.

MM 2.0 performance tuning

With the properties configured and JMX metrics set up for MM, we conducted benchmark testing in the Staging environment and identified some non-optimal configurations:

  • MM timed out while waiting for the producer to flush messages. We tuned offset.flush.timeout.ms together with producer.batch.size, as analyzed and recommended by this article.
  • We changed compression.type from the default of none to snappy, to reduce message size. This article explained in detail about MM Performance Tuning.
  • We scaled up the Kafka Connect Kubernetes Deployment and increased the number of tasks for the MirrorSourceConnector, to speed up the replication process.

The testing went smoothly for all topics and consumer groups with the updated configurations.

Strimzi deployment for MM 2.0

Now that we have MM 2.0 deployed and tested, should we go ahead with the migration in Production? There were still some issues with our current deployment on Kubernetes:

  • We had to deploy and manage the Kubernetes Deployment of Kafka Connect.
  • We had to set up the JMX exporter sidecar container.

We needed a way to specify these configurations in an easy to manage and declarative way. For this we looked into Strimzi, which provides container images and Operators for running Kafka on Kubernetes. Kafka components including Kafka Connect and Kafka MM 2.0, are defined as Custom Resources, and deployed and managed by the Strimzi Operator. Strimzi also supports monitoring JMX metrics through Prometheus JMX Exporter. We could simply create a KafkaMirrorMaker2 custom resource, with the MM properties and JMX metrics previously defined, and Strimzi would start MM 2.0 to synchronize data between Kafka clusters as per our configuration.

As a result, Strimzi worked well for our benchmark testing in Staging, and made it easy to configure, deploy, and manage MM 2.0. In addition, We have upcoming projects using other Kafka connectors (e.g. Debezium), and Strimzi would also help simplify deployment and management of the new connectors. We decided to use Strimzi together with the KafkaMirrorMaker2 custom resource for our migration to the MSK cluster.

Migration to MSK in Staging and Production

The diagram below illustrates the overall steps taken for our migration process from the self-managed Kafka cluster to the MSK cluster. We did the migration in Staging, confirmed the steps were working properly, and then performed the migration in Production.

The order of operations was important to ensure we didn’t have any downtime or loss of data:

1.We deployed Strimzi Operator as well as Custom Resource Definitions (CRDs) for Kafka Connect and MM, then created a KafkaMirrorMaker2 Custom Resource which started MM 2.0 to mirror all the existing data in all our topic partitions to the new cluster.

We used MM metrics to monitor the replication progress. Once we saw that MM’s replication rate had settled to be around the same as our event publication rate, we knew that MM had caught up, and was now just replicating new messages being produced in the source cluster.

sum:connect_mirror_mirrorsourceconnector_record_count{*} by {partition,topic}
sum:connect_mirror_mirrorsourceconnector_record_rate{*}.as_rate()

2.Once MM had caught up and was replicating new messages being published, we pointed the consumers to the MSK cluster. Since the consumer offsets were synced between the source and target clusters, once we pointed the consumers to MSK, consumers started to consume from the offsets they previously committed.

3.After configuring all consumers to consume from the MSK cluster, we then configured the Event service to produce events directly to the MSK cluster and stop producing to the source cluster. In this way, no new data would be published to the source cluster.

4.Once we saw MM’s replication rate drop to 0, all existing data had been replicated from the source cluster to the target cluster, then stopped MM 2.0.

Conclusion

Now we have successfully finished this migration, and our Event Infrastructure is backed by the AWS MSK cluster. We’ve set up Datadog metrics and alerts to monitor MSK cluster performance. We’re also working on improving the MSK cluster stability and control by enabling Automatic Scaling and utilizing Cruise Control.

What’s next? We’re on our way to building a Distributed Data Mesh platform to support use cases across Brex from Metrics, to Analytics, ML Models and Data Warehousing. Our next step in this journey is to expand our existing Event Infrastructure to ingest data from multiple different sources, starting with Change Data Capture (CDC) across our production databases. If this work sounds interesting to you, and you want to help build financial software and services for every growing business, come join us at Brex!

Special thanks to Adnan Abdulhussein for co-authoring this blog post, to the Cloud Infrastructure team (in particular Richard Gooch and Evan Dudla, for MSK cluster setups), as well as to all Brex Product teams involved in the migrations.

--

--