Implementing Change Data Capture using GCP Datastream

Pooja Kelgaonkar
Google Cloud - Community
6 min read · Jan 25, 2022

Datastream is a serverless and easy-to-use change data capture (CDC) and replication service. It allows you to synchronize data across heterogeneous databases and applications reliably, and with minimal latency and downtime.

Datastream supports streaming from Oracle and MySQL databases into Cloud Storage. The service offers streamlined integration with Dataflow templates to power up-to-date materialized views in BigQuery for analytics, replicate your databases into Cloud SQL or Cloud Spanner for database synchronization, or leverage the event stream directly from Cloud Storage to realize event-driven architectures.

Benefits of Datastream include:

  • Being serverless so there are no resources to provision or manage, and the service scales up and down automatically, as needed, with minimal downtime.
  • Easy-to-use setup and monitoring experiences that achieve super-fast time-to-value.
  • Integration with the best of the Google Cloud data services portfolio, including Datastream, Dataflow, Cloud Data Fusion, Pub/Sub, BigQuery, and more.
  • Synchronizing and unifying data streams across heterogeneous databases and applications.
  • Security, with private connectivity options and the security you expect from Google Cloud.
  • Being accurate and reliable, with transparent status reporting and robust processing flexibility in the face of data and schema changes.
  • Supporting multiple use cases, including analytics, database replication, and synchronization for migrations and hybrid-cloud configurations, and for building event-driven architectures.
Let's consider a use-case scenario where real-time changes need to be captured and replicated from Cloud SQL (MySQL). Below are the steps to complete the setup and run the pipeline.

Step 1: Set up a GCP Cloud SQL (MySQL) instance

Create the instance from the Cloud SQL console.
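If you prefer the CLI over the console, the instance can also be created with gcloud. A minimal sketch; the instance name, version, tier, region, and root password below are placeholders to adjust:

# placeholder values - adjust name, version, tier, and region to your needs
gcloud sql instances create datastream-mysql \
    --database-version=MYSQL_8_0 \
    --tier=db-n1-standard-1 \
    --region=us-central1 \
    --root-password=[YOUR_PASSWORD]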
Step 2: Enable point-in-time recovery (binary logging) on the MySQL instance
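On Cloud SQL, binary logging is turned on by enabling automated backups with point-in-time recovery. A minimal sketch, assuming the placeholder instance name from Step 1:

# enables automated backups and binary logging on the assumed instance
gcloud sql instances patch datastream-mysql \
    --backup-start-time=00:00 \
    --enable-bin-log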
  • a. Connect to the instance and create the Datastream user. Use the command below to connect from Cloud Shell:

gcloud sql connect [INSTANCE_NAME] --user=root

  • b. Create the user with the commands below:

mysql> CREATE USER 'datastream'@'%' IDENTIFIED BY '[YOUR_PASSWORD]';

mysql> GRANT REPLICATION SLAVE, SELECT, RELOAD, REPLICATION CLIENT, LOCK TABLES, EXECUTE ON *.* TO 'datastream'@'%';

mysql> FLUSH PRIVILEGES;
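As a quick sanity check, you can confirm that the privileges were applied to the new user:

mysql> SHOW GRANTS FOR 'datastream'@'%';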

  • c. Verify the MySQL configuration:
  • Confirm that the binary log is configured correctly by entering the following MySQL command:

mysql> SHOW GLOBAL VARIABLES LIKE '%binlog_format%';

  • Verify that the value of the binlog_format variable is set to ROW.
  • Confirm that the row format for the binary log is set to FULL by entering the following MySQL command:

mysql> SHOW GLOBAL VARIABLES LIKE 'binlog_row_image';

  • Verify that the slave updates option for the binary log is set to ON by entering the following MySQL command:

mysql> SHOW GLOBAL VARIABLES LIKE 'log_slave_updates';

  • Verify that the retention period of the binary log is set to 7 days by entering the following MySQL command:

mysql> SHOW GLOBAL VARIABLES LIKE 'expire_logs_days';
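All four settings can also be checked with a single statement; the expected values are ROW, FULL, ON, and 7 respectively:

mysql> SHOW GLOBAL VARIABLES WHERE Variable_name IN ('binlog_format', 'binlog_row_image', 'log_slave_updates', 'expire_logs_days');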

If the retention is not set to 7 days, use the steps below to set it.

  1. Navigate to the /etc/mysql/ directory.
  2. Using an editor, open the my.cnf file.
  3. Add the following lines to the file:

[mysqld]
log-bin=mysql-bin
server-id=1
binlog_format=ROW
log-slave-updates=true
expire_logs_days=7

The expire_logs_days=7 line ensures proper replication by configuring the server to retain binary logs for a minimum of 7 days.

  4. Save your changes to the my.cnf file, and then close the file.
  5. Restart your MySQL server so that the changes you made can take effect.
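Note that editing my.cnf applies to a self-managed MySQL server. On a managed Cloud SQL instance, the same retention setting is applied through database flags, for example (keep in mind that --database-flags replaces any flags already set on the instance):

# placeholder instance name from Step 1
gcloud sql instances patch datastream-mysql \
    --database-flags=expire_logs_days=7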

With this step, the MySQL configuration is complete. Later, we will add the Datastream IP addresses to the MySQL instance to allow traffic.

Step 3: Create connection profiles in Datastream.

Connection profiles need to be created and configured for the source and the sink/target before the streaming pipeline can be created. Below are the connection profiles to be created for this use case:

a. MySQL connection profile: the MySQL source connection.

Datastream's IP addresses need to be whitelisted on the MySQL instance; the addresses to allow are displayed in the Datastream console when the profile is created.

b. Storage connection profile: GCS as the sink/target.

c. Add the Datastream IP addresses to the MySQL instance under "Add network" to allow traffic.
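The profiles can also be created from the CLI. The sketch below is an assumption to verify against gcloud datastream connection-profiles create --help, since flag names can vary between gcloud versions; profile IDs, region, host, credentials, and bucket are placeholders:

# MySQL source profile (placeholder values)
gcloud datastream connection-profiles create mysql-source-profile \
    --location=us-central1 \
    --type=mysql \
    --display-name="mysql-source" \
    --mysql-hostname=[INSTANCE_PUBLIC_IP] \
    --mysql-port=3306 \
    --mysql-username=datastream \
    --mysql-password=[YOUR_PASSWORD] \
    --static-ip-connectivity

# GCS destination profile (placeholder values)
gcloud datastream connection-profiles create gcs-destination-profile \
    --location=us-central1 \
    --type=google-cloud-storage \
    --display-name="gcs-destination" \
    --bucket=[YOUR_BUCKET] \
    --root-path=/cdc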

Create the stream pipeline

Once the connection profiles are created, we can create the stream and configure change data capture.

Configuration steps:

Continue to configure the source.

Select the connection profile and run a test to check connectivity.

The test must pass before the connection can be used in the stream.

Continue to configure the stream with the tables/objects/schemas to include.

Define the sink/target destination.

Configure the destination.

Create the stream.

Once the stream is created, it won't run until it is started (creation and start can also be done in a single "Create & Start" action). Once the stream is started, it is shown as Running in the console.
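The stream can also be created from the CLI; gcloud expects the source and destination configuration as JSON/YAML files, so treat the flags below as an assumption to verify against gcloud datastream streams create --help:

# placeholder stream id, profile ids, and config files; mysql_source_config.json lists the
# schemas/tables to include, gcs_destination_config.json the output path and file format
gcloud datastream streams create mysql-to-gcs-stream \
    --location=us-central1 \
    --display-name="mysql-to-gcs" \
    --source=mysql-source-profile \
    --mysql-source-config=mysql_source_config.json \
    --destination=gcs-destination-profile \
    --gcs-destination-config=gcs_destination_config.json \
    --backfill-all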

GCS target:

The captured change events are stored as files in the GCS bucket, under the path configured in the destination profile.

Once the feed is being captured in GCS, a follow-up job can be created to load these changes into one of the GCP database services. One way to implement this is a Dataflow job that reads the changes from GCS and writes them to BigQuery, Spanner, or Cloud SQL. Google provides these jobs as templates that can be used to build pipelines quickly. The job runs as a streaming job and continuously writes changes to the target table/database.
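For the GCS-to-BigQuery leg, Google's Datastream to BigQuery Dataflow template can be launched from the CLI. A sketch with placeholder bucket, dataset, and region values; the parameter names should be verified against the template's documentation:

# placeholder bucket and dataset names
gcloud dataflow flex-template run datastream-to-bq \
    --region=us-central1 \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery \
    --parameters inputFilePattern=gs://[YOUR_BUCKET]/cdc/,outputStagingDatasetTemplate=datastream_staging,outputDatasetTemplate=datastream_replica,deadLetterQueueDirectory=gs://[YOUR_BUCKET]/dlq/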

I am a DWBI and Cloud Architect! I have been working with various legacy data warehouses, Bigdata implementations, and cloud platforms/migrations. I am a Google Certified Professional Cloud Architect. You can reach out to me on LinkedIn if you need any further help with certification or GCP implementations!
