Consolidating Kafka, or How To Stay Sane and Save Money

Ken Ho
Hootsuite Engineering
18 min readOct 20, 2022
Photo by Lance Grandahl on Unsplash

This post has been a joint effort from the Backend Platform team at Hootsuite.

Authors:

  • Michael Wagler
  • Jason Moore
  • Ken Ho
  • Nur Ozgun

1. Why Consolidate

In 2022, the Backend Platform team at Hootsuite finished off a year-long project of successfully consolidating our 4 existing Kafka clusters into 1 single new cluster. This involved migrating ~125 consumers and ~60 producers consuming from and producing to ~250 topics across 4 different event streams, to a single event stream on a brand new cluster. We managed to achieve this with zero downtime or data loss, and minimal lag to our consuming services.

The reasons for performing the consolidation require an explanation of why we had a 4 cluster setup in the first place.

Back when we introduced Kafka at Hootsuite, our infrastructure was divided between 2 separate logical VPCs, one of which was using EC2-Classic, and the other Amazon VPC. In order for the events that were produced in one VPC to be consumable in the other VPC, we had to use Apache MirrorMaker to copy them over using an AWS ClassicLink. The setup was as follows:

Events were first produced to the Local cluster of the producer’s VPC, and then mirrored over to the Aggregate cluster in the other VPC. This way, as long as consumers consumed from both the Local and Aggregate clusters within their own VPC, they could receive all events in Hootsuite for a given topic.

We created consumer libraries which abstracted over the multi cluster setup so that from a consumer app’s point of view, once you had the correct Kafka connection strings configured in your app, you could treat the 4 cluster setup as a single logical cluster. One of these libraries was for Scala, and another was a language agnostic Kafka REST interface named Kafka Bridge which operated as a sidecar.

In 2017 however, we migrated the EC2-Classic cluster to an Amazon VPC cluster, which allowed us to enable peering between the VPCs. With peering, the entire 4 cluster setup was made redundant, since now any service in either VPC could connect to any of the 4 Kafka clusters.

Still, consolidating the Kafka clusters down to 1 was not a priority for the next few years. However, over time the downsides of having the extra clusters became increasingly obvious.

First, there were the financial costs of running 9 extra Kafka nodes and 5 extra ZooKeeper nodes. Between staging and production, this amounted to ~$44,000 annually. Given that we had this setup unnecessarily for about 4 years, this would have amounted to an extra ~$176,000 paid to AWS.

More important were the technical costs of maintaining this extra complexity. This quadrupled the amount of hours spent patching the OS, updating Kafka versions, and troubleshooting problems. Sometimes our MirrorMaker process would fail, causing an increase in lag on one of the clusters. Sometimes a developer unfamiliar with our setup would accidentally forget to connect to both local and aggregate clusters, and wonder why they were not receiving events.

We had to be careful to ensure that the same topics existed on all 4 clusters. To do this we had a build pipeline that teams would need to use whenever they created or altered topics. These jobs would connect to all 4 clusters and perform the same action on each of them. Partial failures here would lead to more confusion on the part of the developer, inevitably requiring input from our team to help address the problem.

Also, our 4 cluster setup had to be reflected in all our libraries and tooling. One of the reasons we created our own consumer library code for Scala was specifically to hide the details of connecting to multiple clusters at once.

The problem that ultimately forced our hand in the consolidation was that we wanted to move to Amazon’s managed Kafka service, Amazon Managed Streaming for Apache Kafka (MSK). Trying to migrate our convoluted 4 cluster setup to a managed platform would have been a waste of time and money, and so ultimately provided the impetus to do the consolidation.

2. Analysis and Planning

The first thing that we needed to do was understand how different teams were using our Kafka deployment and what their integration requirements were. These were captured by the following criteria: producer only, consumer only, high latency tolerant and out of order delivery tolerant. For example, our analytics team did not always require ordered delivery of events, but it was very important that all events were delivered at least once. We also had teams that batch processed events once a day so their consumer groups operated with very high latencies.

