Stream Data From Yugabyte CDC to AWS MSK using Debezium
Change Data Capture (CDC) is a mechanism for tracking changes made to a database. Yugabyte Database recently added a CDC feature in its 2.13 release. In this article, we’ll configure Yugabyte CDC and stream data into AWS MSK using the Debezium connector.
It’s assumed that readers have an elementary knowledge of AWS, Apache Kafka, and CDC.
Let’s start with the setup.
1. Configuration of IAM Roles and Policies
Create a new role with the required access to AWS services. For this demo, we’ll name it “yb_cdc_kafka_role”. The IAM roles and policies defined here are generic and can be fine-tuned based on your organization’s IT policies.
Configure the trusted entities as follows:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
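JSON copied from a web page often picks up curly quotes or long dashes, which makes it invalid. As a minimal sketch, the same trust policy can be generated with Python’s json module so the output is always well-formed JSON. The function name build_trust_policy is illustrative and not part of any AWS tool.

```python
import json


def build_trust_policy(service: str = "kafkaconnect.amazonaws.com") -> dict:
    """Return a trust policy that lets the given AWS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    # Print the document so it can be pasted into the role's trust relationship.
    print(json.dumps(build_trust_policy(), indent=2))
```

Building the document as a dictionary and serializing it avoids hand-editing quotes and brackets.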
Create a policy with access to the following AWS services.
- Apache Kafka APIs for MSK
- EC2
- MSK Connect
- S3
- CloudWatch
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": "ec2:CreateNetworkInterface",
      "Resource": "arn:aws:ec2:*:*:network-interface/*",
      "Condition": {
        "StringEquals": {"aws:RequestTag/AmazonMSKConnectManaged": "true"},
        "ForAllValues:StringEquals": {"aws:TagKeys": "AmazonMSKConnectManaged"}
      }
    },
    {
      "Sid": "VisualEditor1",
      "Effect": "Allow",
      "Action": "ec2:CreateTags",
      "Resource": "arn:aws:ec2:*:*:network-interface/*",
      "Condition": {
        "StringEquals": {"ec2:CreateAction": "CreateNetworkInterface"}
      }
    },
    {
      "Sid": "VisualEditor2",
      "Effect": "Allow",
      "Action": [
        "ec2:DetachNetworkInterface",
        "ec2:CreateNetworkInterfacePermission",
        "ec2:DeleteNetworkInterface",
        "ec2:AttachNetworkInterface"
      ],
      "Resource": "arn:aws:ec2:*:*:network-interface/*",
      "Condition": {
        "StringEquals": {"ec2:ResourceTag/AmazonMSKConnectManaged": "true"}
      }
    },
    {
      "Sid": "VisualEditor3",
      "Effect": "Allow",
      "Action": "ec2:CreateNetworkInterface",
      "Resource": [
        "arn:aws:ec2:*:*:subnet/*",
        "arn:aws:ec2:*:*:security-group/*"
      ]
    },
    {
      "Sid": "VisualEditor4",
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutDashboard",
        "cloudwatch:PutMetricData",
        "cloudwatch:DeleteAlarms",
        "kafkaconnect:ListConnectors",
        "cloudwatch:DeleteInsightRules",
        "cloudwatch:StartMetricStreams",
        "cloudwatch:DescribeAlarmsForMetric",
        "cloudwatch:ListDashboards",
        "cloudwatch:ListTagsForResource",
        "kafka-cluster:AlterCluster",
        "kafkaconnect:CreateWorkerConfiguration",
        "cloudwatch:PutAnomalyDetector",
        "kafka-cluster:Connect",
        "kafkaconnect:UpdateConnector",
        "cloudwatch:DescribeInsightRules",
        "cloudwatch:GetDashboard",
        "cloudwatch:GetInsightRuleReport",
        "kafka-cluster:ReadData",
        "cloudwatch:DisableInsightRules",
        "cloudwatch:GetMetricStatistics",
        "cloudwatch:DescribeAlarms",
        "cloudwatch:GetMetricStream",
        "kafka-cluster:*Topic*",
        "kafkaconnect:DescribeConnector",
        "cloudwatch:GetMetricData",
        "cloudwatch:ListMetrics",
        "cloudwatch:DeleteAnomalyDetector",
        "kafkaconnect:ListWorkerConfigurations",
        "cloudwatch:DescribeAnomalyDetectors",
        "cloudwatch:DeleteDashboards",
        "kafka-cluster:AlterGroup",
        "cloudwatch:DescribeAlarmHistory",
        "cloudwatch:StopMetricStreams",
        "cloudwatch:DisableAlarmActions",
        "kafkaconnect:DescribeWorkerConfiguration",
        "kafkaconnect:CreateConnector",
        "kafkaconnect:ListCustomPlugins",
        "cloudwatch:DeleteMetricStream",
        "cloudwatch:SetAlarmState",
        "kafka-cluster:DescribeGroup",
        "cloudwatch:GetMetricWidgetImage",
        "kafkaconnect:DescribeCustomPlugin",
        "s3:*",
        "kafka-cluster:DescribeCluster",
        "cloudwatch:EnableInsightRules",
        "cloudwatch:PutCompositeAlarm",
        "cloudwatch:PutMetricStream",
        "cloudwatch:PutInsightRule",
        "cloudwatch:PutMetricAlarm",
        "cloudwatch:EnableAlarmActions",
        "cloudwatch:ListMetricStreams",
        "kafkaconnect:CreateCustomPlugin",
        "kafkaconnect:DeleteConnector",
        "kafkaconnect:DeleteCustomPlugin",
        "kafka-cluster:WriteData"
      ],
      "Resource": "*"
    },
    {
      "Sid": "VisualEditor5",
      "Effect": "Allow",
      "Action": "ec2:DescribeNetworkInterfaces",
      "Resource": "arn:aws:ec2:*:*:network-interface/*",
      "Condition": {
        "StringEquals": {"ec2:ResourceTag/AmazonMSKConnectManaged": "true"}
      }
    }
  ]
}
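Because the policy is long, it’s easy to lose track of which services it actually covers. The sketch below groups the actions of a policy document by their service prefix (ec2, kafka-cluster, kafkaconnect, s3, cloudwatch) so the result can be compared with the list of services above. The helper name actions_by_service and the shortened sample policy are illustrative only.

```python
import json
from collections import defaultdict


def actions_by_service(policy: dict) -> dict:
    """Group every action in an IAM policy document by its service prefix."""
    grouped = defaultdict(set)
    for statement in policy.get("Statement", []):
        actions = statement.get("Action", [])
        # IAM allows "Action" to be a single string or a list of strings.
        if isinstance(actions, str):
            actions = [actions]
        for action in actions:
            service, _, name = action.partition(":")
            grouped[service].add(name)
    return {service: sorted(names) for service, names in grouped.items()}


if __name__ == "__main__":
    # A shortened sample; in practice, load the full policy file with json.load().
    sample = json.loads("""
    {"Version": "2012-10-17",
     "Statement": [
       {"Effect": "Allow", "Action": "ec2:CreateTags", "Resource": "*"},
       {"Effect": "Allow",
        "Action": ["kafka-cluster:Connect", "kafkaconnect:CreateConnector", "s3:*"],
        "Resource": "*"}
     ]}
    """)
    for service, names in sorted(actions_by_service(sample).items()):
        print(service, names)
```

A summary like this also makes broad grants such as s3:* easy to spot when fine-tuning the policy.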
2. Enable CDC on Yugabyte Database
Ensure that your Yugabyte database is up and running. To install Yugabyte on your cloud virtual machine, please refer to https://docs.yugabyte.com/preview/quick-start/install/macos/.
Create a test table in the public schema of the Yugabyte database.
CREATE TABLE test (id INT PRIMARY KEY, name TEXT);
Enable CDC through yb-admin. The command below enables CDC on all the schemas and tables in the yugabyte database.
./yb-admin --master_addresses <master_addresses>:7100 create_change_data_stream ysql.yugabyte
If you have a multi-node Yugabyte setup, provide a comma-separated list of host:port values for both the leader and the follower master nodes as the master_addresses argument.
On success, the above command returns a message with the DB stream ID:
CDC Stream ID: 90fe97d59a504bb6acbfd6a940
For more details on CDC commands, please refer to https://docs.yugabyte.com/preview/admin/yb-admin/#change-data-capture-cdc-commands
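For a multi-node cluster, assembling the comma-separated master list by hand is error-prone. The following sketch builds the yb-admin argument list from a list of master hosts and extracts the stream ID from the command’s output. The function names and the host addresses are hypothetical, and the output format is assumed to match the “CDC Stream ID: …” message shown above.

```python
import re

# Matches the message shown above, e.g. "CDC Stream ID: 90fe97d5...".
STREAM_ID_PATTERN = re.compile(r"CDC Stream ID:\s*([0-9a-fA-F]+)")


def build_create_stream_command(master_hosts, database="yugabyte", port=7100):
    """Build the argument list for yb-admin create_change_data_stream."""
    if not master_hosts:
        raise ValueError("at least one master host is required")
    # Leader and follower masters are joined into one comma-separated value.
    addresses = ",".join(f"{host}:{port}" for host in master_hosts)
    return [
        "./yb-admin",
        "--master_addresses", addresses,
        "create_change_data_stream", f"ysql.{database}",
    ]


def parse_stream_id(output: str) -> str:
    """Extract the stream ID from the text printed by yb-admin."""
    match = STREAM_ID_PATTERN.search(output)
    if match is None:
        raise ValueError("no CDC stream ID found in the yb-admin output")
    return match.group(1)


if __name__ == "__main__":
    hosts = ["10.0.0.11", "10.0.0.12", "10.0.0.13"]  # hypothetical master nodes
    print(" ".join(build_create_stream_command(hosts)))
    print(parse_stream_id("CDC Stream ID: 90fe97d59a504bb6acbfd6a940"))
```

The argument list can be passed to subprocess.run on a machine where yb-admin is installed; the parsed ID is the value the connector configuration later needs as database.streamid.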
3. Configuration of AWS Security Group
Create a Security Group with inbound and outbound rules that allow access to the MSK cluster and the Yugabyte database. For this demo, we’ll allow inbound traffic on all ports.
4. Upload the Debezium connector JAR file to an S3 bucket
Download the Yugabyte Debezium connector JAR from https://github.com/yugabyte/debezium/releases/download/v1.7.0-beta/debezium-connector-yugabytedb2-1.7.0-SNAPSHOT-jar-with-dependencies.jar and upload it to an S3 bucket.
5. Configuration of AWS MSK cluster
In this example, we’re creating the AWS MSK cluster in the same VPC as the Yugabyte cluster. Please note that this is a generic configuration; it might differ based on your organization’s IT policy.
For this demo, we have created a cluster with two zones only.
Under the Networking section, select the same VPC and private subnets as the Yugabyte cluster. Choose the security group created in step 3 from the drop-down list.
Enable logging on your cluster to ease debugging. In this demo, we are using an S3 bucket to store the logs.
The cluster is now configured successfully.
6. Configuration of AWS MSK connector
Now that your AWS MSK cluster is ready, it’s time to create a connector to stream data from the Yugabyte database to the MSK cluster.
First, create a plugin that points to the JAR stored in the S3 bucket.
Select the MSK cluster created in the earlier step.
Configure the connector with the details below. In this step, we provide the CDC stream ID and the database details.
connector.class=io.debezium.connector.yugabytedb.YugabyteDBConnector
database.streamid=90fe97d59a504bb6acbfd6a
database.user=yugabyte
database.dbname=yugabyte
tasks.max=2
database.server.name=dbserver
database.port=5433
database.master.addresses=<master_address>:7100
database.hostname=<hostname>
database.password=<db password>
table.include.list=public.test
snapshot.mode=never
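Each setting needs to sit on its own line as a key=value pair, and a missing key usually only shows up later as a connector failure. As a rough sketch, the configuration text can be parsed and checked before it is pasted into the console. The EXPECTED_KEYS tuple simply mirrors the keys used in this article; it is an assumption, not an authoritative list of the connector’s required properties.

```python
# Keys used in this article's example; NOT an official list of required settings.
EXPECTED_KEYS = (
    "connector.class",
    "database.streamid",
    "database.hostname",
    "database.port",
    "database.master.addresses",
    "database.user",
    "database.password",
    "database.dbname",
    "database.server.name",
    "table.include.list",
)


def parse_properties(text: str) -> dict:
    """Parse key=value lines, skipping blank lines and # comments."""
    config = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {line!r}")
        config[key.strip()] = value.strip()
    return config


def missing_keys(config: dict) -> list:
    """Return the expected keys that are absent or empty in the config."""
    return [key for key in EXPECTED_KEYS if not config.get(key)]


if __name__ == "__main__":
    sample = """
    connector.class=io.debezium.connector.yugabytedb.YugabyteDBConnector
    database.user=yugabyte
    database.port=5433
    table.include.list=public.test
    """
    print(missing_keys(parse_properties(sample)))
```

Running the check on the full configuration should print an empty list; any key it reports is worth filling in before creating the connector.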
Select the role “yb_cdc_kafka_role” created earlier.
Your connector is now configured.
To read the messages from the configured MSK cluster, you can set up Apache Kafka on an EC2 client machine and consume the messages published to the topic of the test table.
./kafka-console-consumer.sh --topic dbserver.public.test --bootstrap-server <bootstrap_server1>:9092,<bootstrap_server2>:9092
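The topic name in the command above combines the connector’s database.server.name with the schema and table name. A small sketch of that mapping, with illustrative function names, lists the topics to expect for a given connector configuration:

```python
def topic_for_table(server_name: str, qualified_table: str) -> str:
    """Return the topic name for a table given in schema.table form."""
    schema, sep, table = qualified_table.partition(".")
    if not sep or not schema or not table:
        raise ValueError("expected a table name in schema.table form")
    return f"{server_name}.{schema}.{table}"


def topics_for_config(config: dict) -> list:
    """List one topic per entry in table.include.list."""
    server_name = config["database.server.name"]
    tables = [t.strip() for t in config["table.include.list"].split(",")]
    return [topic_for_table(server_name, t) for t in tables if t]


if __name__ == "__main__":
    config = {
        "database.server.name": "dbserver",
        "table.include.list": "public.test",
    }
    print(topics_for_config(config))
```

With the values used in this article, the only topic is dbserver.public.test, which is the one passed to the console consumer.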
Features and Limitations:
- Currently, the CDC feature is available for YSQL tables only.
- Avoid dropping or truncating tables that have CDC enabled. Doing so might crash the database or disrupt its functioning.
- Every table in the database should have a primary key defined, including tables that are not listed in the MSK connector configuration.
- To add a new table to the CDC watch list, users need to re-create the CDC stream ID.
References:
1. https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html
2. https://docs.yugabyte.com/preview/explore/change-data-capture/debezium-connector-yugabytedb/