Implementing Change Data Capture with Data Masking Using Striim (SQL Server on Cloud SQL to BigQuery)

Neerajshivhare
Google Cloud - Community
12 min read · Feb 24, 2023

In this blog post, I’ll give a brief overview of Striim and walk through how to create and deploy a pipeline that continuously replicates changed data from a Google Cloud SQL (SQL Server) database to a BigQuery table, using the Striim for BigQuery product from the GCP Marketplace.

Image Source : Striim.com

Striim is a streaming Extract, Transform, and Load (ETL) data platform that enables real-time, zero-downtime replication and transformation of data from a source database to one or more target database systems. It captures database operations such as inserts, updates, and deletes from the transaction logs of the source database and replicates these changes to the target database using log-based change data capture (CDC) technology. Thanks to its user-friendly interface, users can quickly create their own pipelines to move and transform data with little or no code.

Prerequisite

  1. A Striim license. For free trials, go to https://go2.striim.com/free-trial. You can choose between Striim Platform and Striim Cloud. For more information about Striim’s partnerships with Google, AWS, and Azure, contact: partners@striim.com.
  2. An available Striim instance (version 4.1.0.1 or higher) in a cloud or on-premises environment that has access to the source and target databases.
  3. Access to the GCP console and privileges to deploy a Cloud SQL for SQL Server instance.
  4. A GCP service account with the permissions listed at https://www.striim.com/docs/en/bigquery-setup.html, plus a JSON key created for it.

In this guide, you will:

  1. Deploy a Cloud SQL for SQL Server database instance.
  2. Enable change data capture (CDC) in your Cloud SQL for SQL Server database.
  3. Deploy the Striim for BigQuery service from the GCP Marketplace.
  4. Create and run a Striim Replication job.
  5. View the results in BigQuery.

Step 1 : Create a Cloud SQL for SQL Server instance

In this quick start, you use the Google Cloud console. To use the Google Cloud CLI, cURL, or PowerShell, see Create instances.

  1. In the Google Cloud console, go to the Cloud SQL Instances page.
    Go to Cloud SQL Instances
  2. Click Create Instance.
  3. Click Choose SQL Server.
  4. Enter myinstance or a unique name for Instance ID.
  5. Enter a password for the sqlserver user.
  6. Click Create.
    You’re returned to the instances list. You can click the new instance right away to see the details, but it won’t be available for other operations until it initializes and starts.

Note: In this example, the instance is created using default settings, including a public IP address.

Step 2 : Enable CDC in your SQL Server database

This section describes how to enable change data capture (CDC) in Cloud SQL for SQL Server. The feature is available for the databases on your instance.

CDC lets you capture row-level changes (inserts, updates, and deletes) made to the tables you choose to track.

2.1 Enabling CDC and starting CDC capture jobs

Your database has the following stored procedures, for use by the sqlserver user:

msdb.dbo.gcloudsql_cdc_enable_db
msdb.dbo.gcloudsql_cdc_disable_db

2.1.1 Turn CDC on

To turn this feature on for a database, execute the necessary stored procedure and pass in the database name. For example:

EXEC msdb.dbo.gcloudsql_cdc_enable_db '[DATABASE_NAME]'
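
For example, a hypothetical call for a database named pocdb (the demo database this guide replicates; substitute your own database name):

-- enable CDC for the pocdb database on the Cloud SQL instance
EXEC msdb.dbo.gcloudsql_cdc_enable_db 'pocdb'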

2.1.2 Turn CDC off

To turn this feature off for a database, run a command such as the following:

EXEC msdb.dbo.gcloudsql_cdc_disable_db '[DATABASE_NAME]'

2.1.3 Start CDC capture jobs

After CDC is enabled, jobs are created for capture and cleanup. The jobs are invisible to the sqlserver user in SQL Server Management Studio (SSMS). However, you can modify the jobs using built-in stored procedures. Additionally, the jobs are viewable via the following stored procedure:

EXEC sys.sp_cdc_help_jobs
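
As a minimal sketch of modifying a job with the built-in sys.sp_cdc_change_job procedure, the following raises the cleanup job’s retention period; the 2880-minute (48-hour) value is only an illustration:

-- keep captured changes for 48 hours before the cleanup job removes them
EXEC sys.sp_cdc_change_job
    @job_type = N'cleanup',
    @retention = 2880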

2.1.4 Enabling CDC for a table

After you turn on CDC for a database, any user with dbo (database owner) access can set up tracking for tables in the database.

For information about the standard CDC commands and options, see Enable and Disable Change Data Capture.

2.1.5 Track changes in a table

To track a table, use the sys.sp_cdc_enable_table stored procedure.

For example, you could specify a command similar to the following:

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'customer',
@role_name = N'CDC'

2.1.6 Check if CDC is enabled on a table

To check if CDC is enabled on a table, use the sys.sp_cdc_help_change_data_capture stored procedure.

For example, you could specify a command similar to the following:

EXECUTE sys.sp_cdc_help_change_data_capture
@source_schema = N'dbo',
@source_name = N'customer'
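
A lighter-weight alternative is to query the sys.tables catalog view, which exposes an is_tracked_by_cdc flag:

-- returns 1 in is_tracked_by_cdc when CDC is enabled for the table
SELECT name, is_tracked_by_cdc
FROM sys.tables
WHERE name = 'customer'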

2.1.7 Query changes via a CDC change table

To view the CDC changes made on a table, use a SELECT query on the change table that is automatically created when CDC is enabled on that table.

The table is named as follows:

cdc.<schema>_<table_name>_CT

For example, you could specify a command similar to the following:

SELECT * FROM cdc.dbo_customer_CT
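
The change table also records metadata columns such as __$operation, which encodes the operation type (1 = delete, 2 = insert, 3 = row before update, 4 = row after update). For example, to see only the captured inserts:

-- filter the change table down to INSERT operations
SELECT *
FROM cdc.dbo_customer_CT
WHERE __$operation = 2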

For more detailed information on Cloud SQL CDC, see https://cloud.google.com/sql/docs/sqlserver/replication/enable-cdc

Step 3 : Deploy a Striim for BigQuery Service

There are two Striim products available in the GCP Marketplace that can be used to implement CDC from a Cloud SQL source to a BigQuery target:

  • Striim Cloud
  • Striim for BigQuery

3.1 Create a new Striim for BigQuery service

3.2 Whitelist Striim service IP address

To connect to Cloud SQL for SQL Server from the Striim service, we need to whitelist the Striim service’s IP address by adding it to the authorized networks list in the Cloud SQL network configuration.

Step a : Copy the IP address of the Striim service.

Step b : Whitelist the IP by adding it to the authorized networks list in Cloud SQL.

3.3 Launch the service :

Striim Cloud Service : From the Striim console, click the service created in step 3.1 and launch it.

3.4 Create New pipeline :

From the pipeline console, click the New Pipeline option and follow the steps below to create a new pipeline.

3.4.1 Connect To Target :

Connect to the target; in this case it will be BigQuery. Get the service account key for BigQuery from the GCP console and use it in the target connection console to create a BigQuery connection.

3.4.2 Connect to Source :

Select the appropriate source database; in this case we select SQL Server.

Add a new connection :

Enter Cloud SQL for SQL Server Connection Details :

Select the Source Schema to Integrate :

Select tables

Select the source tables you want Striim to sync to BigQuery, optionally mask fields or select key columns, then click Next. Striim will create the tables automatically the first time you run the pipeline.

Mask data using Striim (optional)

Optionally, you may mask data from source columns of string data types so that in the target their values are replaced by xxxxxxxxxxxxxxx. The Transform Data drop-down menu will appear for columns for which this option is available. (This option is not available for key columns.)

To mask a column’s values, set Transform Data to Mask.

Masked data will appear as xxxxxxxxxxxxxxx in the target:

Select key columns (optional)

The following is applicable only when you select Write continuous changes directly (MERGE mode) in Additional Settings. With the default setting Write continuous changes as audit records (APPEND ONLY mode), key columns are not required or used.

By default, when a source table does not have a primary key, Striim will concatenate the values of all columns to create a unique identifier key for each row to identify it for UPDATE and DELETE operations. Alternatively, you may manually specify one or more columns to be used to create this key. Be sure that the selected column(s) serve as a unique identifier; if two rows have the same key, that may produce invalid results or errors.
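
If a source table lacks a primary key and you would rather not rely on the concatenated key, one option is to add a primary key on the SQL Server side before syncing. A hypothetical sketch, assuming the customer table has a non-nullable, unique id column:

-- give Striim a true unique identifier to use for UPDATE and DELETE operations
ALTER TABLE dbo.customer
ADD CONSTRAINT PK_customer PRIMARY KEY (id)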

When target tables already exist

Select what you want Striim to do when some of the tables selected to be synced already exist in BigQuery:

  • Proceed without the existing tables: Omit both source and target tables from the pipeline. Do not write any data from the source table to the target.
  • Add prefix and create new tables: Do not write to the existing target table. Instead, create a target table of the same name, but with a prefix added to distinguish it from the existing table.
  • Drop and re-create the existing tables: Drop the existing target tables and any data they contain, create new target tables, and perform initial sync with the source tables. Choose this option if you were unsatisfied with an initial sync and are starting over.
  • Use the existing tables: Retain the target table and its data, and add additional data from the source.

Review the impact of the action to be taken. To proceed, enter yes and click Confirm and continue.

Create table groups

This is optional. Click Skip for now if you do not wish to create table groups at this time.

Striim uses table groups to parallelize writing to BigQuery to increase throughput, with each table group mapped internally to a separate BigQuery writer. The batch policy for each table group is the minimum feasible LEE (end-to-end latency) for tables in the group. We recommend the following considerations when you create your table groups:

  • Place your latency-sensitive tables into individual table groups. These tables may have high input change rates or low latency expectations. You can group such a table with a few other tables that exhibit similar behavior or latency expectations.
  • Place all tables that do not have a critical dependency on latency into the Default table group. By default, Striim places all new tables in a pipeline into the Default table group.

Additional Settings

How do you want to write changes to BigQuery?

  • Write continuous changes as audit records (default; also known as APPEND ONLY mode): BigQuery retains a record of every operation in the source. For example, if you insert a row, then update it, then delete it, BigQuery will have three records, one for each operation in the source (INSERT, UPDATE, and DELETE). This is appropriate when you want to be able to see the state of the data at various points in the past, for example, to compare activity for the current month with activity for the same month last year.
    With this setting, Striim will add two additional columns to each table, STRIIM_OPTIME, a timestamp for the operation, and STRIIM_OPTYPE, the event type, INSERT, UPDATE, or DELETE. Note: on initial sync with SQL Server, all STRIIM_OPTYPE values are SELECT.
  • Write continuous changes directly (also known as MERGE mode): BigQuery’s tables are synchronized with the source tables. For example, if you insert a row, then update it, BigQuery will have only the updated data. If you then delete the row from the source table, BigQuery will no longer have any record of that row.
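
Even in APPEND ONLY mode you can reconstruct the current state of a table in BigQuery by keeping only the latest operation per row. A hypothetical query, assuming the replicated People table in the pocdb dataset has an id key column:

-- latest surviving version of each row, dropping rows whose last operation was a DELETE
SELECT * EXCEPT (STRIIM_OPTIME, STRIIM_OPTYPE)
FROM `pocdb.People`
WHERE TRUE
QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY STRIIM_OPTIME DESC) = 1
  AND STRIIM_OPTYPE <> 'DELETE'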

How would you like to handle schema changes?

Select what you want Striim to do in BigQuery when a table or column is added to or a table is dropped from the source database:

  • Do not propagate changes and continue (default): Striim will take no action. Any data added to new tables will not be synced to BigQuery. Any data added to a new column will not be synced to BigQuery as the column will not exist in the target. Tables dropped from the source will continue to exist in BigQuery.
  • Pause the pipeline: Striim will pause the pipeline. After making any necessary changes in the source or BigQuery, restart the pipeline.
  • Propagate changes to BigQuery: In BigQuery, Striim will create a new table, add a column, or drop a table so that the target matches the source. Sync will continue without interruption. (Note that if a column is dropped from a source table, it will not be dropped from the corresponding BigQuery target table.)
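
As a concrete illustration, with Propagate changes to BigQuery selected, a hypothetical source-side change such as the following would be mirrored by Striim adding the matching column to the BigQuery target table:

-- add a nullable column on the SQL Server source; Striim propagates it to BigQuery
ALTER TABLE dbo.customer
ADD email VARCHAR(100) NULL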

Would you like to use the streaming mode to write continuous changes to BigQuery?

Use streaming mode (enabled by default): We recommend this method when you need low latency. If your uploads are infrequent (for example, once an hour), you may wish to disable streaming mode. If BigQuery is running in a free trial or the Google Cloud Platform free tier, disable this option or upgrade to a paid account.

What is your PostgreSQL replication slot?

This prompt applies only to PostgreSQL sources (replication slots are a PostgreSQL concept), so it can be skipped for the SQL Server source used in this guide. For a PostgreSQL source, enter the name of the slot you created or chose in Set up your PostgreSQL source. Note that you cannot use the same slot in two pipelines; each must have its own slot.

Schema evolution tracker table

Likewise for a PostgreSQL source: if for “How would you like to handle schema changes” you selected Propagate changes to BigQuery, enter the name of the tracker table you created or chose in Set up your PostgreSQL source.

3.5 Start the pipeline

From the Replication job details page:

Click Start.

The Replication job transitions from Provisioning to Starting to Running. Once running, the job first loads an initial snapshot of the table data you selected (for example, the People table) into BigQuery; during this phase the table’s state is listed as Snapshotting. After the initial snapshot is loaded into BigQuery, any changes made to the People table are replicated to BigQuery, and the table’s state is listed as Replicating.

3.6 Monitor the pipeline

You can start and stop the Replication job, review its configuration and logs, and monitor its activity.

You can monitor Replication job activities from the Replication job details page.

  1. From the Replication page, click the desired Replication job Name.
  2. Click Monitoring.

The Replication job is running with no changed records yet.

Verify the record count in the SQL Server source table.

Insert three records into the SQL Server table and watch the pipeline monitoring console for the changed records.

Delete a record and watch the monitoring console for the deleted record to flow through to BigQuery.

Verify the record count in the SQL Server source after the operations.
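
A hypothetical version of this test sequence, assuming a dbo.People table with id and name columns:

-- insert three records, then delete one, then re-check the count
INSERT INTO dbo.People (id, name)
VALUES (101, 'Alice'), (102, 'Bob'), (103, 'Carol');

DELETE FROM dbo.People WHERE id = 103;

SELECT COUNT(*) AS row_count FROM dbo.People;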

3.7 View the results in BigQuery

The Replication job creates a replicated dataset and table in BigQuery, with names inherited from the corresponding SQL Server database and table names.

If your source database name does not comply with the BigQuery dataset naming conventions, or if you have naming conflicts when you replicate multiple datasets into a single BigQuery dataset, specify aliases in the BigQuery target properties.

  1. Open BigQuery in the Google Cloud console.
  2. In the left panel, click the project name to expand a list of datasets.
  3. Select the pocdb dataset, and then select a table to view.

Verify the record count in BigQuery before the CDC operations.

Verify the record count again after the insert and delete operations to confirm the changed records arrived.
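
A sketch of the verification queries in BigQuery, assuming the pocdb dataset and People table names used in this guide; the per-operation breakdown applies only in APPEND ONLY mode, where Striim adds the STRIIM_OPTYPE column:

-- total rows replicated so far
SELECT COUNT(*) AS row_count FROM `pocdb.People`

-- APPEND ONLY mode: count captured operations by type (SELECT = initial sync)
SELECT STRIIM_OPTYPE, COUNT(*) AS ops
FROM `pocdb.People`
GROUP BY STRIIM_OPTYPE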