To gather this information we asked teams to self report and document their requirements against the given criteria. We were able to pre-populate the data for consumer groups and topic information since this could be queried from the clusters themselves. Producer integrations posed more of a challenge however, since we were not tracking producers directly. We were also restricted by the fact that we were running an older Kafka message format, since we had services running older Kafka client implementations. This meant that we did not have access to features like Kafka Message headers and could not easily include producer metadata. We scanned our repositories for Kafka producer configurations in an effort to track them all down, but ultimately we had to rely on teams to report their producers accurately.

Once we had a sense of the services, producers, consumers and topics in use, we wanted to build the dependency graph between services that produced certain event types and services that consumed them. The hope was that we would be able to identify sub graphs of services that could be moved as self contained migrations.

The first challenge we had was that the tabular format we had used to collect all this data from other teams was difficult to work with and visualize, since in many cases there was a many to many relationship between the services and topics. To help with this, we wrote a transformer that rendered our table entries in Graphviz syntax. We then compiled all the entries to create a single SVG graph that we could analyze easily.

This showed us that unfortunately we had very complex interdependencies between our topics and the services that produced and consumed them. We did manage to identify approximately six subgraphs that could be migrated independently, however this was a very small minority of the total migration.

Since reasoning about the migration in terms of the data dependencies themselves was complex, we placed more emphasis on the initial integration requirements we had asked teams to provide about their services (such as whether they could tolerate out-of-order events). We had ruled out the use of a replication based approach to execute the migration (see section 3, Potential Designs and Processes), and that had led us towards a producer first approach using a single brand new cluster. The integration requirements for each service now suggested an ordering.

Consumers that did not have ordering requirements could be moved first, since they could consume equivalent streams between the clusters concurrently. Producers could be moved next. The group that required the most coordination was services that required low latency, in order delivery of events; these consumers had to ensure they had fully consumed all data from the original clusters, then immediately switch to the new cluster. Finally, consumers that were tolerant of high latency could be moved last.

The small number of independent subgraphs we had identified could be moved first using this process and that would allow us to test the approach while limiting the impact if something went wrong.

Now that we had a high level plan that we could socialize with different stakeholders, our biggest concern was formulating a rollback or disaster recovery process. Our analysis so far suggested a true rollback would not be possible after the producer cut over, so we would have to either accept data loss or make fixes inline and move forward. We wanted to ensure our plan was sane and that we were not failing to leverage any tools that could make the process or recovery easier. We met with the MSK group at AWS as well as internal teams at Hootsuite like our Technical Architecture Group and others that had previous experience of Kafka migrations. No red flags were raised, so we moved forward with detailed planning and preparation.

3. Potential Designs and Processes

We brainstormed several different approaches for consolidating the clusters. The criteria for evaluating each process were:

  • Ease of implementation
  • How to ensure data integrity during the process; ie no missing data as well as no out of order consumption of data
  • How much lag consumers would experience during the process
  • Could we do any sort of rollback?

We considered a number of different approaches before arriving at our final design. The main alternatives we looked at were “Consumer-First, Renaming Duplicate Topics, In Place”, and “Producer First, In Place”.

While these approaches could have been workable, they had a lot of downsides. Here is a summary of the main problems we encountered.

Downsides

  • These 2 approaches were highly complicated. They required a large number of pull requests whenever we added or removed a topic from a cluster, and they both included multiple versions of each topic, with different prefixes depending on the cluster, which would be extremely difficult to keep track of.
  • Neither of these approaches allowed for any real kind of rollback, since they each would reach a point of no return. This turned out to be true for our final design as well, but since it was much simpler to execute and required far fewer steps, the effort required to roll-forward made it much more viable.
  • In general, the other consumer-first approaches (using mirrormaker) we encountered resulted in interleaved events. This is because if you mirror events to a new cluster, the moment you move the producers to produce directly to the new cluster instead of the original, there will be interleaving between the mirrored events from the original cluster and the newly produced events to the new cluster.
  • The alternative to interleaving for the consumer-first approach is generally data loss: some of the other approaches we encountered suggested that in order to avoid interleaving when the producers move to the new cluster, you first turn off the producers briefly until the mirroring lag from the old cluster to the new cluster drops to 0. Since we could not tolerate any data loss, this was also not an option for us.

