How we moved 100 million chat messages with no downtime — Part 1

Stefan Irimescu
Beekeeper Technology Blog
8 min readOct 12, 2022

Upgrading a legacy chat system to a modern solution can be an exciting moment for your product but migrating historical data to the new system can be challenging especially when both systems are live for different groups of tenants. This is the story of how we did it with no downtime using Kafka, Debezium and MongooseIM.

System Overview. Why did we even do this?

Beekeeper introduced a new chats system powered by MongooseIM in early 2022. A brief introduction to the architecture of the 2 chat systems will be helpful for understanding the migration pipeline. For simplicity, we will refer to the old system as ChatsV1 and to the new one as ChatsV2.

ChatsV1 was based on a monolithic Python system, backed up by a Mysql DB and HTTP Long Polling for sending real time server events to the users as illustrated in Figure 1. With time, this approach proved to be hard to scale, and it was missing features that were considered industry standards in a chat solution.

There are 4 tables relevant for the migration:

  • conversation
  • conversation_read
  • message
  • conversation_member
Figure 1 — ChatsV1 Architecture

ChatsV2 is powered by MongooseIM¹ , which is a proven open source system that can scale up to our load requirements. One of the main selling points is that it enables our web/mobile clients to communicate via XMPP and web socket channels, allowing us to deprecare our old long pooling solution.

However, due to specific functional requirements in group chats, we have decided to implement additional functionality that manages group chats in a custom microservice written in Quarkus. Both MongooseIM and the Beekeeper chats service have their own distinct Postgres database servers, thus allowing us to be more resilient.

For other systems interested in what happens in chats, we are streaming events using the outbox pattern² to an Apache Kafka³ event bus via Debezium3 connectors as illustrated in Figure 2.

Figure 2 — Chats V2 Architecture

Moving on from the details of the to system, to the initial rollout, we started by onboarding new tenants directly onto the new system. As for older tenants, they have a large variety of requirements, including custom tooling, thus requiring different timelines. Because of this, several factors had to be considered when deciding on a strategy for the chat data migration:

  • Customer Experience. The migration process cannot significantly impact the customer experience of our chat platform.
  • Implementation Effort. The amount of engineering time required to build up the migration process and tools for the migration process. The migration should be optimized to reduce the implementation effort for the engineering teams, so that the engineering team can focus on building out new features instead of the migration process.
  • Migration Effort. The amount of effort required to execute on the migration process with the tools provided by the engineering effort. The migration strategy should be optimized to be as scalable as possible, reducing the effort required for our teams to execute upon the process.
  • Migration Risk. How much risk do we carry when moving people onto the new chat engine?

In order to balance all these criteria, we chose to do a live data migration pipeline. This option entails that all the historical data is moved to the new system ahead of time and new data will be streamed live. Although this comes with a slightly higher implementation effort than simply scripting the migration and running it tenant by tenant, it has many advantages:

  • The customers will not see the new system until they are switched over. All the data is already there, we can simply turn on a flag during off-hours with no downtime.
  • We have the opportunity to validate data correctness and completeness ahead of time.
  • No engineering effort is required when switching over a customer to the new system. Thus, it also allows us to spread the customer upgrade process across time with minimum effort besides maintaining the processing pipeline alive.

This allowed us to stretch the ChatsV2 rollout over 4 months in 8 batches and greatly reduce the risks associated with switching a massive amount of customers at once to a new system.

Part 1: Pulling historical data out with CDC

On a more technical side, the first part of this article tries to answer the following questions:

  • How do you get 100 million rows out of a MySQL DB into separate messages into Kafka?
  • How do we keep streaming live changes into the Kafka topic?
  • How do you do both of the above without hurting the DB performance?

Too many questions at once, let’s see what our options are.

