CDC to Lakehouse using Debezium. Part One

Ravikanth Musti
6 min readOct 28, 2022

--

I have been asked by many customers and colleagues on different ways they can do CDC (Change Data Capture) using open-source tools. While there might be many in the market, I am particularly interested to explore Debezium CDC as many startups across the globe use it and in fact, they have mentioned a list of customers who use Debezium. Check out the list

This is first part of the blog wherein we will explore how to use Debezium for CDC from on-premises MySQL database to Lakehouse. Below is the architecture for reference.

Reference Architecture

I am using the following Azure Components in my architecture:

  1. Azure Event Hub to hold CDC events. One could use Apache Kafka as well. How to setup Azure Event hub here

2. Azure Databricks for Stream and Batch Processing using Spark.

3. Azure Data Lake storage Gen2 for storing all the data

I am following the tutorial mentioned in Debezium documentation to setup MySQL Database. However, for setting up Debezium, I will use Debezium server instead of using Kafka connect just to make it simple and get going with the functionality faster. For Production workloads I recommend you to setup Debezium on Kafka Connect using Kubernetes clusters.

Setup MySQL database

So, let’s start with setting up MySQL database with some sample data in it. I will be using my Docker Setup already installed in my Windows PC.

Open a new terminal and run below statement.

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:2.0

The above statement will install MySQL database server in a container with a preconfigured ‘inventory’ database.

You may ask what pre-configuration is required? To know that I suggest you follow the documentation mentioned in Debezium documentation for MySQL. Summarizing what is mentioned in the document, one needs to create a user account with appropriate permissions and understand how to enable MySQL binlog.

If you are using some other database like PostgresSQL, Oracle, Db2, SQL Server, MongoDB or Cassandra then it is important that you follow the pre-configuration required for respective databases without which you will not be able to read the transaction logs.

Once your MySQL database is up and running, it's time to connect and look at what data it has. So, open a new terminal and run following command

docker exec -it mysql bashexec mysql -h 0.0.0.0 -uroot -pdebezium
MySQL Tables and Data

Setup Debezium

There are many ways to set up Debezium but I will use a very simple and straightforward way to setup. For Production, I recommend that you setup on Kubernetes cluster. Also, if you are using Apache Kafka for streaming, it is recommended to deploy the Debezium connectors via Kafka Connect. For demonstration purpose I will setup Debezium Server as I am streaming into Azure EventHub. Refer this documentation to know more about it.

I found a very illustrative blog written by Evandro Muchinski on how to setup a Debezium Server. Follow the steps mentioned in the blog or follow the steps below which I have copied from the blog itself.

  1. Create Azure Event Hub namespace and Event Hub to hold CDC logs. Follow this quickstart to setup Azure EventHub

2. Create two folders in your local setup: conf and Data

conf : to holds the configuration file.

data : to store the status of Debezium.

Create an application.properties file and store it under conf folder. Below is a sample file for reference.

To know more about the configuration settings please refer this documentation

Next, Open Powershell and run below statement

docker run -it — name debezium -p 8080:8080 -v $PWD/conf:/debezium/conf -v $PWD/data:/debezium/data — link mysql debezium/server

Make sure you link your MySQL container to Debezium server.

That’s it. Your Debezium server with MySQL connector is ready to stream CDC logs to Azure EventHubs

Setup Spark Jobs for Structured Streaming

Before we get started with Spark jobs, make sure that you have completed pre-requisites

1. Created Azure Data Lake storage Gen2 account.

2. Created Azure Databricks workspace and a cluster:

3. Attached Azure EventHub libraries to the cluster to setup Spark Event Hubs connector. Follow this documentation for this requirement

Once you have completed the pre-requisites, lets start writing some code for Spark Structured streaming.

Step 1: Open a new notebook and create configuration to connect to the Azure EventHub that is listening to Debezium Server and holding the CDC logs.

Step 2: Create a streaming dataframe to receive the CDC logs. Azure EventHub attaches lot of other information such as offset, sequenceNumber, partitionKey etc along with the actual data. Here, we are interested in using only the main message which is stored in ‘body’ field.

Step 3: Now let's start steaming by using display command. Also, now we can start making some changes in MySQL database to see what is captured by Spark Streaming Job.

We will update a single row in the customer table

UPDATE customers SET first_name='Anna Marie' WHERE id=1004

After updating a record in MySQL, let’s see how it looks in Spark Output

Spark Output

As you can see above, the JSON message is very verbose, and it is difficult to make sense of it. This is where you need to understand how Debezium generates change data messages. Each message comprises of three parts:

1. Metadata : includes operation details like insert, update , delete and timestamp details

2. Row data before changes

3. Row data after change

Below is the structure of the message

Debezium Message Stucture

Most of the target systems would need only ‘after’ data and some metadata information. So, we need some event flattening technique to make messages easily consumable for target systems. For this, Debezium provides event flattening single message transformation (SMT). More information on this here.

To use Debezium SMT, we need to change our application.properties file which we created earlier and add few lines at the bottom of the file as mentioned below.

debezium.source.transforms=unwrapdebezium.source.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordStatedebezium.source.transforms.unwrap.add.fields=op,table,source.ts_msdebezium.source.transforms.unwrap.delete.handling.mode=rewritedebezium.source.key.converter.schemas.enable=falsedebezium.source.value.converter.schemas.enable=false

With new updated application.properties file in place, now re-run the update statement in MySQL to see how SMT works on the CDC messages

Updated Messages in required format

Now we get the messages in the format we are looking for.

Step 4: As the CDC logs are in Json format, we will parse it to individual columns. Let's use display command to see how our messages are parsed.

Json message parsed
The output is in desired format

As you can see, each message has captured ‘after’ data along with some other details like:

_op : Operation name like ‘u’ for update , ‘c’ for create , ‘d’ for delete

_source_ts_ms : timestamp at which the operation was done. This is very important and we will see how this column can help us in structured streaming

_deleted : a flag to indicate whether the row is deleted or not

_table: to indicate which table on which the operation was done.

In Debezium we have an option to send updates on all tables to a single EventHub or create a separate EventHub for each table.

Step 5: Finally, we will store the CDC logs in Delta Lake format in our Azure Data Lake Storage Gen2.

With this I conclude my first part of my blog wherein we have captured CDC logs from MySQL database to Lakehouse created on Azure Data Lake Storage using Spark Structured streaming. But from here the fun start as we go deeper into Spark Structured Streaming in our next blog. We will see how ‘foreachBatch’ along with ‘Merge’ feature of Delta Lake comes to our rescue. We will also check some of the cool features of Azure Databricks Delta Live Tables and CDC using ‘Apply Changes’ can help us achieve the same functionality in a better way.

--

--