Final Design and Justification: Producer First, New Cluster

Given the sheer complexity of the alternative approaches, we decided on a different approach.
Our final approach was a Producer-first approach, but much simpler than the one described above.

The reason that the previous approaches were so complicated was because we were trying to do everything in place, using only the existing clusters. To consolidate onto one of the existing clusters forced us to be mindful of the fact that we had 4 different versions of each topic (one per cluster), and each of those topics had consumers with entirely different offsets. As such, you couldn’t simply make the consumers for one version of that topic start consuming from another version of that topic, since they would either risk re-consuming a number of events, or else missing a number of events.

In order to avoid all of this complexity, we decided to start with a clean slate: create a new cluster.

With a new cluster, the approach to take became extremely simple.

  1. Make all of the producers produce to the new cluster.
  2. Wait for all the consumers to finish consuming on all of the old clusters.
  3. Move all the consumers to consume from the new cluster.

The appeal of this approach was that it had the potential to meet all of our requirements.

  • There would be no risk of duplicate events, since we were not bothering with mirroring.
  • There would not need to be any interleaved events, since events were only ever produced directly by the producers, not also by mirrormaker.
  • There would be no data loss, since the producers would have no downtime

The only requirement which would take more thought was how to minimize consumer lag. On the surface, this approach would require us to wait until the slowest of our consumers was fully caught up on the old cluster before migrating to the new cluster. Some of our consumers consistently had a lag of many hours (these were generally related to batch, offline jobs), and that kind of delay would not be acceptable to our customers.

The other challenge with this approach is that it didn’t initially seem like it could be done gradually. It would potentially require migrating (in a highly coordinated fashion) all the producers at once, and then some time after, all the consumers at once. Migrating this many producers and consumers at once was very risky, since if any of their pipelines were not working properly, it could delay the entire process and result in even more lag.

To address these concerns, we realized that we could break down the migration into several phases. Using the insights described in Analysis and Planning, we came up with the following phased approach:

  • Stage 0: Consumers that can handle out-of-order events
  • Stage 1: Producers producing only to consumers that handle out-of-order events
  • Stage 2: Services in self-contained subgraphs
  • Stage 3: Producers and consumers that cannot tolerate out-of-order events or high lag
  • Stage 4: Consumers that can tolerate a high amount of lag

Stage 0 consumers were the consumers which could handle out-of-order events. As a result, we could point them to both the old clusters and the new cluster at the same time. That way as soon as the producers moved over to the new cluster, these consumers would immediately receive the new events from the new cluster without any delay.
Immediately after the producers switched over, there would be a brief period where these consumers would be consuming both the remaining unconsumed events from the old clusters, as well as the events from the new cluster. While there would be no duplicates, the ordering between these 2 streams was not guaranteed. That is why this approach only worked for consumers which could handle out-of-order events.

To support consuming from both clusters at once, we started prerequisite enhancement work on our Scala client libraries. We made our configuration handling more flexible to allow us to specify sets of topics to consume from specific clusters. This was the first piece of new development work to support the migration process.

Stage 0 could be done as soon as the new cluster was up and running, and well before the main migration day. It turned out that a majority of our consumers could tolerate out-of-order events, so right away we had reduced the number of consumers we would need to touch on migration day by more than half!

Stage 1 led naturally from stage 0: migrating producers that produced only to topics that were consumed by phase 0 consumers. As such, it could also be done ahead of the main migration, as long as it was completed after stage 0.

Stage 2 was the final of the pre-migration day stages. It involved migrating all of the consumer and producer subgraphs which we had previously identified (described in Analysis and Planning) over to the new cluster.

Stage 3 involved migrating most of the remaining consumers and producers. These would be done all at once in one “big bang” migration. This was the highest risk stage, since it required the greatest amount of coordination.

Stage 4 consisted of a few consumers which we knew could tolerate a high amount of lag, and so could afford to delay until the end.

As a result of using these stages, the number of producers and consumers which would require a coordinated migration on the main migration day (stage 3) was significantly reduced, and the entire plan felt more manageable.

