Add Some Smarts To Your Change Data Capture

Nowadays it feels so natural to write micro-services, and companies are breaking down their monoliths to micro-services. They can easily have hundreds or even thousands of micro-services to manage, and inadvertently they are facing the spaghetti integration.

As the number of your services grows, it becomes error prone and hard to manage. You might find yourself in the service dependency hell or even worse: a service becoming a single point of failure. Fortunately, event bus is always an option: build services on a backbone of events.

There are many ways to emit events. Change Data Capture (CDC) may not be the most obvious one but it can be very effective in some cases. Zendesk has an open source CDC project for MySQL called Maxwell. It reads and parses MySQL binary logs into messages. For each data change, it publishes a JSON message to Apache Kafka (or AWS Kinesis, or many others). Typical Maxwell messages look like the following:

Maxwell messages

For each INSERT, UPDATE and DELETE statement that occurs in the database, Maxwell can capture and publish it. It is cool that you can turn the database inside out, isn’t it? There is a lot you can do with these data change events down to the SQL statement level, e.g. building secondary indexes or caching, especially with the power of Maxwell to blacklist/whitelist databases and tables. But what if you have a bunch of consumers who are keen to consume domain-aware events, and are not particularly familiar with the database structure?

At Zendesk, we’ve built a service called Maxwell Smarts to deal with this concern. Looking closer at the Maxwell messages above, you are usually able to infer what each database transaction has done. For instance, you can emit a UserCreated event when seeing “INSERT into users…” statement or UserUpdated event for “UPDATE users…”. Emitting events based on a database transaction gives you ACID for free, ensuring you process events in the order they occurred. This is what Smarts does.

Although Maxwell supports a few different partitioning strategies, we configure it to be partitioned by database name so that every change for a particular database is sent to the same partition.

Maxwell publishes every change for a database to the same Kafka partition

Smarts consumes the Maxwell topic and rolls up the database transactions by transaction id (xid).

Transaction rollup function in Smarts

The input of the rollup function is a list of messages from the Maxwell topic which outputs a list of completed transactions. We use xid and commit flag to determine whether a transaction is completed. For example, when seeing the “db=db1, xid=1, commit=true” message from the input, we mark the transaction completed. However we can’t mark the transaction xid=3 completed because we don’t see either the commit flag or the beginning of a new transaction indicated by the xid change from the same database. It will most likely be completed in the next batch from the Maxwell topic. Partial transaction messages (in this case the “db=db1, xid=3” message) form the state that we have to maintain in memory.

Event generator in Smarts

Given a transaction, the event generator outputs domain-aware data change events which it extracts from the database statements within the transaction. We use Protocol Buffers for the message schemas. Those domain-aware events will be published to additional Kafka topics, e.g. account topic, user topic, etc.

Smarts currently provides “at least once” semantics. This relies on Maxwell topic offsets being correctly committed to Kafka. For instance, only when the events generated from the transaction xid=1 are delivered and acknowledged by Kafka, the offset of the message “db=db1, xid=1, commit=true” will be committed. In this way, even if the Smarts process crashes during any of the above stages, the transaction xid=1 won’t be lost and will be pulled again after the restart of Smarts.

Reliability and performance are always critical in such services. We use Akka Streams which handles back-pressure out of the box. Configuring Kafka for use with Smarts requires careful configuration to balance the competing needs of reliability and performance. Just to list a few, you can trade off reliability against throughput through the producer’s “acks” setting. It can be more reliable if the producer’s retry duration covers the configured timeout for some types of the broker errors. The consumer’s “max.poll.records”, “fetch.min.bytes” and “” settings can impact the throughput and efficiency. If event order is important, you should configure your Kafka producer appropriately, see “in.flight.requests.per.session” setting.

Maxwell Smarts takes CDC one step further. But it isn’t a silver bullet. You will need to make sure your processor is ready to handle any database schema change you make. With a shared database, it might not be obvious who’s done what to it, which makes it harder to extract the domain-aware events out of it. But at least we like the Smarts it brings :)