The most straightforward way would be to run simple selects on the 4 tables we are interested in and manually write the events into Kafka. We would need to have two operating modes, one that would get the historical data, and another more for streaming data, where we would check the DB for new queries every few seconds. Easy, right? Not really as this approach has a couple of problems:

  1. Running the select locks the rows we are currently reading, thus, affecting the performance of the old Chats system.
  2. Furthermore, during the snapshot phase, reading 100M rows from the DB would certainly use most of the available IOPS that we have available in AWS RDS .
  3. What about changes? Do we have an updated_at column we can filter by on all tables? We need to make sure we pick up all the new changes from the DB. Furthemore, since the old implementation is in a monolithic system, with no clear boundaries between modules, changing the code to send events into Kafka is not an option, as it would be very easy to miss an execution path.
  4. What about ordering? Can we handle a message_created event before the corresponding conversation_created event is consumed? We certainly could, but this would complicate the logic.

With this in mind, we chose to go a different route and use a CDC (Change Data Capture) approach, using Debezium³. As described on their blog⁴, this allows us to solve the following problems:

  • Never miss data. The old chats codebase is distributed across many parts of the monolith codebase, thus trying to capture all places where interactions with the chat service are made is prone to missing events. With CDC, you are certain that all changes from the DB are captured, no matter which system made those changes.
  • No performance impact on the existing system. By reading the replication log, there are no delays added in the already slow functionality of Chats1.0.
  • No loss of data. The need to send real time data to another system is complex and prone to loss of data. There are no easy ways to enforce atomicity when data needs to be sent to two systems. With Debezium we ensure all the data is processed in an eventual consistency fashion.
  • Streaming and snapshot data can be treated equally. There is no difference in data model between the two.

Implementing the pipeline for getting data out of ChatsV1 using CDC is relatively straightforward. The high level architecture is presented in Figure 3. A legacy database read-replica is required for Debezium’s original snapshot to be created without downtime since the operation requires a global lock on the 4 target database tables. The Debezium connector can then be deployed to a Kafka Connect cluster and configured to push the data into the target Kafka topic. It will start with the historical data snapshot and then continue with any real-time events. From there it will be consumed by the chats-migration-bridge Quarkus microservice. This service will also need to pull in some auxiliary data such as user avatars and formatted display names from the database read-replica.

Figure 3 — Pulling data out of ChatsV1

Actually running the pipeline

Our product runs in multiple data-centers across various regions, powered by AWS and GCP. We ran the migration one data-center at a time. We have a microservices-oriented stack that runs inside Kubernetes, infrastructure is managed via Terraform. Our RDS instances were already configured to allow for CDC⁵.

For AWS the deployment process was quite straightforward:

  1. Deploy the Kafka topic and configuration
  2. Deploy the MySQL Read Replica
  3. Start the snapshot of the historical data.

Debezium is quite good with snapshotting historical data, averaging at around 2.2 million entities per minute for our infrastructure. Table 1 shows the results for 2 of our data-centers.

Table 1

After the snapshot is completed the connector continues to stream any real time events that may occur. For some test tenants we actually ran both chats systems in parallel and messages from ChatsV1 would be migrated in real-time to ChatsV2.

Conclusion and Learnings

This concludes the first part of this series. To recap, at this moment in time we have moved 100+ million rows from a MySQL database as messages into Kafka, without downtime. Furthemore, we stream all the changes that happen in real-time, while preserving the order of the events in a way that makes sense in the domain.

There were some interesting learnings along the way:

  1. You cannot use a read-replica for CDC on Google Cloud because unlike AWS, binlogs are not available for read replicas⁶. The solution is to either use the master DB with a maintenance window or run a manually managed read replica.
  2. Debezium had a bug in the snapshot procedure, changing the order of the tables in the case there is a partial match in the names of the tables. We have fixed this and used a custom Debezium jar in order to move fast, while also contributing to open source with a fix

Once the snapshot is completed and the data is in the migration Kafka topic, the actual migration can start, but this will be the topic of Part 2, where we will continue our journey processing all these messages and moving them into the new system. Stay tuned.

Andrei Isac, Stefan Irimescu — Beekeeper AG

Part 2 in now available.

References

  1. MongooseIM
  2. Outbox Pattern/Debezium
  3. Apache Kafka
  4. Debezium CDC
  5. AWS RDS CDC
  6. GCP RR binglog

--

--