4. Migration Process

We first prepared a step-by-step plan to be executed in the staging environment. The plan was divided into pre-migration, migration, and post-migration sections. The pre-migration tasks included:

  • Setting up the infrastructure for the new Kafka cluster
  • Creating the same topic structure in the new cluster
  • Creating pull requests to change the Kafka configuration for each consumer/producer service and getting approvals for those changes
  • Ensuring all the tools we wanted to use for monitoring were set up correctly and accessible by the team. These included our Prometheus metrics, logs, CMAK (a Kafka monitoring tool) as well as direct access to the Kafka brokers themselves so that we could use the Kafka CLI tools, if needed.
  • Preparing new Grafana dashboards to be used during migration. These leveraged the Kafka server metrics exposed via JMX and focused on consumer group lag, incoming messages per topic and general health indicators like producer request rate and latency, which could indicate misconfiguration.
  • Making necessary library upgrades to the clients
  • Setting the “auto.offset.reset” config value to “earliest”
  • Making sure all the CI pipelines were working properly a day before the migration

After formulating the plan, we executed it in staging. We announced the migration date to the other teams ahead of time, and assigned each task in the detailed plan document to members of our team.

The actual migration consisted of the five stages described previously. In summary, they were:

  • Stage 0: Consumers that handle out-of-order events
  • Stage 1: Producers producing only to consumers that handle out-of-order events
  • Stage 2: Services in self-contained subgraphs
  • Stage 3: Producers and consumers that cannot tolerate out-of-order events or high lag
  • Stage 4: Consumers that can tolerate a high amount of lag

We created PRs for the stage 0 consumers to make them consume from both the old and new clusters at once. These were merged and deployed ahead of the migration date. Similarly, stage 1 and 2 PRs were merged and deployed ahead of the migration date. These earlier stages exposed a few minor bugs in our client libraries, which we were able to fix as we went.

On the day of the main migration, our team met virtually. We started a new thread in our migration slack channel and started a timer. The intention was to have a reference for the production migration later. We merged the Stage 3 PRs, first producers and later consumers, and after merging them ensured they deployed all the way to staging without any issues. We verified through our monitoring tools that each producer/consumer was emitting/consuming events to/from the new cluster. After each task was successfully completed, we checked it off in the planning document to avoid missing any steps. We then did the same with the Stage 4 services.

When we stopped the timer after the main migration, it had taken about 6 hours. Our team had a retrospective meeting to discuss how it went, how we could improve the process, and the action items for before the production migration. It was quickly apparent that:

  • The amount of PRs we had to deal with was too large
  • The manual Kafka configuration change for each service was too error-prone (typos were easy to miss)
  • CI pipelines could be flakey, and they slowed down the process
  • We initially missed some producers which were therefore not updated, which could have caused data loss or an outage if they had been missed in production
  • Monitoring so many moving parts at the same time was challenging
  • The process was smooth enough that we didn’t need to follow our disaster recovery process

While preparing for the production migration, we kept looking for ways to improve the cumbersome process of creating PRs for each service and depending on the CI pipelines to deploy the necessary changes. Our operations team suggested that we could leverage Kubernetes configmaps to help. If we created a configmap which stored Kafka broker strings and used it as an interface for services to read from, all we had to do was to run a simple script on the command line to update the current brokers, avoiding the need for many PRs and deployments. That would make the Kubernetes service migration a breeze compared to how we had done it in staging. Introducing the configmap key-value pairs for Kafka brokers was the most significant shift in our migration plan.

We generally followed the same approach as in staging to prepare for the production migration. We created a detailed migration plan document with pre-migration, migration, and post-migration phases. Pre-migration was similar to staging, but also included:

  • The action items we came up with during the retrospective meeting
  • Creating new key/value pairs for the Kafka broker configuration in a Kubernetes configmap
  • Creating a new set of PRs for services to read broker strings from the configmap, and deploying these changes ahead of time
  • Creating configuration change PRs for the non-k8s services in stage 3
  • Searching for newly created producers/consumers since the staging migration, and adding them to the correct stage of the production plan

