Replicating changes from SQL Managed Instance to the data lake using Change Data Capture and Azure Data Factory

Nicholas Hurt
11 min read · Mar 29, 2020


Introduction

As more organisations gravitate toward building a data lake, one of the common challenges they need to solve is how to hydrate the lake with operational data on a continuous basis. Typically this data originates from a relational database, and customers often want to load the lake with a continuous feed of incremental changes, particularly for downstream analytics. Whilst there are some good third-party options for replication, such as Attunity and Striim, there is a lesser-known option using change data capture (CDC) and Azure Data Factory (ADF).

For those running operational databases using SQL Server on-premises, in a VM (IaaS) or SQL Managed Instance (SQL MI), this blog will demonstrate how to use CDC and ADF to publish incremental changes to the data lake.

A brief overview of change capture options…

Capturing changes and replicating them to downstream systems has been, and still is, a common requirement, particularly in data warehousing scenarios. Apart from home-grown solutions using timestamps, watermarks and triggers (which are often high maintenance and inefficient), for those using SQL Server or SQL MI there are two built-in methods of capturing changes. The first is change tracking (CT), a lightweight solution which synchronously captures which rows have changed using their primary keys. The other, more robust, option is change data capture (CDC), which reads the transaction log and asynchronously captures changes into a set of change tables. Change data capture has been part of the feature set since SQL Server 2008. The features and considerations of each are discussed succinctly in the documentation and in this seminal Technet article, but despite the obvious appeal of CDC, organisations will need to evaluate which option best suits their situation. A final consideration for those using only PaaS SQL databases is that CT is supported on all the Azure SQL DB options whilst CDC is only supported on Managed Instance.
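As a quick aside, if you are unsure which of these features is already enabled in an existing environment, the catalog views expose this directly. The following is a minimal check, assuming a placeholder database name of MyDatabase:

-- is CDC enabled at the database level?
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'MyDatabase';

-- is change tracking enabled at the database level?
SELECT * FROM sys.change_tracking_databases WHERE database_id = DB_ID('MyDatabase');

-- which tables in the current database are tracked by CDC?
SELECT s.name AS schema_name, t.name AS table_name
FROM sys.tables t JOIN sys.schemas s ON s.schema_id = t.schema_id
WHERE t.is_tracked_by_cdc = 1;

If either of the first two queries returns nothing (or 0), the corresponding feature has not been enabled.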

Incremental data loading using ADF

According to the documentation, ADF supports two methods of incremental loading from a source database — using a watermark (such as a timestamp field or key column) or change tracking. There is no mention of support for change data capture, nor are there many articles or blogs on the subject that can easily be found. The good news is that it is indeed possible from a technical perspective using the copy activity, but first let us examine two important aspects of any replication solution — latency and cost.

Considering the comparison of trigger types in ADF, the simplest option for copying incremental changes would be to use a tumbling window trigger. This provides access to the window start and end variables, backfill support and retry capability. This trigger type has a minimum window interval of 5 minutes, meaning there is a potential latency of 5 minutes before the data will arrive in the lake.

Running any pipeline on a frequent basis in Azure Data Factory can incur a large bill if inefficiently designed, particularly when using the copy activity. Out of all the integration runtime costs, data movement incurs the largest cost per hour. Another important point to note is that all integration runtime charges are prorated by the minute and rounded up. Considering this, if you had a copy activity which ran frequently but copied no data, it would likely complete in under a minute, yet you would still be charged for the full minute. Current pricing for most regions is $0.25/DIU-hour for the copy activity and $0.005/hour for Lookup activities. Running a tumbling window every 5 minutes at the minimum of 2 DIUs, this would be

0.25 x (1/60) x (60/5) x 2 x 24 x 30 = $72 per table per month

If there were 100 tables that would be a minimum of $7200 per month regardless of whether any data was copied or not!

Therefore, to make the pipeline as cost effective as possible, one could perform a Lookup operation prior to running the copy activity to check whether any changed records exist. Adding the lookup activity incurs negligible additional cost:

0.005 x (1/60) x (60/5) x 24 x 30 = $0.72 per table per month

For further information on pricing, please see Cathrine Wilhelmsen’s Lessons Learned: Understanding Azure Data Factory Pricing.

Prerequisites

If you wish to create this demo in your subscription, you will need:

- an Azure SQL Managed Instance,

OR, if running SQL Server on-prem, in a VM or in a VNet:

- a SQL Server instance with a self-hosted integration runtime that has network access to it,

AND

- an Azure Data Factory instance and an ADLS Gen2 storage account in which to land the data.

CDC Configuration

Connect to your instance and create a new database. Then enable CDC on the database and create a sample customers table with CDC enabled on it:

EXEC sys.sp_cdc_enable_db;

create table customers (
customer_id int,
first_name varchar(50),
last_name varchar(50),
email varchar(100),
city varchar(50),
CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id")
);

EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'customers',
@role_name = 'null',
@supports_net_changes = 1;
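Enabling CDC on the table creates a capture instance named dbo_customers by default, which is why the change functions used later are called fn_cdc_get_net_changes_dbo_customers. To optionally confirm the capture instance and its configuration, you can query the CDC metadata:

EXEC sys.sp_cdc_help_change_data_capture
@source_schema = 'dbo',
@source_name = 'customers';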

With the table created and change data capture enabled, add some data:

insert into customers (customer_id, first_name, last_name, email, city) values (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading');
insert into customers (customer_id, first_name, last_name, email, city) values (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth');
insert into customers (customer_id, first_name, last_name, email, city) values (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');

Verify that a count of the changed records can be determined using the fn_cdc_get_net_changes_dbo_customers function:

DECLARE @from_lsn binary(10), @to_lsn binary(10);
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all');

Note the use of from and to LSNs (Log Sequence Numbers) to represent the window of time in which to look for changes. This code has been adapted from the documentation and determines whether there have been any changes to the customers table since CDC was enabled.

ADF Configuration

Once you have created a Data Factory instance, configure a new linked service to the database.

For those using SQL MI, see here for information regarding access via public vs private endpoint. If using the private endpoint, the pipeline would need to run on a self-hosted integration runtime. The same applies to those running SQL Server on-prem, in a VM or within a VNet.

Create a new dataset pointing to the cdc.dbo_customers_CT table which is automatically generated once CDC is enabled on the customers table. Changed data is never queried from this table directly but is instead extracted through the CDC functions.
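Purely for illustration, and assuming the table created above, you can peek at what CDC has written behind the scenes. The change table contains the captured columns plus metadata columns such as __$start_lsn and __$operation; the pipeline itself will only ever read changes through the CDC functions.

-- inspect the raw change table (illustration only, not used by the pipeline)
SELECT TOP 10 __$start_lsn, __$operation, customer_id, first_name, last_name, city
FROM cdc.dbo_customers_CT;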

Next create a new pipeline, add a new Lookup activity, specify the recently added dataset as the source dataset, and add the code above in the query box to determine the count of the changed records.

Click preview data and verify that the changed row count is equal to the number of rows inserted above.

Then add an IF condition activity and configure the expression as follows to check if the count is greater than zero.

@greater(int(activity('GetChangeCount').output.firstRow.changecount),0)

Edit the True condition and drag a Wait activity on the canvas to simulate some activity if this condition is met.

Run the pipeline in Debug mode and observe that the Wait step is executed as there are changed rows to process.

Now edit the True condition and replace the Wait activity with a Copy Activity. In the source tab specify the dataset, select query and enter the following:

DECLARE @from_lsn binary(10), @to_lsn binary(10);
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all');

Click preview to verify that the query returns the changed rows correctly.

The data will need to be saved to a storage account, in this case ADLS Gen2. In the Sink tab, create a new dataset, choose Azure Data Lake Storage Gen2, select CSV and click Continue. Specify a dataset name and choose a new linked service. Specify a linked service name, e.g. datalake, select the subscription and the storage account created in the prerequisites above, and click Create. Browse to the location where the file should be saved, in this case the raw container, and select first row as header.

Click Debug to test the pipeline and verify that the file was generated. If successful, publish the pipeline to save it.

Now that the core elements of the pipeline are working, we need to add a tumbling window trigger, parameterise the start and end LSNs and configure a more appropriate output path and filename.

Note there are other possible approaches which utilise a set of stored procedures. One could persist the LSNs per table in a separate control table and maintain them via a stored procedure, however this seems overly complicated considering the LSN windows are already defined by the interval of the tumbling window trigger. It is further complicated by the fact that the lookup activity cannot return a binary/byte array, which is the data type of the LSN, so these values could not easily be passed around as pipeline parameters.
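For completeness only, a minimal sketch of that stored-procedure alternative might look something like the following; the watermark table and procedure names are hypothetical, and this is not the approach used in the remainder of this post:

-- hypothetical control table holding the last LSN processed per capture instance
create table dbo.cdc_watermarks (
capture_instance sysname primary key,
last_lsn binary(10) not null
);
GO
-- hypothetical procedure returning the net changes since the stored watermark,
-- then advancing the watermark
create procedure dbo.get_customer_net_changes
as
begin
declare @from_lsn binary(10), @to_lsn binary(10);

-- start just after the last processed LSN, or from the minimum LSN on the first run
select @from_lsn = sys.fn_cdc_increment_lsn(last_lsn)
from dbo.cdc_watermarks
where capture_instance = 'dbo_customers';

if @from_lsn is null
set @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');

set @to_lsn = sys.fn_cdc_get_max_lsn();

select * from cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all');

-- persist the new high-water mark
update dbo.cdc_watermarks set last_lsn = @to_lsn where capture_instance = 'dbo_customers';
if @@rowcount = 0
insert into dbo.cdc_watermarks (capture_instance, last_lsn) values ('dbo_customers', @to_lsn);
end

The tumbling window trigger already supplies a reliable time window per run, so a watermark table like this mainly adds moving parts to maintain.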

ADF Pipeline Parameters

Add two parameters to the pipeline which will represent the tumbling window start and end time. For debugging purposes, add default values, but ensure the triggerStartTime is not prior to CDC being enabled on the table, otherwise this will throw an error.

In order to utilise these parameters in the Lookup query, use the concat function as follows. Note that single quotes within the query need to be escaped by doubling them up.

@concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); 
SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than or equal'', @end_time);
SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')

Similarly, the copy activity in the If condition must be updated.

@concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); 
SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than or equal'', @end_time);
SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')

Next, navigate to the Sink tab of the copy activity and open the dataset properties. Click on the Parameters section and add a new parameter called triggerStart.

Click back to the Connection tab and add dynamic content for both the directory and the file sections. This will store the data in a customers/incremental subfolder with date based partitions. Enter the following in the directory section:

@concat('customers/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))

The filename will be the full date and time, suffixed with the csv extension:

@concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')

Click back to the copy activity and enter dynamic content in the triggerStart value with the following expression.

@pipeline().parameters.triggerStartTime

Finally, click on the Settings tab and set the Data integration unit to 2. By default this is set to Auto, which consumed 4 DIUs even for this basic copy operation.

Debug the pipeline and ensure the folder structure and output file is generated as expected. Download and open the file to verify the contents. If successful publish the pipeline.

Observe how the parameters are injected into the query.

Finally, configure a tumbling window trigger to run the pipeline at a regular interval and set start and end time parameters. Specify a start time which is equal to the end time of the debug window above.

On the next screen, specify the following values for the start and end parameters:

@formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff')
@formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')

Note the trigger will only run once it has been published. Additionally, the expected behaviour of a tumbling window trigger is to run all historical intervals from the start date until now. More information regarding tumbling window triggers can be found here.

Publish the trigger and make some additional changes to the customer table.

insert into customers (customer_id, first_name, last_name, email, city) values (4, 'Farlie', 'Hadigate', 'fhadigate3@zdnet.com', 'Reading');
insert into customers (customer_id, first_name, last_name, email, city) values (5, 'Anet', 'MacColm', 'amaccolm4@yellowbook.com', 'Portsmouth');
insert into customers (customer_id, first_name, last_name, email, city) values (6, 'Elonore', 'Bearham', 'ebearham5@ebay.co.uk', 'Portsmouth');
update customers set first_name='Elon' where customer_id=6;
delete from customers where customer_id=5;

After some time, check the output folder to verify whether these latest changes are copied to the lake.

An interesting observation here is how the net changes function works. Even though five DML operations were run above, only the net result per row is extracted by the CDC function. To obtain every individual change, use the get_all_changes function instead.
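For example, assuming the default dbo_customers capture instance, the all-changes equivalent of the earlier query would be the following; each returned row carries an __$operation column indicating the change type (1 = delete, 2 = insert, 4 = update):

DECLARE @from_lsn binary(10), @to_lsn binary(10);
SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
-- returns one row per change rather than the net effect per primary key
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, 'all');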

In the monitoring section, review the pipelines runs and click on the consumption icon of the run which took the longest — evidence that the copy activity occurred.

If the DIUs for the copy activity were set to 2, then the DIU cost for this run is the DIU-hours consumed multiplied by the DIU-hour price:

(1/60) hour x 2 DIUs = 0.0333 DIU-hours, and 0.0333 x $0.25 = $0.0083

The Lookup activity cost can be calculated similarly:

(1/60) hour = 0.0167 hours, and 0.0167 x $0.005 = $0.000083

Conclusion

There may be a number of ways to copy incremental changes from a SQL database using ADF; however, among the Azure SQL PaaS options, change data capture is only supported on SQL Managed Instance.

ADF can copy these changes to the data lake using a tumbling window trigger which defines the start and end window for each pipeline run.

The costs of running a copy activity on a frequent basis can be reduced by using a lookup activity to check whether any records have been changed prior to running the copy activity.

Reminder

Remember to pause the tumbling window trigger and clean up resources if no longer needed to avoid unnecessary cost.
