Migrating to AWS Managed Streaming Kafka (MSK)

Sandeep Mehta
softrock.io
Published in
13 min readJan 4, 2022

AWS launched Managed Streaming for Kafka (MSK) in 2018.

MSK is s a fully managed service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes, stream changes to and from databases, and power machine learning and analytics application.

Why AWS MSK:

Apache Kafka clusters are challenging to set up, scale, and manage in production. When you run Apache Kafka on your own, you need to provision servers, configure Apache Kafka manually, replace servers when they fail, orchestrate server patches and upgrades, architect the cluster for high availability, ensure data is durably stored and secured, setup monitoring and alarms, and carefully plan to scale events to support load changes.

That means you spend less time managing infrastructure and more time building applications.

Apache Kafka Clusters (Legacy in our case) are challenging to provision, scale and manage in production. When you manage your own Apache Kafka clusters you need to do at least the following:

  1. Setup and manage instances and servers.
  2. Configure Kafka Brokers and Zookeeper nodes.
  3. Upgrade and apply patches.
  4. Architecting the cluster for High Availability.
  5. Configure Security.
  6. Setup Monitoring and Alerts.
  7. Scale in advance.

All the above tasks are complex, challenging and very time-consuming and you spend most of your time managing the infrastructure. Which is a huge operational cost.

AWS MSK makes everything easy for you. It does all the complex and time-consuming work for you. It automatically provisions and runs your Apache Kafka clusters. It provides you with fully managed servers, which are easy to upgrade. It replaces unhealthy brokers without downtime for your applications. It uses multi-AZ replication for high availability and is free of cost and much more.

Legacy to MSK
Legacy to MSK

Goal: Migrate Apache Kafka Cluster to New AWS MSK.

To accomplish our goal we have to do the following:

  • Create AWS MSK clusters with resources to handle the workload/data from Legacy clusters. (we have used terraform msk module for provisioning clusters).
  • Migrate all the topics and their data.
  • Migrate all the Kafka Clients. (Consumers/Producers)
  • Backup disks and Destroy Legacy Clusters.

To successfully perform the migration you need a migration plan and a migration checklist. Start with basic checklist, test and improve your plan by migrating non production environments.

Migrating Plan:

The main goal is to migrate data, users, topics, topics configurations, topic ACLs and move all the clients (producer/consumer) to the newly created MSK clusters without any downtime and data loss.

MirrorMaker v2 (MM2) detects and replicates topics, topic partitions, topic configurations and topic ACLs to the destination cluster that matches a regex topic pattern. Further, it checks for new topics that match the topic pattern or changes to configurations and ACLs at regular configurable intervals.

We also need to replicate the consumer offsets from the legacy cluster to the MSK cluster, thanks to MM2 version 2.7.0, now MM2 can automatically sync the consumer offsets from one cluster to another cluster. For more info check this KIP. We will be using strimzi.io to deploy mm2. For more info check this.

Migration Setup:

To perform a migration smoothly and successfully the following tools, services and dashboards needs to be deployed.

MSK Cluster Setup

1. Kafka-toolbox:

We use a simple Kafka-CLI docker container to run kafka-clicommands on the cluster. This is mostly used for debugging, testing and sometimes for configuration management (cool kids use CMD). You can also use KafkaCat.

2. Kafka-minion:

Kafka Minion is a feature-rich and flexible Prometheus Exporter to monitor your Apache Kafka cluster. All valuable information that is accessible via the Kafka protocol is supposed to be accessible using KMinion. It provides

  • Consumer Group Lags: Number of messages a consumer group is lagging behind the latest offset
  • Log dir sizes: Metric for log dir sizes either grouped by broker or by topic
  • End to End Monitoring: Sends messages to its own topic and consumes them, measuring a messages real-world “roundtrip” latency. Also provides ack-latency and offset-commit-latency. More Info

3. Strimzi-User-Operator:

Strimzi User Operator manages Kafka users for a Kafka cluster, create authentication credentials for the user. In addition to managing credentials for authentication, the User Operator also manages authorization rules by including a description of the user’s access rights.

Strimzi Topic Operator provides a way of managing topics in a Kafka cluster through Kubernetes resources.

4. Kafka-Proxy:

The Kafka Proxy allows a service to connect to Kafka brokers without having to deal with SASL/PLAIN authentication and SSL certificates. kafka-proxy will authenticate users and proxy all the connections to the brokers.

Every single Kafka clients/services talk to the legacy/msk Kafka clusters using kafka-proxy.In simple words, a service does not need to know anything about a Kafka cluster. It only needs to know the right Kafka-Proxy as a sidecar container. After that, all the traffic and requests from the service for that cluster will go through that sidecar.

Kafka-proxy in action

We are using Kubernetes platform to deploy and manage containerised applications. Every single Kafka service talks to the Legacy Kafka cluster through kafka-proxy-legacy. Also, we have configured another proxy kafka-proxy-msk for Kafka services to talk to MSK Cluster.

