Streaming Data out of the Monolith: Building a Highly Reliable CDC Stack
The software architecture of BlaBlaCar is being challenged by the company’s growth and team organization. Its backend was historically a PHP monolith with hundreds of async workers around a RabbitMQ broker and a number of scheduled cron jobs. In order to scale, we decided in early 2017 to upgrade this backbone and migrate the monolith to a Service Oriented Architecture (SOA).
During the migration, the monolith and the SOA must co-exist in production. The monolith code is carefully being removed piece by piece until it is completely replaced by services. Meanwhile, the business obviously needs to keep running, and new features and fixes need to be shipped continuously. The whole process feels like replacing the engines of a flying aircraft.
However, how can we guarantee data consistency between the monolith and the services? How do we know whether the services behave in the same way as the monolith? How do we know when a service is ready to be relied upon, so that the related code and data in the monolith can be trashed?
In order to answer all these questions, we built a reliable Change Data Capture (CDC) stack that allows services to observe all the changes made to the monolith’s MySQL database. The CDC data can be considered a source of truth: services can compare it with their own data, report metrics when there is an inconsistency and, eventually, correct the buggy data with the truth. Moreover, reliable CDC data can be used to simplify our current backend data tracking stack and reduce friction between developing new features and collecting data for Business Intelligence.
In this post, we will explain how this CDC stack was built and focus more specifically on how we made it highly reliable.
CDC and Debezium
Change Data Capture is a software design pattern that records the INSERTs, UPDATEs and DELETEs applied to database tables, and makes available a record of what changed, where, and when.
Debezium is an open source project that provides a low-latency data streaming platform for change data capture (CDC).
It is a Kafka source connector that streams changes from a database to Kafka topics. It supports MySQL, MongoDB, PostgreSQL and Oracle.
With Confluent’s Avro converter and the Schema Registry, the CDC data can be encoded in Avro format out of the box, which makes it easier for downstream services and other Kafka connectors to consume.
An example of a data change captured by Debezium:
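As an illustration of the shape of such an event, here is a minimal sketch written as a plain Python dictionary. The top-level field names (`before`, `after`, `source`, `op`, `ts_ms`) follow the Debezium change event envelope; the `users` table, the database name, the connector name and all the values are invented for the example. In production the payload is Avro-encoded rather than a dictionary.

```python
# Hypothetical change event for an UPDATE on a made-up `users` table.
# Only the envelope field names come from Debezium; every value is invented.
change_event = {
    "before": {"id": 42, "email": "old@example.com"},
    "after": {"id": 42, "email": "new@example.com"},
    "source": {"name": "monolith", "db": "app", "table": "users"},
    "op": "u",               # c = create, u = update, d = delete, r = snapshot read
    "ts_ms": 1500000000000,  # when the connector processed the change
}

OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}


def describe(event):
    """Return a short human-readable summary of a change event."""
    source = event["source"]
    return f"{OPERATIONS[event['op']]} on {source['db']}.{source['table']}"


def changed_columns(event):
    """List the columns whose value differs between the before and after images."""
    before = event["before"] or {}  # None for a create
    after = event["after"] or {}    # None for a delete
    return sorted(c for c in before.keys() | after.keys()
                  if before.get(c) != after.get(c))


print(describe(change_event))         # update on app.users
print(changed_columns(change_event))  # ['email']
```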
We quickly deployed a simple CDC stack to production to prove the concept. The teams soon reached a consensus that this stack could be useful for both the SOA migration and the simplification of backend data tracking.
However, one of the biggest challenges we had was its fragility: when either Debezium or the MySQL slave went down, the CDC stream simply stopped. Moreover, Debezium guarantees at-least-once delivery, so data would be duplicated upon Debezium restarts or Kafka producer retries.
WePay posted an article in 2017 (Streaming databases in realtime with MySQL, Debezium, and Kafka) in which they explain how they solved this availability issue with HAProxy and MySQL GTIDs, which ease recovery after a MySQL source goes down. Unfortunately, this approach wasn’t applicable in our case:
- We are using MariaDB, whose GTID implementation differs from MySQL’s; the two are not compatible with each other.
- The management of our asynchronous replicas is very static and cannot be hidden behind HAProxy.
Thus, the proposal was quickly discarded by the team and we had to look for another solution.
In order to avoid over-engineering the source failover, we decided to duplicate the source part of the pipeline: two Debezium connectors send the same data in parallel, allowing us to restart or lose one source without impacting the overall CDC stream. We then developed a deduplicator service that consumes the data from these two connectors and calculates a hash key for deduplication.
While consuming the messages, the deduplicator calculates the key and checks whether it is already stored in its key-value store. If not, it saves the data and sends the message to another Kafka topic (same topic name as before, but with a prefix); otherwise it simply skips the message.
As a result, the output of the deduplicator behaves like the stream of a single Debezium connector.
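The check-then-forward logic described above can be sketched as follows. This is a simplified illustration, not the actual service (which is written in Java): the composition of the hash key, the `dedup.` topic prefix and the in-memory dictionary standing in for the key-value store are all assumptions made for the example. A real key must be built from fields that are identical in both connectors’ copies of a change and unique to that change.

```python
import hashlib
import json


def dedup_key(event):
    """Hash the fields assumed to be identical in both connectors' copies."""
    source = event["source"]
    identity = {
        "db": source["db"],
        "table": source["table"],
        "op": event["op"],
        "before": event["before"],
        "after": event["after"],
        "ts_sec": source["ts_sec"],  # assumed: commit time taken from the binlog
    }
    canonical = json.dumps(identity, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class Deduplicator:
    def __init__(self, store, prefix="dedup."):
        self.store = store    # a dict stands in for the key-value store
        self.prefix = prefix  # hypothetical prefix of the output topics
        self.output = []      # (topic, event) pairs standing in for Kafka

    def handle(self, topic, event):
        """Forward the event if its key is new; return False if it was skipped."""
        key = dedup_key(event)
        if key in self.store:
            return False
        self.store[key] = True
        self.output.append((self.prefix + topic, event))
        return True


def make_event(connector_name):
    # The same row change as seen by two different connectors.
    return {
        "before": None,
        "after": {"id": 1, "name": "Alice"},
        "source": {"name": connector_name, "db": "app", "table": "users",
                   "ts_sec": 1500000000},
        "op": "c",
    }


dedup = Deduplicator(store={})
print(dedup.handle("app.users", make_event("connector-a")))  # True
print(dedup.handle("app.users", make_event("connector-b")))  # False
print([topic for topic, _ in dedup.output])                  # ['dedup.app.users']
```

The connector name is deliberately left out of the key, so that both copies of the same change hash to the same value.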
Deduplicator key-value store
The deduplicator is a simple service written in Java with Spring Boot, a framework widely used by the services at BlaBlaCar. To prove the concept quickly, we deployed a single deduplicator instance to production with RocksDB, an embedded key-value store that only requires declaring the library in the code and does not need a separate container image. Once the concept was validated, we realized that RocksDB did not fit our case:
- The deduplicator needed to persist the data locally, which meant that an instance had to be redeployed on the same machine after an update or restart, whereas BlaBlaCar backends are fully containerized.
- The topic assignment was tightly coupled with the deduplicator instances, because instances running on different machines did not share their physical storage. We needed to segment the topics to ensure that the same topics were always consumed by the same instance. If an instance died, the related topics simply stopped being consumed. This defeated our high availability goal.
Redis seemed to be a good candidate, but it turned out not to be a wise option. The CDC stack generates massive traffic, and storing it in memory would mean a much shorter TTL (time-to-live) for deduplication. The TTL is tied to the maximum downtime of a single Debezium connector that we want to tolerate. Our target was around 2 days, which represents more than one hundred GB of data and would require a huge Redis cluster.
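To give a feel for these numbers, the back-of-the-envelope calculation below derives the average write rate implied by 100 GB retained over a 2-day TTL, and shows how the retained volume shrinks with a shorter TTL. It assumes a steady write rate and 1 GB = 10^9 bytes; since the text says "more than" one hundred GB, the result is a lower bound.

```python
TTL_SECONDS = 2 * 24 * 60 * 60  # the 2-day target mentioned above
RETAINED_BYTES = 100 * 10**9    # "more than one hundred GB", taken as a lower bound


def implied_write_rate(retained_bytes, ttl_seconds):
    """Average bytes per second that fill `retained_bytes` within one TTL window."""
    return retained_bytes / ttl_seconds


def retained_size(write_rate, ttl_seconds):
    """Bytes kept in the store at steady state for a given TTL."""
    return write_rate * ttl_seconds


rate = implied_write_rate(RETAINED_BYTES, TTL_SECONDS)
print(f"{rate / 1e3:.0f} kB/s")                       # 579 kB/s
print(f"{retained_size(rate, 3600) / 1e9:.2f} GB/h")  # 2.08 GB/h
```

The retained volume grows linearly with the TTL, which is why an in-memory store forces a trade-off between the memory budget and the connector downtime that can be tolerated.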
We finally chose Cassandra for its high availability, high performance, capacity to store massive amounts of data and native support for TTL. Note that the deduplicator writes to and reads from the storage in very quick succession. To avoid deduplication failures, a strong consistency level (LOCAL_QUORUM) is therefore required in Cassandra. This might sound bad for performance, but it turned out to be acceptable in production.
Unlike RocksDB, Cassandra acts as a central key-value store, so the deduplicator doesn’t need to implement any specific topic assignment logic. If one deduplicator instance fails, its topics are rebalanced and automatically reassigned to the remaining live instances.
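The reason quorum reads and writes avoid missed duplicates comes down to simple arithmetic: when the number of replicas acknowledging a write plus the number of replicas answering a read exceeds the replication factor, the two sets must overlap, so a read sees the latest write. The sketch below checks this for a replication factor of 3 in the local datacenter, which is an assumption (the actual replication factor is not stated here).

```python
def quorum(replication_factor):
    """Number of replicas that form a quorum: a strict majority."""
    return replication_factor // 2 + 1


def read_sees_latest_write(read_replicas, write_replicas, replication_factor):
    """True when every read set is guaranteed to overlap every write set."""
    return read_replicas + write_replicas > replication_factor


RF = 3  # assumed replication factor in the local datacenter
q = quorum(RF)
print(q)                                  # 2
print(read_sees_latest_write(q, q, RF))   # True: quorum write + quorum read
print(read_sees_latest_write(1, 1, RF))   # False: ONE + ONE may miss the key
```

With a weaker level such as ONE, the copy of a change coming from the second connector could be checked against a replica that has not yet received the key, and the duplicate would slip through.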
The deduplicator implements a consume-process-produce pattern. Starting with version 0.11.x, Kafka supports exactly-once semantics using transactions: the consumer offsets are committed only once the records have been successfully processed.
The deduplicator thus outputs a CDC stream that guarantees both exactly-once semantics and message ordering.
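The idea behind the consume-process-produce pattern can be illustrated with a toy in-memory model, in which the produced records and the consumer offset become visible together or not at all. This is not Kafka client code: `ToyTransactionalLog` and `run_batch` are hypothetical names, and with the real Java client the same effect is obtained with producer transactions (`beginTransaction`, `sendOffsetsToTransaction`, `commitTransaction`).

```python
class ToyTransactionalLog:
    """In-memory stand-in for an output topic plus the committed consumer offset."""

    def __init__(self):
        self.output = []
        self.committed_offset = 0

    def commit(self, pending_output, next_offset):
        # Both effects are applied together, mimicking an atomic commit.
        self.output.extend(pending_output)
        self.committed_offset = next_offset


def run_batch(log, source, process, batch_size):
    """Consume from the committed offset, process, then commit output and offset."""
    start = log.committed_offset
    batch = source[start:start + batch_size]
    pending = []
    try:
        for record in batch:
            pending.extend(process(record))
    except Exception:
        return False  # abort: nothing is produced and the offset is unchanged
    log.commit(pending, start + len(batch))
    return True


source = ["a", "b", "c"]
failures = {"b": 1}  # record "b" fails once, then succeeds


def flaky_upper(record):
    if failures.get(record, 0) > 0:
        failures[record] -= 1
        raise RuntimeError("transient failure")
    return [record.upper()]


log = ToyTransactionalLog()
first = run_batch(log, source, flaky_upper, batch_size=3)   # aborted
second = run_batch(log, source, flaky_upper, batch_size=3)  # committed
print(first, second, log.output, log.committed_offset)
# False True ['A', 'B', 'C'] 3
```

Although record "a" is processed twice, it appears only once in the output, because the aborted attempt left no trace.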
We monitor every component of the stack to detect and track hot spots. Some standard metrics from MySQL, Kafka and Cassandra are already exported to our Prometheus. In addition, the deduplicator reports a lot of valuable custom metrics. Here are some of the most important ones:
- The data volume per second of each part of the pipeline.
- The end-to-end latency of the whole stack (the time between a record being processed by Debezium and that record being ready to be consumed).
- The cumulative delta between the amount of data stored and the amount of data sent successfully, which allows us to detect any data loss.
- The number of create/update/delete operations per table. This metric is very useful for the service migration, as it is an important indicator for deciding whether a table can be dropped from the monolith DB.
With these metrics, it’s very easy to see whether the stack is in good health and, if not, which parts are failing.
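As a rough illustration of how three of these metrics relate to the events flowing through the deduplicator, here is a minimal sketch. The `PipelineMetrics` class and its method names are hypothetical, and the timestamps are invented; in practice such values would be exported to Prometheus rather than kept in a Python object.

```python
from collections import Counter


class PipelineMetrics:
    def __init__(self):
        self.ops_per_table = Counter()  # (table, op) -> count
        self.stored = 0                 # keys written to the key-value store
        self.sent = 0                   # records successfully produced
        self.latencies_ms = []          # end-to-end latency samples

    def on_stored(self):
        self.stored += 1

    def on_sent(self, table, op, captured_at_ms, ready_at_ms):
        self.sent += 1
        self.ops_per_table[(table, op)] += 1
        self.latencies_ms.append(ready_at_ms - captured_at_ms)

    def cumulative_delta(self):
        """Stored but not (yet) sent; a value that keeps growing hints at data loss."""
        return self.stored - self.sent


metrics = PipelineMetrics()
metrics.on_stored()
metrics.on_sent("users", "u", captured_at_ms=1000, ready_at_ms=1600)
metrics.on_stored()
metrics.on_sent("users", "d", captured_at_ms=2000, ready_at_ms=2700)
metrics.on_stored()  # stored, but not sent yet

print(metrics.cumulative_delta())             # 1
print(metrics.ops_per_table[("users", "u")])  # 1
print(max(metrics.latencies_ms))              # 700
```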
Introducing more components in the pipeline also means adding more latency to the messages. The end-to-end latency in production is about 500 to 800 ms, which is acceptable for the services.
Another performance issue is that each topic has a single partition, which is needed to preserve message ordering. As a result, the deduplicator instances cannot parallelize the ingestion of a single topic. This could introduce more latency when there are massive operations on a single table. For example, the data archiving operations that we run in production generate millions of delete statements in a short period. Topic partitioning or table sharding might need to be considered for some tables with intense activity (we haven’t tackled this point yet). Currently we don’t really need to deal with this scalability issue, because the monolith database shrinks as the services grow.
The “source team” that manages the databases can operate the slaves and the connectors without worrying about interrupting the service provided by the stack. When one of the Debezium connectors crashes, the team doesn’t need to dive into the details to investigate. The broken connector can simply be restarted. If the restart fails, we can create a new one with a new Kafka group id. The deduplicator instances can be restarted in a rolling fashion without impacting the service.
As long as there is at least one functioning Debezium connector and several live deduplicator instances, the whole stack remains available. No single component going down has an impact on the overall availability of the stream. Feel relaxed, don’t panic.
The deduplication trick may sound like an exotic way to build a highly available stack, but it solves the problem at hand without requiring static management of asynchronous replicas, which could not be hidden behind HAProxy. Operating the stack becomes much simpler in turn. We also have many more metrics and much better visibility over the pipeline. Last but not least, it guarantees exactly-once semantics and message ordering with acceptable end-to-end latency. The stack is working, and we haven’t experienced any downtime since it went to production.
We plan to post more about our use cases of CDC. Please stay tuned.