We also had to make a few small changes to the order of the services we were going to migrate in production after the learnings from the staging migration.

Having all the pre-migration tasks checked off, stage 0, 1, and 2 migrations were done just like staging. When it was time for stage 3, we first ran the producers’ configmap switchover script. After verifying with our monitoring tools that all the producers were emitting to the new cluster and that the consumers had drained the events in the old clusters, we ran consumers’ configmap switchover script. The configmap switches were immediate, and had saved us the trouble of dealing with unstable CI pipelines. We luckily didn’t require the disaster recovery process for production either, and the process was much faster and less error-prone than staging. Overall the production migration process took approximately two and a half hours, over twice as fast as the original process we used in staging.

Immediately after the migration, we had a few action items to perform, such as:

  • Perform a diff of consumer groups on the old clusters and new clusters, ensure they were the same
  • Update our Jenkins job management pipelines with the new cluster and removing the old clusters
  • Update some of our monitoring tools to point to the new cluster and not to the old ones

After that, it was a matter of cleaning up any references to the old clusters one by one, and eventually decommissioning the old clusters themselves.

5. Outcomes, Learnings, and Next Steps

Stats

Consumer groups migrated: 125
Producers migrated: ~60
Topics: ~249
Downtime: none
Data loss: none
Kafka clusters: 4 down to 1
Zookeeper clusters: 2 down to 1
Messages in per day: ~260 million
Kafka client integrations: Scala, Go, PHP, Python, kafka-bridge (REST)
Topic tenancy: multiple
Savings: ~$44,000 annually

Traffic from the old cluster reducing to 0.

Traffic beginning to show up in the new cluster. Notice it starts with a trickle due to a few isolated producer/consumer pairs we migrated ahead of time.

By the end of the consolidation, we had successfully migrated 125 consumer groups and ~60 producers. There was no data loss, and minimal lag. All in all the migration went more smoothly than we could have hoped for.

Learnings

There were a number of key learnings from the consolidation process.

Service pipelines, in aggregate, are unreliable. Any consolidation strategy which depends on a large number of pipelines running successfully in a narrow window of time is doomed to fail. Finding ways to mitigate this by skipping pipelines altogether, such as our approach of updating the shared config-map state, is much safer.

Doing as much work ahead of the main consolidation day as possible greatly simplifies the migration process. In our case, we modified our consumers which could handle a small number of out-of-order events to consume from both the old and new clusters ahead of time, so that they were all essentially pre-migrated. On consolidation day, we had only about 20 remaining consumers to move over, greatly reducing our cognitive load.

Sometimes a 100% perfect rollback is not feasible. In our case, we tried for a long time to come up with a rollback plan that would not result in data loss, duplication, or significant lag. We investigated processes of mirroring data back to the old cluster as we went, but ultimately the added complexity added as much risk as it mitigated. Instead, we defined a point of no return at which we would fix forward instead of back. That way we had a “good enough” rollback plan that didn’t unduly increase the overall complexity of the project, and also made it completely clear what to do if a crisis happened.

Having services which both produce and consume from different topics at the same time is difficult to reason about during a consolidation. Ultimately treating such services as 2 separate clients (one producer, one consumer) made it easier, and allowed us to track down a few separate producer-consumer pairs which could be migrated ahead of the main consolidation.

Don’t underestimate the utility of a good spreadsheet.

Next steps

Having to deal with many different client libraries made this consolidation much more challenging. There were 2 different Scala consumer libraries (with several versions of each in use), Go clients, Python clients, and kafka-bridge (REST) clients. This greatly increased the complexity of the operation. One thing we will investigate is the option of consolidating to a single Kafka client interface, such as our kafka-bridge REST client.

Tracking producers is important. While our consumers are part of consumer groups which Kafka keeps track of, there is no equivalent for producers. As a result we had to take great care to manually track them, and in our staging migration we initially missed some. Going forward, we will investigate better ways to track our existing producers so that we can be confident in future migrations.

We are also planning to migrate to Amazon MSK. This will greatly reduce maintenance overhead.

--

--

Ken Ho
Hootsuite Engineering

I’m a senior software engineer on the Backend Platform team at hootsuite