Slicing your data for Azure Data Factory

Approaches for dealing with non-dated sources

Azure Data Factory (ADF) orchestrates data movement across cloud and hybrid architectures, offering a cloud-based pay-as-you-go alternative to traditional data replication solutions. In this article we take a look at two mechanisms for enabling date-based queries on your data:

  1. Configuring Azure SQL Temporal Tables
  2. Building a history table on the fly within the pipeline

Slicing the source data into… slices

ADF operates on data in batches known as slices. Slices are obtained by querying data over a date-time window: for example, a slice may contain data for a specific hour, day, or week. Such a query would use a date-time column notionally representing an effective date or a last-modified date.

"source": {
    "type": "SqlSource",
    "sqlReaderQuery": "$$Text.Format('select * from [Order] where orderDate >= \\'{0:yyyyMMdd}\\' AND orderDate < \\'{1:yyyyMMdd}\\'', WindowStart, WindowEnd)"
}

Slicing data in this way makes the processing of each slice independent, which means that slices can be run in parallel and rerun after transient failures. Compare this with the traditional use of high-water-mark counters (e.g. SQL Server’s log sequence number, available with Change Data Capture), which permit only a single batch query of changes ‘since the last run’.
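
To make the independence of slices concrete, here is a minimal Python sketch of how a date range might be cut into half-open windows, each rendered into its own query. The helper names slice_windows and slice_query are hypothetical and are not part of ADF; the table and column names are borrowed from the example above.

```python
from datetime import datetime, timedelta


def slice_windows(start, end, size):
    """Yield half-open (window_start, window_end) pairs covering start..end."""
    window_start = start
    while window_start < end:
        window_end = min(window_start + size, end)
        yield window_start, window_end
        window_start = window_end


def slice_query(window_start, window_end):
    """Render the date-bounded query for a single slice.

    Order is a reserved word in T-SQL, hence the brackets.
    """
    return (
        "select * from [Order] "
        f"where orderDate >= '{window_start:%Y%m%d}' "
        f"AND orderDate < '{window_end:%Y%m%d}'"
    )


# Three daily slices; each query depends only on its own window.
windows = list(
    slice_windows(datetime(2017, 5, 1), datetime(2017, 5, 4), timedelta(days=1))
)
for window in windows:
    print(slice_query(*window))
```

Because each window ends exactly where the next begins and the upper bound is exclusive, no row lands in two slices, and any one query can be rerun without reference to the others.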

Data slices are generally expected to be idempotent: if a slice is rerun, its contents should not change. This suggests that your source table should contain a history of effective-dated records.


Add history to your data — the direct method

Adding an effective date column to your tables is an explicit and straightforward approach to capturing historical values. You’ll also need a way of end-dating data so that a record can be ‘deleted’, either with an endDate or an isActive flag (this is sometimes known as soft-delete). This approach substantially increases the complexity of queries on your data, however, as you will now need to filter your data for the most recent values.

SELECT
    p_hist.id, p_hist.name
FROM (
    SELECT
        p.id, p.name, p.isActive,
        ROW_NUMBER() OVER (
            PARTITION BY p.id
            ORDER BY p.effectiveDate DESC
        ) AS rowseq -- append sequence in date order
    FROM Person_History p
) p_hist
WHERE
    p_hist.rowseq = 1 AND -- only get the most recent row
    p_hist.isActive = 1 -- only return non-deleted rows

The direct approach is best when the effective date has a business meaning. For example, the payment terms applying to an order would be those existing at the time the order was taken, not those most recently updated. The effective date is also not necessarily the date on which the data was updated in the database (i.e. it may need to be set explicitly according to an external rule).
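
The ‘most recent row, unless soft-deleted’ filter used in the query above can be sketched in Python as follows. The sample rows and the current_rows helper are made up for illustration; only the column layout (id, name, effectiveDate, isActive) follows the query.

```python
from datetime import date

# Hypothetical history rows: (id, name, effectiveDate, isActive)
history = [
    (1, "Romeo", date(1900, 1, 1), True),
    (2, "Juliet", date(1900, 1, 1), True),
    (1, "Romeo Montague", date(1900, 1, 2), True),
    (2, "Juliet Capulet", date(1900, 1, 2), True),
    (2, "Juliet Capulet", date(1900, 1, 3), False),  # soft-deleted
]


def current_rows(rows):
    """Keep the newest row per id, then drop ids whose newest row is inactive."""
    latest = {}
    for row in rows:
        person_id, _, effective_date, _ = row
        if person_id not in latest or effective_date > latest[person_id][2]:
            latest[person_id] = row
    return sorted((r[0], r[1]) for r in latest.values() if r[3])


print(current_rows(history))
```

Note that the active check is applied only after the newest row has been chosen. Filtering on isActive first would resurrect the last active version of a deleted record.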


DIY Triggers

A very common approach is to build a shadow table of history against each table using database triggers. My advice here is not to reinvent the wheel but to look for mature, well-thought-out implementations such as Cedric Baelemans’ Audit Trail Generator for Microsoft SQL.


Azure SQL temporal tables take care of your history

If you have a reasonably modern database, a generous commercial license and a good relationship with your DBA, you may be able to leverage built-in features of your database to capture data changes instead of building your own. In particular, you may be able to take advantage of Temporal Tables, which are supported directly in Azure SQL Database PaaS and SQL Server 2016, and can replace a DIY trigger-based audit. Here I am modifying the AdventureWorks Customer table to capture history:

ALTER TABLE [SalesLT].[Customer] ADD
    SysStartTime datetime2(0) GENERATED ALWAYS AS ROW START HIDDEN
        CONSTRAINT Customer_SysStart DEFAULT SYSUTCDATETIME(),
    SysEndTime datetime2(0) GENERATED ALWAYS AS ROW END HIDDEN
        CONSTRAINT Customer_SysEnd DEFAULT CONVERT(datetime2(0), '9999-12-31 23:59:59'),
    PERIOD FOR SYSTEM_TIME (SysStartTime, SysEndTime);
ALTER TABLE [SalesLT].[Customer] SET
    (SYSTEM_VERSIONING = ON
        (HISTORY_TABLE = SalesLT.Customer_History));

This adds two hidden datetime2 columns to the Customer table, and creates a shadow history table for recording data values. Here’s how it looks when using the Customer table:

UPDATE [SalesLT].[Customer] SET
    Title = 'Dr.'
WHERE CustomerId = 1;

SELECT
    CustomerId, Title, SysStartTime, SysEndTime
FROM [SalesLT].[Customer]
FOR SYSTEM_TIME ALL
WHERE CustomerId = 1;

CustomerId  Title    SysStartTime                SysEndTime
----------- -------- --------------------------- -------------------
1           Mr.      2017-05-09 03:00:59         2017-05-09 03:01:41
1           Dr.      2017-05-09 03:01:41         9999-12-31 23:59:59

In your ADF pipeline you can now slice the Customer data using the SysStartTime column.
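
As a rough illustration of what slicing on SysStartTime means, the Python sketch below assigns the two row versions from the output above to one-minute windows. The versions_in_slice helper and the one-minute slice size are assumptions made for the example, not anything ADF provides.

```python
from datetime import datetime, timedelta

# (Title, SysStartTime) for CustomerId 1, copied from the query output above
versions = [
    ("Mr.", datetime(2017, 5, 9, 3, 0, 59)),
    ("Dr.", datetime(2017, 5, 9, 3, 1, 41)),
]


def versions_in_slice(rows, window_start, window_end):
    """Return the titles whose SysStartTime falls in [window_start, window_end)."""
    return [title for title, sys_start in rows if window_start <= sys_start < window_end]


first = datetime(2017, 5, 9, 3, 0)
minute = timedelta(minutes=1)
print(versions_in_slice(versions, first, first + minute))
print(versions_in_slice(versions, first + minute, first + 2 * minute))
```

Each version carries a single start time, so it falls into exactly one window, and rerunning a window over the same history returns the same versions.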

See the official Temporal Tables documentation for more detail. If you’re on Oracle 11g+, check out the DBMS_FLASHBACK_ARCHIVE package, which provides a similar capability.


Resolving change records within the ADF Pipeline

Sometimes you just don’t have enough control over your database to add audit/change detection capabilities. Sometimes you will be querying a complex view and have difficulty resolving the subset of changes needed. What you need is to externalise the change detection.

If your dataset is reasonably constrained in size, you can copy the full table and resolve the changes within the ADF pipeline.

The Temporal Table from the previous section will not help here: because the full table is copied in every slice, it would generate audit entries for every row in every slice, even when values have not changed. Instead I will use a stored procedure in Azure SQL to infer the deltas.

Let’s consider a scenario where ADF will read a Person table from an on-premises database. Here I am reading a view in an Oracle source which does not contain any date information, and sending the full table data to a stored procedure created in an Azure SQL Database:

"source": {
    "type": "OracleSource",
    "oracleReaderQuery": "$$Text.Format('select p.id, p.name from PERSON_VIEW p')"
},
"sink": {
    "type": "SqlSink",
    "sqlWriterStoredProcedureName": "dbo.updatePerson",
    "storedProcedureParameters": {
        "slicedate": {
            "type": "String",
            "value": "$$Text.Format('{0:yyyyMMdd}', SliceStart)"
        }
    },
    "sqlWriterTableType": "dbo.Person_Type"
},

The stored procedure will take the input dataset as a table-valued parameter, and use the MERGE statement to build both a copy of the source Person table with effectiveDate applied, and a Person_History table with all historical increments. Here is the code to create the tables and the stored procedure in the Azure SQL database:

-- This is the type of the incoming data source records
CREATE TYPE Person_Type AS TABLE (
    [id] int NOT NULL,
    [name] nvarchar(50) NOT NULL
);
-- This is our table containing the 'latest' person record values
CREATE TABLE Person (
    [id] int NOT NULL,
    [name] nvarchar(50) NOT NULL,
    [effectiveDate] datetime,
    [isActive] bit
);
-- This table will hold a full history of changes
CREATE TABLE Person_History (
    [id] int NOT NULL,
    [name] nvarchar(50) NOT NULL,
    [effectiveDate] datetime,
    [isActive] bit
);
-- CREATE PROCEDURE must start its own batch (GO is the SSMS/sqlcmd batch separator)
GO
CREATE PROCEDURE updatePerson
    @slice [Person_Type] READONLY,
    @slicedate datetime
AS
BEGIN
    MERGE Person AS d
    USING @slice AS s
    ON (s.id = d.id)
    -- changed, or previously soft-deleted and now back in the source
    WHEN MATCHED AND
        (s.name <> d.name OR d.isActive = 0)
    THEN UPDATE SET
        d.name = s.name,
        d.effectiveDate = @slicedate,
        d.isActive = 1
    -- new in the source
    WHEN NOT MATCHED THEN
        INSERT (id, name, effectiveDate, isActive)
        VALUES (s.id, s.name, @slicedate, 1)
    -- missing from the source: soft-delete
    WHEN NOT MATCHED BY SOURCE AND
        d.effectiveDate < @slicedate AND d.isActive = 1
    THEN UPDATE SET
        d.effectiveDate = @slicedate,
        d.isActive = 0
    -- every row touched above is also appended to the history table
    OUTPUT INSERTED.* INTO Person_History;
END

Because the source query in the ADF pipeline could not utilise a date range, the query will pass the complete Person table in each slice to the stored procedure. Here’s a simulated example calling the stored procedure directly:

declare @slice1 Person_Type;
insert @slice1 (id, name) values (1, 'Romeo');
insert @slice1 (id, name) values (2, 'Juliet');
exec updatePerson @slice1, '1900-01-01';

declare @slice2 Person_Type;
-- true identities revealed
insert @slice2 (id, name) values (1, 'Romeo Montague');
insert @slice2 (id, name) values (2, 'Juliet Capulet');
exec updatePerson @slice2, '1900-01-02';

declare @slice3 Person_Type;
insert @slice3 (id, name) values (1, 'Romeo Montague');
-- what happened to Juliet?
exec updatePerson @slice3, '1900-01-03';

declare @slice4 Person_Type;
-- oh dear...
exec updatePerson @slice4, '1900-01-04';

select * from Person_History;

id          name                 effectiveDate           isActive
----------- -------------------- ----------------------- --------
1           Romeo                1900-01-01 00:00:00.000 1
2           Juliet               1900-01-01 00:00:00.000 1
1           Romeo Montague       1900-01-02 00:00:00.000 1
2           Juliet Capulet       1900-01-02 00:00:00.000 1
2           Juliet Capulet       1900-01-03 00:00:00.000 0
1           Romeo Montague       1900-01-04 00:00:00.000 0
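
For readers who find the MERGE branches easier to follow procedurally, here is a small in-memory Python simulation of the same logic, replaying the four slices above. It is a sketch only: the update_person function and its data structures are hypothetical stand-ins for the tables, and row ordering within one slice is an artefact of the simulation.

```python
from datetime import date


def update_person(person, history, slice_rows, slice_date):
    """Mimic the three MERGE branches against an in-memory Person table.

    person:  dict of id -> [name, effectiveDate, isActive] (the 'latest' table)
    history: list of (id, name, effectiveDate, isActive), append-only
    """
    incoming = dict(slice_rows)
    for pid, name in incoming.items():
        current = person.get(pid)
        if current is None or current[0] != name or not current[2]:
            # NOT MATCHED (insert), or MATCHED and changed or revived (update)
            person[pid] = [name, slice_date, True]
            history.append((pid, name, slice_date, True))
        # otherwise unchanged: nothing is written to history
    for pid, current in person.items():
        # NOT MATCHED BY SOURCE: soft-delete active rows missing from the slice
        if pid not in incoming and current[1] < slice_date and current[2]:
            current[1], current[2] = slice_date, False
            history.append((pid, current[0], slice_date, False))


person, history = {}, []
slices = [
    ([(1, "Romeo"), (2, "Juliet")], date(1900, 1, 1)),
    ([(1, "Romeo Montague"), (2, "Juliet Capulet")], date(1900, 1, 2)),
    ([(1, "Romeo Montague")], date(1900, 1, 3)),
    ([], date(1900, 1, 4)),
]
for rows, slice_date in slices:
    update_person(person, history, rows, slice_date)
for row in history:
    print(row)
```

The simulation yields the same six history rows as the table above. Replaying the final slice a second time adds nothing, because every remaining record is already inactive.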

The new Person_History table can now be utilised as the source in the next ADF pipeline activity. You will need to declare the dataset for Person_History with "external": true, and in your pipeline include a dependency on the output of the first copy activity to keep the sequencing in order:

"inputs": [
{ "name": "person-history" }, -- use this as source
{ "name": "person" } -- this is for dependency only
]

Here is the completed pipeline:

[Figure: Finished pipeline using the stored procedure to build a history table]

I look forward to hearing your own techniques for getting the most out of Azure Data Factory. Stay tuned to the ADF Blog: there are rumours that slices based on a sequence number will be supported soon.