--forbidden-api-keys , this flag will forbidden kafka request types. This will be handy when we are going to switch our consumer/producer applications , instead of scaling down the whole service we will only forbidden the produce/fetch api requests. For example if we set this flag to 0, the service will be up and running but wont be able of to produce messages into a kafka topic. http://kafka.apache.org/protocol.html#protocol_api_keys

5. Lenses:

We do have lenses in one of our main development environment. Lenses helped us to perform migration POC and validate and test our migration plan.

6. Kafka-Manager/CMAK:

Kafka Manager or CMAK is a tool for managing Kafka clusters with the help of Web UI. It supports the following and much more :

CMAK supports the following:

  • Manage multiple clusters
  • Easy inspection of cluster state (topics, consumers, offsets, brokers, replica distribution, partition distribution)
  • Create a topic with optional topic configs.
  • Delete topic.
  • Optionally enable JMX polling for broker level and topic level metrics.
  • Optionally filter out consumers that do not have ids/ owners/ & offsets/ directories in zookeeper.

All the above tools/services helped us in some way to successfully finish the migration of a legacy cluster to an MSK cluster.

7. Kowl:

Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX. We discovered this tool a bit later but it really helped us to validate that data is properly migrated in case of slow topics.

Start Data Replication:

After setting up the destination cluster(MSK) and the tools required for migration, we will start Mirror Maker 2.

Apache Kafka MirrorMaker replicates data across two Kafka clusters, within or across data centers. MirrorMaker takes messages from a source Kafka cluster and writes them to a target Kafka cluster.

We will be deploying MM2 using strimzi.io. as a K8s service. For more information please check out this blog.

MM2 configuration should have the LegacyCluster as the source and the MSKCluster as a target. For more information on mm2 deployment and configuration please check this.

MM2

After MM2 is configured, it will start replicating data, topics. topic ACLS and other configurations.

When a topic is migrated from Legacy to MSK by MM2. It will create a similar new topic in MSK and start replicating the data. It’s not necessary that the topic offsets will be the same in both clusters.

consumer offset sync

MM2 also calculate the consumer offsets for all the consumers from source to target cluster. (the latest version of Kafka (2.7.0) supports automated consumer offset sync across clusters in MM 2.0 ).

Switch Consumers:

Every application is deployed as a Kubernetes service. Every Kafka service talks to Kafka through kafka-proxy . So in a nutshell, if the sidecar kafka-proxy starts talking to the MSK cluster then the consumer service will be pointing to the MSK cluster.

Switching consumers was the easiest task in this whole migration.

We are assuming that all of our Kafka Consumers are idempotent.

Switching consumer Kafka-proxy sidecar

As you can see in the above diagram at some point during the migration we switch the consumer A to MSK cluster just by changing the sidecar to kafka-proxy-msk .

Switch Producers:

Now, this is the tricky one, It clearly depends on how the producer service has been implemented. For example

  1. If it has an outbox pattern.
  2. If it takes the whole service down if it cannot connect to the brokers.
  3. If it catches the exception but keep the service running even if it cannot connect to the brokers.
  4. if it's a fire and forgets. (The producer continues its work regardless the brokers were able to persist the message.)

We had all kinds of producer clients and trust me it took us some time to distinguish them into different categories, so we can use different migration strategies based on the type of producers.

Switching producers without waiting for MirrorMaker to catch up and replicate all the data might cause an ordering issue.

Mirror maker running and replicating data
Switched producer without waiting for mm2 to replicate all the data
finally, mm2 replicates the remaining data and it messed up the ordering

1. Producer with an Outbox pattern:

A service typically needs to update the database and send messages/events. Outbox Pattern is a 2 step method to do this. First, sending the event to an Outbox table and then another service fetches the events from the Outbox table and publishes them to Kafka Topic. If due to any case the service failed to send the event to Kafka, it will keep on retrying until the event is sent.

To switch this kind of Producer was a pretty straightforward task. We change kafka-proxy-legacy to kafka-proxy-msk and set the flag --forbidden-api-keys=0 . Now, this will prevent any produce API calls to the Kafka cluster but the service will keep running and adding events to an Outbox table. Once mirrormake2 finally migrate all the data from the topic in which the producer was publishing the events. We will remove the flag --forbidden-api-keys=0 and the service will now start publishing the events from its left.

2. Producer with no Outbox Pattern:

There are two ways to migrate this kind of producer.

  1. Simply switching the producer to MSKCluster without waiting for MirrorMaker replicate all the data before the producer starts producing again into the MSKCluster. The only problem with this approach is that your topic might end up with a bad ordering of events.
  2. Waiting for the MirrorMaker replicate all the data before the producer starts producing again. In this approach, the First and the most important thing is that the Producer Service/application must not produce any data otherwise. We had this use case and we actually needed some maintenance window.

Monitoring:

Having metrics and dashboards is a must to perform the migration successfully and smoothly.

1. MirrorMaker2:

MirrorMaker 2 (MM2) uses the Kafka Connect framework to replicate topics from one Kafka cluster to another. So we took inspiration from here and used Kafka-Connect JMX metrics to monitor MM2. We also used the Granfna Dashboard example shared by https://strimzi.io/.

In addition to all the above, because we have noticed that mm2 connectors tasks fail silently. So we used kafka-connect-task-metrics and configured Prometheus alerts to be notified if there is a task failure.

2. Kafka Topics/Partitions Metrics.

Enabling metrics for topics and partitions is a must to finish this migration successfully. It's very important that there should not be any data loss during this whole process. The only way to make sure is that by counting the number of messages per topic per partition in legacy and msk cluster and then comparing them.

number of events in a topic in msk should be always equal to or greater than number of events in a topic in legacy

example of an ongoing migration dashboard. Hiding topic names for security reason

One more important thing to check is the number of topics (data topics, not internal topics) should be equal in both legacy and msk.

example of an ongoing migration dashboard, clearly we have some issues to fix 😂

There are many tools available in the market to expose these metrics for you. For example lenses.io, Kafka-Minion.

3. ConsumerGoups Metrics:

As explained above in the blogpost, MM2 translates the consumer offsets from Legacy cluster to MSK cluster into the __consumer_offsets topic i.e. __cosumer_offsets is used to store information about committed offsets for each topic: partition per group of consumers (groupID).

Before and after migrating the consumers/consumer-groups, we need to verify that the consumer metadata is translated and the consumer starts in the MSK cluster from where it left in the Legacy cluster.

After the migration of consumers, the number of consumers Legacy should or must be equal to the number of consumers MSK.

example of ongoing consumer migration, clearly there are a lot of them still need some love 😂

This all can be achieved by exposing consumer metrics. There are again many tools available like lenses.io, Kafka-Minion.

4. Kafka-Proxy Metrics:

This is optional but very useful. As I have explained above that all of consumer/producer services using kafka-proxy to communicate to Kafka clusters. We have used it mostly to verify that the services can talk to the MSK cluster using a proxy. Also checking that how many live consumers or producers have been migrated.

For this, we have used metrics exposed by kafka-proxy sidecars.

Things to keep in mind:

Log Compacted Topics: Kafka removes any old messages/events when there is a newer version of it with the same key in the partition log.

Log compacted topic example

In the first topic with the no compaction all the events or player’s team history is still available i.e. Roadman first joined Pistons and then Spurs and then finally Bulls.

But after enabling the compaction, only the latest players’ team info is available and the history is deleted.

The important thing to notice is that the low watermark (Oldest known committed offset for this partition) is 0 and the high watermark (Highest known committed offset for this partition) is the same in both cases but the number of messages is completely different. Most of the metrics tools use this formula to get topic counts.

topic count = (high watermark) — (low watermark) and add +1 if the low watermark is zero.

I know right? that this will give the wrong count of messages in the topic.

Legacy to MSK

From the above example, the count in legacy will be 5 and in MSK 3.

Slow/Small topics:

In Kafka sometimes the messages/events are past their retention but they are still not deleted. When Kafka deletes the messages because of retention or any other reason it actually deletes the files/segments which contains those messages.

Now event after the retention is expired, Kafka will mark the segment which contains the message for delete but it won't delete the message until the expiry of the log segment. i.e the size of the log segment gets bigger than the log.segment.bytes.

This might lead to a mismatch in the count of messages for a topic in both legacy and msk cluster.

We have not had too many slow/small topics but we still had a few of them. We basically had to manually check that the first message is different and the last message is the same in both the clusters.

MirrorMaker2 Ongoing Issues:

MirrorMaker2 is still an evolving open source project. The great Kafka community is trying to contribute and fix existing issues.

We had this one particular issue where we start a brand new MirrorMaker2 and it will create all the topics from Legacy cluster to Msk cluster but it won't start migrating the data. So the workaround was to delete all the pods of MM2 at the same time and run it again. Probably there might be some other workaround that we don't know yet. If you do please let us know.

Please check this to see if any of the ongoing issues might raise a red flag when you use mirrormaker2.

Tasks failing silently:

As stated before MirrorMaker2 uses the Kafka Connect framework to replicate topics from one Kafka cluster to another. So in a nutshell it's just a bunch of connectors.

A connector instance is a logical job that is responsible for managing the copying of data between Kafka and another system.

Tasks are the main actor in the data model for Connect. Each connector instance coordinates a set of tasks that actually copy the data. By allowing the connector to break a single job into many tasks, Kafka Connect provides built-in support for parallelism and scalable data copying with very little configuration. Source (Confluent)

When a task fails, the status of the connector does not change and it still shows that it's healthy and running. As such, failed tasks are not automatically restarted by the framework and should be restarted via the REST API.

As mentioned above to monitor this we have enabled kafka-connect-task-metrics.

Conclusion:

Migration to MSK has eliminated operational overhead, including the provisioning, configuration, and maintenance of highly available Apache Kafka and Kafka Connect clusters while being cost-effective.

--

--