Real-Time CDC From MySQL Using AWS MSK With Debezium

Bhuvanesh · Dec 22, 2019

CDC is becoming more popular nowadays. Many organizations that want to build real-time or near-real-time data pipelines and reports use CDC as the backbone powering them. Debezium is an open-source CDC platform from Red Hat that supports multiple databases (both SQL and NoSQL), and Apache Kafka is at the core of it.

Managing and scaling Kafka clusters is not easy for everyone. If you are on AWS for your infrastructure, let AWS manage the cluster: MSK is AWS's managed Kafka service. We are going to configure Debezium with AWS MSK.

Configuration File:

If you have already worked with AWS MSK, then you might be familiar with this configuration file. It is similar to an RDS parameter group, but here you need to upload a file with your parameter names and values. If you are using MSK for the first time, it can be a bit confusing, so I'll give you the steps to do this. You can change this configuration file even after you have launched the cluster, but it's a good practice to create it before launching the cluster. Unfortunately, the AWS CLI is the only way to create the configuration file.

1. Create a conf file:

Create a file on your desktop (or anywhere) with the following parameters. For Debezium, the auto-create-topics parameter is required, so I'll use only that one for now; you can add more parameters if you want. Copy and paste the content below into a conf file called kafka-custom-conf.

If you don't enable automatic topic creation, you'll see the following error.

Dec 20 18:32:19 ip-172-31-44-220 connect-distributed[23563]: [2019-12-20 18:32:19,845] WARN [Producer clientId=connector-producer-mysql-connector-db01-0] Error while fetching metadata with correlation id 294 : {mysql-db01=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:1051)

kafka-custom-conf file:

auto.create.topics.enable = true
zookeeper.connection.timeout.ms = 1000

2. Upload the conf file to AWS

Install AWS CLI on your workstation and run the following command.

NOTE: I'm going to use Kafka version 2.3.1. If you are going to use a different Kafka version, change the --kafka-versions value.

aws kafka create-configuration --name "kafka-231-custom-conf" --description "Example configuration description." --kafka-versions "2.3.1" --server-properties file://C:\Users\rbhuv\Desktop\kafka-custom-conf

Once you run the command, it'll give you the following output.

{
  "Arn": "arn:aws:kafka:us-east-1:0000111222333:configuration/kafka-231-custom-conf/6061ca2d-10b7-46c6-81c0-7fae1b208452-7",
  "CreationTime": "2019-12-20T18:38:17.103000+00:00",
  "LatestRevision": {
    "CreationTime": "2019-12-20T18:38:17.103000+00:00",
    "Description": "Example configuration description.",
    "Revision": 1
  },
  "Name": "kafka-231-custom-conf"
}

3. (Optional) Update existing cluster with this conf file

If you are going to create a new cluster then ignore this step.

Note: if you forgot to note down the configuration ARN (from step 2), you can get it from the CLI with aws kafka list-configurations.

You also need the cluster's current version (this is not the Kafka software version; it is a version string AWS assigns to your cluster). Run aws kafka list-clusters and it will return the current version, like this: "CurrentVersion": "K2EUQ1WTGCTBG2"
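If you'd rather not copy these values by hand, here is a quick sketch using the AWS CLI's --query filter (the configuration and cluster names below are the ones from this example):

# Fetch the configuration ARN and the cluster's current version
aws kafka list-configurations --query 'Configurations[?Name==`kafka-231-custom-conf`].Arn' --output text
aws kafka list-clusters --query 'ClusterInfoList[?ClusterName==`searce-bigdata`].CurrentVersion' --output text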

Create a configuration info file called configuration-info.json which contains the ARN and revision of your new conf file.

{
  "Arn": "arn:aws:kafka:us-east-1:0000111222333:configuration/kafka-231-custom-conf/6061ca2d-10b7-46c6-81c0-7fae1b208452-7",
  "Revision": 1
}

Now run the command below to update your Kafka cluster with the new configuration file.

aws kafka update-cluster-configuration --cluster-arn  "arn:aws:kafka:us-east-1:0000111222333:cluster/searce-bigdata/599c6202-ec40-455a-afa8-d7c5916d7bc2-7" --configuration-info file://C:\Users\rbhuv\Desktop\configuration-info.json   --current-version "K2EUQ1WTGCTBG2"

This will give you the following output.

{
  "ClusterArn": "arn:aws:kafka:us-east-1:0000111222333:cluster/searce-bigdata/599c6202-ec40-455a-afa8-d7c5916d7bc2-7",
  "ClusterOperationArn": "arn:aws:kafka:us-east-1:0000111222333:cluster-operation/searce-bigdata/599c6202-ec40-455a-afa8-d7c5916d7bc2-7/519396ad-1df2-46aa-8858-ba2c49f06c3c"
}
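The configuration change runs as a cluster operation. If you want to track it until it completes, you can poll it with the ClusterOperationArn from the output above:

aws kafka describe-cluster-operation --cluster-operation-arn "arn:aws:kafka:us-east-1:0000111222333:cluster-operation/searce-bigdata/599c6202-ec40-455a-afa8-d7c5916d7bc2-7/519396ad-1df2-46aa-8858-ba2c49f06c3c" --query 'ClusterOperationInfo.OperationState'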

Launching the AWS MSK Cluster:

The MSK launch console is straightforward and you can select the options as you need. I'm just calling out a few places where you need to focus.
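If you prefer the CLI over the console, here is a minimal sketch of creating the cluster with the custom configuration from step 2; the instance type, subnets, and security group below are placeholders you would replace with your own:

# Placeholder subnets/security group; configuration ARN is the one created earlier
aws kafka create-cluster \
  --cluster-name "searce-bigdata" \
  --kafka-version "2.3.1" \
  --number-of-broker-nodes 3 \
  --broker-node-group-info '{"InstanceType":"kafka.m5.large","ClientSubnets":["subnet-xxxx","subnet-yyyy","subnet-zzzz"],"SecurityGroups":["sg-xxxx"]}' \
  --configuration-info '{"Arn":"arn:aws:kafka:us-east-1:0000111222333:configuration/kafka-231-custom-conf/6061ca2d-10b7-46c6-81c0-7fae1b208452-7","Revision":1}'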

Set Up the Debezium MySQL Connector on EC2:

Install Java and Confluent Connector binaries:

sudo apt-get update
sudo apt-get install default-jre
wget -qO - https://packages.confluent.io/deb/5.3/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.3 stable main"
sudo apt-get update && sudo apt-get install confluent-hub-client confluent-common confluent-kafka-2.12

Configure the Distributed connector properties:

If the Connect worker's internal topics (connect-configs, connect-offsets, connect-status) end up with a replication setup the cluster can't satisfy, the worker keeps retrying with errors like this:

Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,290] WARN [Producer clientId=producer-3] Got error produce response with correlation id 844 on topic-partition connect-configs-0, retrying (2147482809 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)

So set the storage replication factors in the properties file below to match your broker count.

/etc/kafka/connect-distributed.properties file:

vi /etc/kafka/connect-distributed.properties

bootstrap.servers=b-1.searce-bigdata.XXXXXXXXX.kafka.us-east-1.amazonaws.com:9092,b-3.searce-bigdata.XXXXXXXXX.kafka.us-east-1.amazonaws.com:9092,b-2.searce-bigdata.XXXXXXXXX.kafka.us-east-1.amazonaws.com:9092
group.id=debezium-cluster
offset.storage.replication.factor=2
config.storage.replication.factor=2
status.storage.replication.factor=2
plugin.path=/usr/share/java,/usr/share/confluent-hub-components

Install Debezium MySQL connector and S3 connector:

confluent-hub install debezium/debezium-connector-mysql:latest
confluent-hub install confluentinc/kafka-connect-s3:latest

Start the connector service:

You can run your confluent connector application via systemctl.

vi /lib/systemd/system/confluent-connect-distributed.service

[Unit]
Description=Apache Kafka - connect-distributed
Documentation=http://docs.confluent.io/
After=network.target
[Service]
Type=simple
User=cp-kafka
Group=confluent
ExecStart=/usr/bin/connect-distributed /etc/kafka/connect-distributed.properties
TimeoutStopSec=180
Restart=no
[Install]
WantedBy=multi-user.target

Start the service

systemctl enable confluent-connect-distributed
systemctl start confluent-connect-distributed
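Before moving on, it's worth checking that the worker is up and that the installed plugins are visible to it (assuming the Connect REST API is on its default port 8083):

curl -s localhost:8083/ | jq
curl -s localhost:8083/connector-plugins | jq '.[].class'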

Configure MySQL Connector:

Note: Before configuring the MySQL connector, make sure binlog is enabled and the MySQL port is accessible from the Debezium EC2 instance. You also need a MySQL user with the required permissions (refer to the Debezium docs).
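Here is a minimal sketch of those prerequisites; the host, user, and password match the example mysql.json below, and the grants are the core set the Debezium MySQL connector docs list:

# Create the connector user and confirm the binlog settings
mysql -h 172.31.84.129 -u root -p -e "
CREATE USER 'bhuvi'@'%' IDENTIFIED BY 'your_strong_pass';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'bhuvi'@'%';
FLUSH PRIVILEGES;
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';"

log_bin should be ON and binlog_format should be ROW.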

Create a file called mysql.json (this is my example conf file; you can refer to the Debezium docs for the meaning of these parameters).

Note: everything after database.whitelist is a set of converter and transform settings that bring only the new row data from MySQL to my consumer app; by default, Debezium adds some metadata along with the MySQL data, but I don't want it. Also make sure the bootstrap servers and MySQL credentials are correct.

{
  "name": "mysql-connector-db01",
  "config": {
    "name": "mysql-connector-db01",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.server.id": "1",
    "tasks.max": "3",
    "database.history.kafka.bootstrap.servers": "b-1.searce-bigdata.XXXXXXXXX.kafka.us-east-1.amazonaws.com:9092,b-3.searce-bigdata.XXXXXXXXX.kafka.us-east-1.amazonaws.com:9092,b-2.searce-bigdata.XXXXXXXXX.kafka.us-east-1.amazonaws.com:9092",
    "database.history.kafka.topic": "schema-changes.mysql",
    "database.server.name": "mysql-db01",
    "database.hostname": "172.31.84.129",
    "database.port": "3306",
    "database.user": "bhuvi",
    "database.password": "your_strong_pass",
    "database.whitelist": "bhuvi,new,test",
    "internal.key.converter.schemas.enable": "false",
    "transforms.unwrap.add.source.fields": "ts_ms",
    "key.converter.schemas.enable": "false",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

If you don't want to customize anything, just remove everything after database.whitelist.
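Optionally, before registering it in the next step, you can sanity-check the config against Kafka Connect's validation endpoint; a sketch, assuming jq is installed on the box:

# error_count should be 0 if the connector config is valid
jq .config mysql.json | curl -s -X PUT -H "Content-Type: application/json" --data-binary @- \
  http://localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate | jq '.error_count'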

Register the MySQL Connector:

curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d @mysql.json

Check the status:

curl -s localhost:8083/connectors/mysql-connector-db01/status | jq

{
  "name": "mysql-connector-db01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.31.44.151:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.31.44.151:8083"
    }
  ],
  "type": "source"
}

Test the MySQL Consumer:

Now insert something into any table in the test database (or any other database listed in database.whitelist), because we capture CDC only from the whitelisted databases.

use test; 
create table rohi (
id int,
fn varchar(10),
ln varchar(10),
phone int
);
insert into rohi values (2, 'rohit', 'ayare','87611');

We can get these values from the Kafka brokers by listening to the topic below:

mysql-db01.test.rohi

The topic name is the combination of servername.databasename.tablename, where servername is the value you set as database.server.name in the mysql.json file.
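Before consuming, you can quickly confirm that the connector actually created the topic (assuming the Kafka CLI tools from the Confluent packages are on your PATH):

kafka-topics --bootstrap-server b-1.searce-bigdata.XXXXXXXXX.kafka.us-east-1.amazonaws.com:9092 --list | grep mysql-db01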

kafka-console-consumer --bootstrap-server b-1.searce-bigdata.XXXXXXXXX.kafka.us-east-1.amazonaws.com:9092,b-3.searce-bigdata.XXXXXXXXX.kafka.us-east-1.amazonaws.com:9092,b-2.searce-bigdata.XXXXXXXXX.kafka.us-east-1.amazonaws.com:9092 --topic mysql-db01.test.rohi --from-beginning

{"id":1,"fn":"rohit","ln":"ayare","phone":87611,"__ts_ms":0}
{"id":1,"fn":"rohit","ln":"ayare","phone":87611,"__ts_ms":0}
{"id":1,"fn":"rohit","ln":"ayare","phone":87611,"__ts_ms":0}
{"id":1,"fn":"rohit","ln":"ayare","phone":87611,"__ts_ms":0}

It'll first copy the historical data and then start capturing real-time CDC.

Set Up the S3 Sink Connector on All Producer Nodes:

I want to send this data to an S3 bucket, so make sure the Debezium EC2 instance has an IAM role with write access to the target S3 bucket. Alternatively, you can install the AWS CLI and configure an access key and secret key, but that is not recommended.
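A quick way to confirm the instance role can actually write to the bucket (assuming the AWS CLI is installed and the bucket name matches the s3.json below):

aws s3 cp /etc/hostname s3://searce-00000/connect-write-test && aws s3 rm s3://searce-00000/connect-write-test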

Create s3.json file.

{
  "name": "s3-sink-db01",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "searce-00000",
    "name": "s3-sink-db01",
    "tasks.max": "3",
    "s3.region": "us-east-1",
    "s3.part.size": "5242880",
    "s3.compression.type": "gzip",
    "timezone": "UTC",
    "locale": "en",
    "flush.size": "10000",
    "rotate.interval.ms": "3600000",
    "topics.regex": "mysql-db01.(.*)",
    "internal.key.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
    "path.format": "YYYY/MM/dd/HH",
    "partition.duration.ms": "3600000",
    "rotate.schedule.interval.ms": "3600000"
  }
}

Register this S3 sink connector:

curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d @s3.json

Check the Status:

curl -s localhost:8083/connectors/s3-sink-db01/status | jq

{
  "name": "s3-sink-db01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.31.44.151:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.31.44.151:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "172.31.44.151:8083"
    },
    {
      "id": 2,
      "state": "RUNNING",
      "worker_id": "172.31.44.151:8083"
    }
  ],
  "type": "sink"
}

Test the S3 sync:

Insert 10,000 rows into the rohi table, then check the S3 bucket. It'll save the data in JSON format with GZIP compression, in hourly partitions.
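Once a flush happens, you can verify the hourly layout straight from the CLI; a sketch, assuming the S3 sink's default topics.dir of topics and the bucket from s3.json:

# Files land under topics/<topic>/YYYY/MM/dd/HH/ as gzipped JSON
aws s3 ls s3://searce-00000/topics/mysql-db01.test.rohi/ --recursive | head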

Monitoring the Debezium MySQL Connector:
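The Kafka Connect REST API is enough for a basic health check; here is a minimal sketch that just prints each connector's state (anything other than RUNNING needs attention):

curl -s localhost:8083/connectors | jq -r '.[]' | while read c; do
  echo -n "$c: "
  curl -s localhost:8083/connectors/$c/status | jq -r '.connector.state'
done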

Conclusion:

The MySQL and S3 config files above are just references that we are using here. If you want more customization, or you need help understanding any parameter, please refer to the Debezium documentation. Also, in this example I'm doing the S3 upload as a micro-batch (every 1 hour or every 10,000 rows added/modified); if you want it closer to real time, modify the config file accordingly.

If you want to do the same setup with Kafka on EC2 instead of AWS MSK, please refer to the following link.

Written by Bhuvanesh (BigData | Cloud & Database Architect | blogger at thedataguy.in) for Searce Engineering.