A Batch Driven CDC (Change Data Capture) Approach using Google Cloud Platform

Carlos Augusto
Google Cloud - Community
10 min read · Jan 14, 2021

In general

Most databases nowadays have native capabilities, or can work with partner solutions, to identify data changes in real time. This makes it possible to create Change Data Capture (CDC) streams that constantly feed cloud data-warehousing systems such as BigQuery, enabling real-time and predictive analytics. Enabling CDC functionality generally requires additional configuration on the source, but it provides an efficient way to identify data changes and cascade them to downstream systems and applications.

If there is no built-in real-time CDC solution available for the source system in question, identifying which rows changed can generally be achieved by executing incremental batch jobs that read records from the source tables based on a timestamp or some kind of “incremental id” column. For example, an incremental batch job may execute every hour and select all records where the last_updated column is greater than “last_job_run_timestamp”. The timestamp or “incremental id” column can be populated by application code or via database triggers. It’s not uncommon to see this method with traditional ETL-based DW solutions, but concerns about data freshness downstream, or about performance on the source when using a trigger-based approach, generally push organizations to look at real-time CDC solutions.
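The incremental pattern described above can be sketched in a few lines. This is only an illustration, assuming a hypothetical orders table with a last_updated column stored as ISO-formatted text; it uses an in-memory SQLite database so it runs anywhere, and none of these names come from the customer system discussed later.

```python
import sqlite3


def fetch_changed_rows(conn, last_job_run_timestamp):
    """Return rows modified after the previous job run (hypothetical 'orders' table)."""
    cur = conn.execute(
        "SELECT id, col1, last_updated FROM orders "
        "WHERE last_updated > ? ORDER BY last_updated",
        (last_job_run_timestamp,),
    )
    return cur.fetchall()


def demo():
    """Build a tiny in-memory table and pull only the rows changed since 09:00."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER, col1 TEXT, last_updated TEXT)")
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [
            (1, "a", "2021-01-14 08:15:00"),
            (2, "b", "2021-01-14 09:40:00"),
            (3, "c", "2021-01-14 10:05:00"),
        ],
    )
    # ISO-formatted timestamps compare correctly as plain strings.
    return fetch_changed_rows(conn, "2021-01-14 09:00:00")
```

Note that this pattern never sees deleted rows, which is one more reason it does not fit every source.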

There are outliers

There are cases, however, where neither real-time CDC nor incremental batch change identification is possible. Sometimes the source system can’t be modified for whatever reason: security, risk, criticality, lack of knowledge, etc. Changes, however, still need to be identified and loaded into the data warehouse target system.

This article documents an approach for exactly that case, where you can’t implement a standard CDC solution and need an alternative. I have to say this approach may not work for very large databases, big tables or slow network connections, but in some cases it may be applicable.

Based on a real use case

Here is a real use case I encountered working with a customer recently: Imagine a critical OLTP database (DB2 on a very old AS/400) where no changes were allowed at all. Solutions like IBM replication to MQ, for example, were not enabled and could not be enabled in the near future, as everyone was afraid of touching the existing system. In addition, the tables didn’t have a “last updated timestamp” column to support incremental change-capture logic. No trigger-based solution could be implemented either, as any change would require full regression testing and introduce risk to this complex environment.

The challenge was: How could we still identify what changed in DB2 over time and merge the changed rows into the final dataset table in BigQuery?

A custom solution was put together using Google Cloud Platform to achieve this specific business goal. The solution used an ELT (Extract-Load-Transform) approach using the following Google Cloud services:

  • Cloud Composer as the workflow, orchestration and scheduling tool
  • Dataflow for the extract/load into BigQuery
  • BigQuery for the CDC identification and additional transformations

Extract-Load-Transform Solution Architecture:

Solution main steps:

1 = A scheduled Cloud Composer DAG was deployed to manage the entire workflow, starting with a quick “truncate BigQuery staging table command”, followed by a Dataflow load job initialization and execution.

2 = Dataflow used the standard JDBCtoBigQuery template available to perform a “select * from the DB2 table”, while inserting the results of the select into the BigQuery staging table truncated earlier. The Dataflow step mentioned here required no actual code development. It was just a matter of passing the correct parameters to an available and standard Dataflow template.

3 = Orchestrated by Cloud Composer, a few SQL-based steps were executed inside BigQuery, using BigQuery’s power and unlimited scalability:

  • BigQuery compared the contents of the staging table (recently loaded) with another table (the final table) and identified the differences
  • BigQuery logged the differences into a log table, creating a trail of all changes over time and allowing one to pinpoint the exact value of a particular row at a specific date/time
  • Finally, BigQuery, through ANSI SQL commands, merged the identified changes into the final table

4 = Cloud Composer monitored the entire workflow execution and sent an email once all workflow steps had completed

5 = The customer’s BI tool was then able to connect to BigQuery and observe the latest version of the data on business dashboards
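The comparison, logging and merge logic from step 3 can be sketched in plain Python to make the idea concrete. This is an illustration only, not the SQL the customer ran: tables are modeled as dictionaries keyed by id, and the operation codes "I", "U" and "D" are assumptions made for this example.

```python
def identify_changes(staging, final):
    """Compare two tables (dict: id -> tuple of column values).

    Returns a list of (op, id, row) where op is "I", "U" or "D"
    (hypothetical codes for insert, update, delete).
    """
    changes = []
    for key, row in staging.items():
        if key not in final:
            changes.append(("I", key, row))      # new on the source
        elif final[key] != row:
            changes.append(("U", key, row))      # at least one column differs
    for key, row in final.items():
        if key not in staging:
            changes.append(("D", key, row))      # no longer on the source
    return changes


def apply_changes(final, log, changes, changetime):
    """Record every change in the log, then merge it into the final table."""
    for op, key, row in changes:
        log.append((op, key, row, changetime))
        if op == "D":
            del final[key]
        else:
            final[key] = row
```

In BigQuery the same idea is expressed set-based, comparing the staging and final tables in SQL; the sketch only shows which rows end up in each category.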

Want to practice this solution yourself?

Considerations to get you started faster:

  • For the source server: You may want to use a MySQL or Postgres instance as your source server, mimicking the DB2 server for this exercise. The entire solution uses standard SQL over JDBC, so as long as a JDBC driver is available for your database platform you will be fine. Google offers fully managed MySQL, PostgreSQL and SQL Server options via Cloud SQL. You can also run, for example, a MySQL instance on a Google Compute Engine VM, using one of the pre-built images from the Google Cloud Marketplace. With the Marketplace image there is no need to install MySQL manually; just deploy the preconfigured image.
  • A proxy server: If changing firewall rules to allow communication between Dataflow and your source database is a challenge, you may consider a reverse-proxy solution for this step, since this is a non-production exercise. Using nginx, for example, the firewall configuration between Dataflow and your source DB may become much simpler. A firewall rule change will still be required, but since all communication between Google Cloud services and the source DB will go through the reverse proxy, you will only need to allow communication between the proxy server IP and your source. I want to emphasize that, since this is a proof-of-concept environment, scalability, performance, HA and other concerns are out of scope. For production, it is recommended to deploy Cloud VPN or a Dedicated Interconnect connection between Google Cloud and your source system, eliminating the reverse proxy component altogether.

Ready for the hands-on? Here are the main steps:

1. Deploy a MySQL compute image using Google Cloud Marketplace and prepare it

vi /opt/bitnami/mysql/conf/my.cnf

# Replace xxx with your IP address, or use 0.0.0.0 for all IPs
bind-address = xxx.xxx.xxx.xxx

Restart mysql

  • Create a test database, user and a new table in mysql

mysql -u root -h <your_db_ip> -p

create database testdb;
CREATE USER 'user1'@'%' IDENTIFIED BY '<your_password>';
GRANT ALL ON *.* TO 'user1'@'%';
flush privileges;

create table testdb.table1
(
  id INT,
  col1 VARCHAR(10),
  col2 VARCHAR(10),
  col3 VARCHAR(10),
  col4 VARCHAR(10)
);

  • Insert some test data on the newly created table

INSERT INTO testdb.table1 VALUES (1,'col1-1','col2-1','col3-1','col4-1');
INSERT INTO testdb.table1 VALUES (2,'col1-2','col2-2','col3-2','col4-2');
INSERT INTO testdb.table1 VALUES (3,'col1-3','col2-3','col3-3','col4-3');
INSERT INTO testdb.table1 VALUES (4,'col1-4','col2-4','col3-4','col4-4');
INSERT INTO testdb.table1 VALUES (5,'col1-5','col2-5','col3-5','col4-5');
commit;

2. Configure an NGINX reverse proxy using GCE and Marketplace (not needed once Cloud VPN and/or Interconnect is set up)

sudo su -

cp /etc/nginx/nginx.conf /etc/nginx/nginx.conf.orig

vi /etc/nginx/nginx.conf

-- Replace the entire contents of the file (/etc/nginx/nginx.conf) with the contents below, replacing 10.150.0.2:3306 with whatever IP:PORT combination the MySQL database is running at

########
user www-data;
worker_processes auto;
pid /run/nginx.pid;
include /etc/nginx/modules-enabled/*.conf;

events {
    worker_connections 1024;
}

stream {
    upstream sqlvm {
        server 10.150.0.2:3306;
    }

    ### Proxy would listen on 1433 in this case
    server {
        listen 1433;
        proxy_pass sqlvm;
    }
}
############

Restart nginx and check its status

systemctl stop nginx

systemctl start nginx

systemctl status nginx

3. Test the connectivity to MySQL, using the nginx proxy you just configured

Example below:

-- Adjust nginx IP and port accordingly

mysql -u user1 -h 10.128.0.5 -P 1433 -p

mysql> select * from testdb.table1;
+----+--------+--------+--------+--------+
| id | col1   | col2   | col3   | col4   |
+----+--------+--------+--------+--------+
|  1 | col1-1 | col2-1 | col3-1 | col4-1 |
|  2 | col1-2 | col2-2 | col3-2 | col4-2 |
|  3 | col1-3 | col2-3 | col3-3 | col4-3 |
|  4 | col1-4 | col2-4 | col3-4 | col4-4 |
|  5 | col1-5 | col2-5 | col3-5 | col4-5 |
+----+--------+--------+--------+--------+
5 rows in set (0.00 sec)
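If no mysql client is installed on the machine you are testing from, a plain TCP check can at least confirm the proxy port is reachable. This is a minimal sketch using only the Python standard library; the function name can_connect is made up for this example, and a successful connection only proves that something is listening on the port, not that MySQL authentication works.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False
```

With the example addresses used in this step, the call would be can_connect("10.128.0.5", 1433).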

4. Create a BigQuery Dataset and Tables

  • Create a BigQuery dataset (Google_CarlosAugusto used in this example) via the Google Cloud console or the equivalent bq command-line tool commands
  • Create the BigQuery tables to handle the custom CDC approach

Description:

table1 = Final table (the table CDC changes are merged into)

table1_log = Captures all changes to table1 over time. Think of it as an audit table that saves every row change

table1_staging = Staging table that Dataflow loads the raw data from the source into

CREATE OR REPLACE TABLE
`Google_CarlosAugusto.table1`
(
  id INT64,
  col1 STRING,
  col2 STRING,
  col3 STRING,
  col4 STRING
)
;

CREATE OR REPLACE TABLE
`Google_CarlosAugusto.table1_log`
(
  op STRING,
  id INT64,
  col1 STRING,
  col2 STRING,
  col3 STRING,
  col4 STRING,
  changetime DATETIME
)
;

CREATE OR REPLACE TABLE
`Google_CarlosAugusto.table1_staging`
(
  id INT64,
  col1 STRING,
  col2 STRING,
  col3 STRING,
  col4 STRING
)
;
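To illustrate why the log table carries op and changetime columns, here is a small sketch of how the value of a row at a given point in time could be reconstructed from log entries. It is an assumption-laden example: it presumes the log stores the new row image for inserts and updates and uses "D" as the delete code, neither of which is stated by the table definition itself.

```python
from datetime import datetime

COLUMNS = ("id", "col1", "col2", "col3", "col4")


def row_as_of(log_rows, row_id, as_of):
    """Return the row's values at time as_of, or None if it did not exist then.

    log_rows: iterable of dicts shaped like table1_log
    (op, id, col1..col4, changetime as datetime).
    """
    history = [
        r for r in log_rows
        if r["id"] == row_id and r["changetime"] <= as_of
    ]
    if not history:
        return None                      # row had not been seen yet
    latest = max(history, key=lambda r: r["changetime"])
    if latest["op"] == "D":              # assumed delete code
        return None
    return {name: latest[name] for name in COLUMNS}
```

The same lookup can of course be written in SQL against table1_log; the Python version is only meant to show the reasoning.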

5. Create GCS buckets, including one to store the appropriate source JDBC driver (to be used by Dataflow)

gsutil mb gs://drivers-bucket

gsutil mb gs://composer_output_bucket

gsutil cp /dev/null gs://drivers-bucket/tmp_dir/log

gsutil cp /dev/null gs://composer_output_bucket/tmp/log

gsutil cp mysql-connector-java-5.1.49-bin.jar gs://drivers-bucket
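For orientation, this is roughly how the bucket contents above map onto the parameters of the Dataflow JDBC to BigQuery template. Treat it as a sketch: the parameter names are the ones I recall from the classic template and should be checked against the current template documentation, the helper function name is made up, and the actual DAG shared in step 7 may pass them differently.

```python
def jdbc_to_bigquery_parameters(project, dataset, proxy_host, proxy_port, user, password):
    """Build a parameter dict for the JDBC to BigQuery template (names assumed)."""
    return {
        # Driver jar uploaded to the drivers bucket above
        "driverJars": "gs://drivers-bucket/mysql-connector-java-5.1.49-bin.jar",
        # Driver class shipped with MySQL Connector/J 5.1
        "driverClassName": "com.mysql.jdbc.Driver",
        # Dataflow talks to the nginx proxy, not to MySQL directly
        "connectionURL": f"jdbc:mysql://{proxy_host}:{proxy_port}/testdb",
        "query": "select * from testdb.table1",
        "outputTable": f"{project}:{dataset}.table1_staging",
        # Temporary directory created with the /dev/null copy above
        "bigQueryLoadingTemporaryDirectory": "gs://drivers-bucket/tmp_dir",
        "username": user,
        "password": password,
    }
```

In a real setup the password would come from a secret store rather than being passed around in plain text.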

6. Create a Cloud Composer instance using the Cloud Console

https://cloud.google.com/composer/docs/how-to/managing/creating

7. Download the DAG (directed acyclic graph) below, adjusting it accordingly. Then deploy it to Composer and execute it

  • Download this DAG python example file to your workstation as you will need to modify some things according to your setup
  • Define the following environment variables in Composer/Airflow. Use appropriate values based on your setup

gcp_project

gcs_bucket

gce_zone

gce_region

email

dataset_id

  • Modify the downloaded DAG file, replacing the “<CHANGEME>” references with values related to your setup

ps: If dataset or table names/definitions were changed, then additional DAG lines will need to be modified besides the ones marked with the term “<CHANGEME>”

  • Upload the modified DAG file to Cloud Composer’s DAG folder
  • Execute the DAG by clicking the “Trigger DAG” button as shown below

ps: This DAG will perform the entire load (using Dataflow), identify the changes for table1 using BigQuery, and then merge them into the BigQuery table1.

ps: Notice the loadbq_table1_dataflow step (in blue) will create/kick-off a Dataflow job. Feel free to check the GCP console for the Dataflow job execution details once it’s running.

ps: This step is equivalent to the initial load. Contents of BigQuery’s Google_CarlosAugusto.table1 should match the contents of the source table after a successful execution.

ps: Although we are manually executing the DAG in this example, you can easily schedule the execution of this workflow by adjusting the DAG (.py) file shared previously.

8. Assuming the Cloud Composer job completes successfully, check table1_log and table1 tables in BigQuery to validate the load:

-- Check what changed (CDC) since last job run

select * from Google_CarlosAugusto.table1_log where changetime in (select max(changetime) from Google_CarlosAugusto.table1_log) order by id;

Example:

-- Check final table (table1) and compare content against the MySQL source. Source and target should be identical at this point

select * from Google_CarlosAugusto.table1 order by id;

9. Back on the source, perform some data changes

DELETE FROM testdb.table1 WHERE id=1;
UPDATE testdb.table1 SET col2='2-2' WHERE id=2;
UPDATE testdb.table1 SET col3='3-2' WHERE id=2;
UPDATE testdb.table1 SET col4='4-3' WHERE id=3;
INSERT INTO testdb.table1 VALUES (6,'6','1000','c','d');
commit;

mysql> select * from testdb.table1;
+----+--------+--------+--------+--------+
| id | col1   | col2   | col3   | col4   |
+----+--------+--------+--------+--------+
|  2 | col1-2 | 2-2    | 3-2    | col4-2 |
|  3 | col1-3 | col2-3 | col3-3 | 4-3    |
|  4 | col1-4 | col2-4 | col3-4 | col4-4 |
|  5 | col1-5 | col2-5 | col3-5 | col4-5 |
|  6 | 6      | 1000   | c      | d      |
+----+--------+--------+--------+--------+
5 rows in set (0.05 sec)

Notice row 1 is gone

10. Rerun the Cloud Composer job (covered in step 7) and check the tables in BigQuery (step 8)

Assuming all is working well, the contents of BigQuery’s Google_CarlosAugusto.table1 should match the contents of the source table (testdb.table1). Google_CarlosAugusto.table1_log should also reflect exactly what rows changed (row inserted, updated or deleted).

ps: Depending on the DML changes performed in step 9, your values may differ from the example shown. Regardless, source and destination tables should be in sync after Cloud Composer finishes its execution.
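One way to double-check that source and target really are in sync, beyond eyeballing the query results, is to compare an order-independent fingerprint of both tables. This is a minimal sketch, assuming the rows have already been fetched from MySQL and BigQuery into Python with matching column order and types; the function name table_fingerprint is made up for this example.

```python
import hashlib


def table_fingerprint(rows):
    """Return (row_count, digest) for a collection of rows, ignoring row order."""
    row_digests = sorted(
        hashlib.sha256(repr(tuple(row)).encode("utf-8")).hexdigest()
        for row in rows
    )
    combined = hashlib.sha256("".join(row_digests).encode("ascii")).hexdigest()
    return len(row_digests), combined
```

If table_fingerprint(source_rows) equals table_fingerprint(target_rows), the two tables hold the same rows. Keep in mind that a value fetched as the integer 6 on one side and the string "6" on the other would count as a difference, so normalize types first.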

In Summary

It’s not uncommon to see real-time Change Data Capture (CDC) solutions when moving data to analytic warehouses. There will be times, however, when the identification of changes needs to occur somewhere other than the source environment. The solution above gives you this capability, leveraging the power and scalability of BigQuery and ANSI SQL to identify which rows changed and keep the source and target tables in sync. Thanks for reading this article! See you around!

Disclaimer

I am a Data Specialist at Google, Inc. (Google Cloud). The opinions stated here are my own, not those of Google, Inc.
