Achieving real-time analytics via change data capture

Ofir Sharony
MyHeritage Engineering
5 min read · Jan 29, 2018


In a previous post, we described the MyHeritage event processing pipeline, which delivers billions of events a day for data analysis. After digesting some of your responses, it became clear that one piece of the puzzle is missing: how do we use database record changes for analytical purposes? How do we connect every DML operation to its specific user context?

Imagine you have a fully operational system and you want to create an event-driven pipeline on top of it. If you had foreseen the future from day one and designed a decoupled system that triggers events for every user operation, then you have a great starting point, as each event can reflect the data before and after its occurrence. But what if you simply don’t have those events in place?

A naive solution to this problem would be to ask every developer to go back over every feature ever developed and emit an event in exactly the right place. You can see why this is not realistic: not only would it take months of work, but that work would be error-prone. A Sisyphean task, and one that developers would not appreciate, to say the least.

So what’s the alternative? Change data capture (CDC) to the rescue. CDC can be viewed as a design pattern for identifying and collecting database record changes. Every DML operation in your database is captured, which lets you seize each insert, update, or delete operation and use it for your own purposes.

It’s recommended to deliver database record changes to a streaming platform of your choice; in our case, Apache Kafka. You may use one of the many Kafka source connectors to continuously stream data from a database into a Kafka topic. Connectors for both relational and NoSQL databases can identify record changes and, in turn, send their details to Kafka.

We will demonstrate how to bind database record changes to user context in MySQL. We decided to use Maxwell, an open-source application that subscribes to the MySQL binary logs, the record-change events used for database replication. Maxwell acts as another database slave: it reads the binary logs and ships them to Kafka. For example, when we perform the following update command, the message below is produced to Kafka:

UPDATE users
SET occupation = 'CDC specialist'
WHERE id = 125;
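
The exact payload depends on your Maxwell configuration (the thread id, for instance, is emitted only when the output_thread_id option is enabled), and the database name and column values below are illustrative, but the message looks roughly like this:

{
  "database": "myheritage_db",
  "table": "users",
  "type": "update",
  "ts": 1517216456,
  "thread_id": 431,
  "data": {
    "id": 125,
    "name": "Ofir",
    "occupation": "CDC specialist"
  },
  "old": {
    "occupation": "student"
  }
}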

So what do we have here? First, basic information about the record change: the database name, table name, and timestamp of the operation. Second, the values of the record’s fields after the change, along with the previous values of the fields that changed. And third, a MySQL thread id, which identifies the database connection that performed the operation. Remember this one; we’ll get back to it.

Now we have all database changes in our streaming platform, but how can we perform analysis on a plain database record change? As you can see in the JSON above, there is no user context here: nothing that gives us even a clue about the associated request, session, or user.
You could add a request id column to thousands of database tables, but do you really want to go there? A better approach is to use stream processing to bind each record change to its user context in near real time.

Let’s start connecting the dots. When a request arrives at our web server, we immediately collect its context details: user information, request headers, canonical URL, session params, user IP, and so on. We generate a unique request id for this specific context and produce the data to a Kafka topic:
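
The field names and values here are illustrative, but a request message might look something like this:

{
  "request_id": "a1b2c3d4-5678-90ab-cdef-112233445566",
  "ts": 1517216455,
  "user_id": 777,
  "session_id": "s-98765",
  "url": "https://www.myheritage.com/research",
  "user_ip": "10.0.0.12",
  "user_agent": "Mozilla/5.0 (...)"
}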

Within the context of this request, multiple database changes are performed. Each DML operation writes an event to the binary log, which Maxwell observes and produces as a record to a Kafka topic. We still need a link, though, or we wouldn’t be able to bind a request to its specific database changes. We decided to employ a trick here: we created a new database table that maps a request to the MySQL threads that were used throughout the request:
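
To illustrate the idea (the table and column names here are ours, not necessarily the production schema), the application runs a statement like this on every database connection that serves the request:

INSERT INTO request_thread_mapping (request_id, thread_id)
VALUES ('a1b2c3d4-5678-90ab-cdef-112233445566', CONNECTION_ID());

CONNECTION_ID() returns the id of the current MySQL thread, which is the same thread id Maxwell attaches to every record change, so the two can later be matched.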

With thousands of connections and billions of DML operations per day, how can we avoid overwhelming growth in the size of this table? The answer is simple: we don’t store this data! We could truncate the table at regular intervals, but a cleaner approach is to use MySQL’s Blackhole engine. Inserting a record into a Blackhole table does not persist any data, so the table stays empty, yet the statement is still written to the binary log and replicated to slave servers. That is really all we need here: the mapping reaches the binary logs and is shipped by Maxwell to Kafka, right alongside every other record change.
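
As a minimal sketch, keeping the illustrative names from above, the table can be declared on the Blackhole engine like so:

CREATE TABLE request_thread_mapping (
  request_id VARCHAR(64) NOT NULL,
  thread_id  BIGINT UNSIGNED NOT NULL
) ENGINE = BLACKHOLE;

Inserts into this table never occupy storage, yet they still show up in the binary log, so Maxwell picks them up like any other change.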

All right, we have collected all the data we need. We have the request information in the request_data topic and all database record changes in the cdc_events topic. Now comes the cool part: using stream processing to join all the pieces together.

First, we connect each DML operation to its request id. We can do this by performing an inner join within the cdc_events topic itself. As you recall, this topic contains both the database record changes and the mappings of requests to database connections. Applying a simple join in your favorite stream processing framework completes this task easily:
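
As a sketch of how such a join might look in the Kafka Streams DSL (any stream processing framework would do), assuming JSON message values, a Serde<JsonNode> named jsonSerde defined elsewhere, and the illustrative request_thread_mapping table name from above:

import java.time.Duration;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;

public class CdcRequestJoins {

  // Step 1: attach a request id to every record change, joining on the MySQL thread id
  static KStream<String, JsonNode> attachRequestId(StreamsBuilder builder, Serde<JsonNode> jsonSerde) {
    KStream<String, JsonNode> cdc =
        builder.stream("cdc_events", Consumed.with(Serdes.String(), jsonSerde));

    // The request-to-thread mappings written through the Blackhole table, keyed by thread id
    KStream<String, JsonNode> mappings = cdc
        .filter((key, value) -> "request_thread_mapping".equals(value.get("table").asText()))
        .selectKey((key, value) -> value.get("data").get("thread_id").asText());

    // Every other record change, keyed by the thread id Maxwell attached to it
    KStream<String, JsonNode> changes = cdc
        .filter((key, value) -> !"request_thread_mapping".equals(value.get("table").asText()))
        .selectKey((key, value) -> value.get("thread_id").asText());

    // Windowed join: thread ids are reused across requests, so only match mappings
    // written around the same time as the change itself
    KStream<String, JsonNode> withRequestId = changes.join(
        mappings,
        (change, mapping) -> {
          ObjectNode enriched = ((ObjectNode) change).deepCopy();
          enriched.put("request_id", mapping.get("data").get("request_id").asText());
          return enriched;
        },
        JoinWindows.of(Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), jsonSerde, jsonSerde));

    withRequestId.to("cdc_with_request_id", Produced.with(Serdes.String(), jsonSerde));
    return withRequestId;
  }
}

The join window matters because MySQL thread ids are reused across connections and requests; a change should only match a mapping record that was written around the same time.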

After this join is made, the result is a database record change connected to its request id, produced to a Kafka topic called cdc_with_request_id:
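
The result is just the Maxwell payload from before with the request id attached (values illustrative, as above):

{
  "database": "myheritage_db",
  "table": "users",
  "type": "update",
  "ts": 1517216456,
  "thread_id": 431,
  "data": {
    "id": 125,
    "name": "Ofir",
    "occupation": "CDC specialist"
  },
  "old": {
    "occupation": "student"
  },
  "request_id": "a1b2c3d4-5678-90ab-cdef-112233445566"
}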

Now, all we need to do is to fill in the request details. That’s the fun part! We consume the output topic from the previous step, and join it with the topic that stores request information. Here’s what that looks like:
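
Continuing the same Kafka Streams sketch (a second method in the same class, sharing its imports and assumptions, with cdc_enriched as an arbitrary name for the output topic), the second join re-keys both topics by request id and merges them:

  // Step 2: enrich each change with the full request context
  static KStream<String, JsonNode> attachRequestDetails(StreamsBuilder builder, Serde<JsonNode> jsonSerde) {
    KStream<String, JsonNode> requests =
        builder.stream("request_data", Consumed.with(Serdes.String(), jsonSerde))
               .selectKey((key, value) -> value.get("request_id").asText());

    KStream<String, JsonNode> changes =
        builder.stream("cdc_with_request_id", Consumed.with(Serdes.String(), jsonSerde))
               .selectKey((key, value) -> value.get("request_id").asText());

    // Both streams are now keyed by request id, so a windowed join merges them
    KStream<String, JsonNode> enriched = changes.join(
        requests,
        (change, request) -> {
          ObjectNode out = ((ObjectNode) change).deepCopy();
          out.set("request", request); // nest the full request context inside the change record
          return out;
        },
        JoinWindows.of(Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), jsonSerde, jsonSerde));

    enriched.to("cdc_enriched", Produced.with(Serdes.String(), jsonSerde));
    return enriched;
  }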

And this is the final, joined record:
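
With the illustrative values used throughout, it would look roughly like this:

{
  "database": "myheritage_db",
  "table": "users",
  "type": "update",
  "ts": 1517216456,
  "data": {
    "id": 125,
    "name": "Ofir",
    "occupation": "CDC specialist"
  },
  "old": {
    "occupation": "student"
  },
  "request_id": "a1b2c3d4-5678-90ab-cdef-112233445566",
  "request": {
    "user_id": 777,
    "session_id": "s-98765",
    "url": "https://www.myheritage.com/research",
    "user_ip": "10.0.0.12"
  }
}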

There you go: change-data-capture events are now connected to user context in near real time.
All you have to do now is analyze the data :)
