ADF mapping Dataflows for the impatient — ELT Pipeline
This is a technical deep dive into mapping dataflows. This guide assumes you already have a general understanding of mapping dataflows or have read the introduction post here. In this post I will go through an ELT pipeline which reads data from an OLTP database, performs ELT transformations and stores the data in a data warehouse table. The pipeline is built on Kimball's dimensional model and the outcome is an SCD type 2 table.
TL;DR This is a step-by-step guide to building a slowly changing dimension data pipeline using Azure Data Factory Mapping Dataflows. Mapping Dataflows is a new feature of ADF and is still in limited private preview (work in progress).
Data source background
The data source is the orders table from an imaginary retailer. Every time a new order is created on the system a new row is added to the orders table, and it goes through the fulfillment cycle until all items on the order are delivered. Here is the schema of the table:
[OrderID] [int],
[CustomerID] [int] NOT NULL,
[SalespersonPersonID] [int] NOT NULL,
[PickedByPersonID] [int] NULL,
[ContactPersonID] [int] NOT NULL,
[BackorderOrderID] [int] NULL,
[OrderDate] [date] NOT NULL,
[ExpectedDeliveryDate] [date] NOT NULL,
[CustomerPurchaseOrderNumber] [nvarchar](20) NULL,
[IsUndersupplyBackordered] [bit] NOT NULL,
[Comments] [nvarchar](max) NULL,
[DeliveryInstructions] [nvarchar](max) NULL,
[InternalComments] [nvarchar](max) NULL,
[PickingCompletedWhen] [datetime2](7) NULL,
[LastEditedBy] [int] NOT NULL,
[LastEditedWhen] [datetime2](7) NOT NULL
With a quick look at the table structure it is clear that OrderID is the PK for this table, and among the other columns ExpectedDeliveryDate, Comments, InternalComments, PickingCompletedWhen, LastEditedBy and LastEditedWhen are probably the most frequently changing ones as the order goes through the fulfillment life cycle.
The DW use case
The primary driver for WorldWideImporters to build a data warehouse was to answer questions like:
- How many orders and which ones have had their delivery date moved (postponed or otherwise)?
- Which suppliers most often failed to deliver by the expected delivery dates?
- How can they provide a more accurate expected delivery date for each order based on the orders in the past?
- Can they perform sentiment analysis on the comments added to orders over time?
They realized the only way to answer these questions is to record the history of changes to each order and be able to perform AS-IS and AS-WAS analysis on their order information.
Building an ELT pipeline in Mapping Dataflows
First of all, I am referring to this pipeline as ELT rather than ETL. The reason is that, following modern data warehouse best practices, we retrieve the data from the operational system first and store it in Azure Blob Storage before performing any transformations. Hence the "Load" step happens before any transformation. The primary reason for this is something I always emphasize when I talk about modern data platforms:
Massive object storage (Azure Blob Storage or ADLS) is the most cost-effective storage option on any cloud platform (Azure or others). The pricing, reliability and durability of this type of storage easily justify storing the raw data in case we need it later, whether for auditing, building data lineage, building ML models or making it available to compute engines other than the one currently being used.
Step 1 — Extract and load the data from source
This is where ADF shines: with over 80 data connectors built in, we can extract the data from virtually any source before starting the transformation pipeline.
So in the first step of our pipeline we leverage the ADF Copy Data activity to load the orders data from our OLTP source into Blob storage in Parquet format and stage it in a Blob container (and virtual directory), or possibly Azure Data Lake Storage.
Why Parquet? With the raw data stored in Parquet format we eliminate the possibility of broken text-delimited files, misplaced fields and delimiters appearing inside text fields, and most importantly it allows us to deal with schema drift gracefully. The compression is also much more efficient, which benefits downstream processing activities.
Step 2 — Transforming the transactional data to historical rows using mapping dataflows
As mentioned in the introduction post, every dataflow starts with one or more source data sets. In the case of this pipeline we require two sources:
- The OLTP Parquet file from previous step
- The DW SQL table to read in historical records.
Full load or delta?
Every time you build a CDC pipeline, one of the first questions to ask is whether the data coming from the transactional source is going to be a full load or a delta load.
- Full load is when a data source does not have any means of identifying records that have changed since the last time we performed a retrieval. As a result the pipeline needs to extract the full table and identify the changed records itself. This has both technical and non-technical drawbacks. The non-technical drawback is that the incoming data size grows over time, which translates to bigger compute resource requirements. The technical challenge is an extra step within the pipeline to filter out the new records from the ones already processed in previous runs of the pipeline.
- Delta load is when the transactional source either has a means of stamping records with a change timestamp, so we can filter for the most recent records, or comes with CDC capability built in (e.g. SQL Server temporal tables) which keeps track of all changes in a separate repository.
Note: The decision to perform a full load or a delta load is entirely dependent on the data source you are dealing with, plus whether records ever get deleted from the transactional system. If we are dealing with a source that has record deletions, performing a delta load is not as easy as only filtering on a record change timestamp. We either have to resort to a full load or implement some sort of soft delete mechanism or CDC on the source.
For demonstration purposes I decided to perform a full load, although there is a "LastEditedWhen" field in my data source that could be used for filtering changed records.
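For reference, a delta extract in that style is usually just a filter on the change timestamp against a watermark recorded by the pipeline. A minimal T-SQL sketch of the idea; the etl.Watermark control table and the Sales.Orders source name are assumptions used purely for illustration:
-- Read the high-water mark recorded by the previous successful run
-- (etl.Watermark is a hypothetical control table maintained by the pipeline)
DECLARE @LastWatermark datetime2(7);
SELECT @LastWatermark = WatermarkValue
FROM etl.Watermark
WHERE TableName = 'Orders';
-- Only pull the rows edited since that point
SELECT *
FROM Sales.Orders
WHERE LastEditedWhen > @LastWatermark;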
Identifying changed records in a full load
There are multiple ways of identifying changed records between data sets. The basic method is to compare every field of the two records with the same PK and, if any of the fields differ, mark it as a changed record. Although this method is very simple to implement, it has some drawbacks.
- Every time the source data changes (a column name or type changes, new columns are added, columns are removed) the downstream pipeline needs to be adjusted, so it does not cope well with schema drift.
- It can be very compute intensive to compare the many columns of two data sets (e.g. outer joins on every column except the PK).
A more robust way of getting the same outcome is to build a hash of all columns except the primary key, using a function like MD5, and compare the two hash columns. To save on compute, we can also store the hash as a column in the DW table so we only need to hash the data arriving from the transactional source.
With all that said, the first step is to use the "Derived Column" transformation on the source data set to generate the hash column. Lucky for us, mapping dataflows has an expression builder with all these functions built in. Here is how I composed my MD5 hash expression:
md5(
 iif(isNull(CustomerID),'',toString(CustomerID)) +
 iif(isNull(SalespersonPersonID),'',toString(SalespersonPersonID)) +
 iif(isNull(PickedByPersonID),'',toString(PickedByPersonID)) +
 iif(isNull(ContactPersonID),'',toString(ContactPersonID)) +
 iif(isNull(BackorderOrderID),'',toString(BackorderOrderID)) +
 iif(isNull(OrderDate),'',toString(OrderDate)) +
 iif(isNull(ExpectedDeliveryDate),'',toString(ExpectedDeliveryDate)) +
 iif(isNull(CustomerPurchaseOrderNumber),'',toString(CustomerPurchaseOrderNumber)) +
 iif(isNull(IsUndersupplyBackordered),'',toString(IsUndersupplyBackordered)) +
 iif(isNull(Comments),'',toString(Comments)) +
 iif(isNull(DeliveryInstructions),'',toString(DeliveryInstructions)) +
 iif(isNull(InternalComments),'',toString(InternalComments)) +
 iif(isNull(PickingCompletedWhen),'',toString(PickingCompletedWhen)) +
 iif(isNull(LastEditedBy),'',toString(LastEditedBy)) +
 iif(isNull(LastEditedWhen),'',toString(LastEditedWhen))
)
Basically, for every column we check whether it is NULL; if it is, we replace it with an empty string, otherwise we use the string equivalent of the value (the data could be an integer, a date, etc.). Then we concatenate all these values and pass them to the md5 function.
iif(isNull(CustomerID),'',toString(CustomerID))
- if isNull(CustomerID)
- TRUE: return '' (an empty string)
- FALSE: return toString(CustomerID), the string equivalent of the column value
Tip: MD5 hashes strings, which is why all values need to be converted to strings before concatenation. We also need to replace NULLs with an empty string (or any other constant string), because NULL is an undefined value (an empty placeholder) and the md5 function cannot operate on an undefined value.
Bring in the DW records to compare
On the DW dataset source our job is easy: all we need to do is filter out the "old closed records" using the "Filter" transformation and expose the rest to our data stream.
Also, for visibility and manageability, we use the "Select" transformation to rename the columns and add "DW_" to the beginning of every column name. This way, later when we perform joins between datasets, we can easily identify columns (with the same name) coming from different sources.
So after these two transformations our DW stream looks like the above.
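In SQL terms, the combined effect of the Filter and Select steps is roughly the following sketch. The dbo.DimOrder table name and the RowHash column are assumptions for illustration; the Rec_* columns match the example table shown later in this post:
-- Keep only the open (current) rows and prefix every column with DW_
SELECT
    OrderID         AS DW_OrderID,
    RowHash         AS DW_RowHash,        -- assumes the MD5 hash is persisted in the DW table
    -- ...remaining business columns, each aliased with a DW_ prefix...
    Rec_start_dt    AS DW_Rec_start_dt,
    Rec_end_dt      AS DW_Rec_end_dt,
    Rec_current_ind AS DW_Rec_current_ind
FROM dbo.DimOrder
WHERE Rec_current_ind = 'Y';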
Comparing the DW and OLTP records
So far we have injected records from both sources into the pipeline and generated the MD5 hash for the records coming from the source. Now we need to perform the comparison to find which records are:
- New: OrderIDs (PK) that do not exist in the DW.
- Changed: existing OrderIDs with a different hash.
This can be done using a full outer join (Join transformation) or a Lookup transformation, and then passing the result on to a "Conditional Split" transformation.
The Lookup transformation is very straightforward to use: all we need to do is select the PK column on both sides of the lookup (outer join). The conditional split, though, is a bit trickier to set up.
We use the conditional split to split our data set into three streams (a rough SQL sketch of the three conditions follows the list below).
- If after the lookup the "DW_OrderID" is null, that means this is a newly created record.
- If the “DW_OrderID” is not null but the hashes are different then this is a changed record.
- All other records must be unchanged (remember we are using a full load strategy, so the incoming data includes all records: new, changed and unchanged).
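As a rough SQL equivalent of those three conditions (joined_orders, RowHash and DW_RowHash are illustrative names, assuming the hash is also persisted in the DW table):
SELECT *,
    CASE
        WHEN DW_OrderID IS NULL    THEN 'NewRecords'  -- no matching row in the DW
        WHEN DW_RowHash <> RowHash THEN 'Update'      -- existing order whose hash has changed
        ELSE 'Unchanged'                              -- full load, so everything else is unchanged
    END AS SplitStream
FROM joined_orders;  -- the result of the lookup between the OLTP and DW streams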
Pro tip: the Conditional Split transformation in conjunction with the expression builder becomes the most powerful tool in your toolset for data quality tasks. On top of the rich set of functions available in the expression language, you have access to regex, which in my experience will cover almost all text (and non-text) manipulation requirements.
Generating all the records for Insert or Update
With all the identification out of the way, the next step is to generate the records for insert and update (or upsert). In the case of SCD type 2 we are dealing with:
- Inserting new records, stamping them with the current timestamp as the record open timestamp and setting the current indicator to 'Y' (third row in the table below).
- Updating the changed records, stamping them with the current timestamp as the record closed timestamp and setting the current indicator to 'N' (first row in the table below).
- Inserting the changed records with their new values, stamping them with the current timestamp as the record open timestamp and setting the current indicator to 'Y' (second row in the table below).
| OrderID | col1..n | hash | Rec_start_dt | Rec_end_dt | Rec_current_ind |
|---------|---------|------|--------------|------------|-----------------|
| 1       | val 1   | xyz  | 20-02-2019   | 01-03-2019 | N               |
| 1       | val 2   | xyd  | 01-03-2019   | NULL       | Y               |
| 2       | val 5   | xbx  | 01-03-2019   | NULL       | Y               |
To achieve this, the "NewRecords" part of the conditional split is simply marked for insert, but the "Update" stream of the split needs to be replicated into two downstream branches to generate both the update and the insert from it.
This is where the "New Branch" transformation comes in. What "New Branch" does is basically replicate the same stream into another stream. From there we use the Derived Column transformation to add "rec_end_dt", "rec_start_dt" and "rec_current_ind" to the respective records. Finally we use the "Select" transformation to pick the respective columns: old values for updates (first row in the example table) and new values for inserts (second row in the example table above), as sketched below.
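A hedged SQL sketch of what the two branches derive from the "Update" stream (changed_orders and the column names are illustrative, not the dataflow's actual syntax):
-- Branch 1: close the existing DW version of each changed order (marked for update)
SELECT DW_OrderID        AS OrderID,
       -- ...keep the existing DW business column values...
       DW_Rec_start_dt   AS Rec_start_dt,
       CURRENT_TIMESTAMP AS Rec_end_dt,
       'N'               AS Rec_current_ind
FROM changed_orders;
-- Branch 2: open a fresh record with the new OLTP values (marked for insert)
SELECT OrderID,
       -- ...the incoming OLTP business column values...
       CURRENT_TIMESTAMP AS Rec_start_dt,
       NULL              AS Rec_end_dt,
       'Y'               AS Rec_current_ind
FROM changed_orders;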
By now the records to be updated are complete, and the new records to be inserted have all the values they need except one: the surrogate key!
Hence we use a "Union" transformation to combine the two "Insert" streams (boxes 1 and 2) and pass them to the "Surrogate Key" transformation (box 3).
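The Surrogate Key transformation itself simply appends an incrementing key column starting from a configured value; conceptually it is similar to this sketch (OrderSK, inserts_union and dbo.DimOrder are illustrative names):
-- Assign new surrogate keys starting after the highest key already in the dimension
SELECT ROW_NUMBER() OVER (ORDER BY OrderID)
         + (SELECT COALESCE(MAX(OrderSK), 0) FROM dbo.DimOrder) AS OrderSK,
       i.*
FROM inserts_union AS i;  -- the union of the two insert streams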
Final step — Putting it all together and writing to destination
Now we use another "Union" transformation (third box from the right above) to combine all the streams, pass them to the "Alter Row" transformation to mark each row for insert or update, and finally use a SQLDB dataset as the sink.
"Alter Row" is the latest transformation added to mapping dataflows, and at the time of writing this post there is no documentation for it yet. The figure above shows how I set it up, but once the official documentation is published I may find it was not the best way of using it!
This is also how I set up the SQLDB sink (with Allow insert and Allow update). Again, this is very new functionality and no official documentation had been published at the time of writing this post.
Note: Unfortunately, my efforts to get the SQLDB sink to perform any operation other than "Insert" resulted in pipeline failures. Mapping dataflows is a work in progress at the moment and, in my experience, features are being added and fixes applied constantly.
Having been unsuccessful at performing updates, I resorted to changing the flow slightly to re-create the full table every time the pipeline runs. It is not ideal, but it allowed me to test my pipeline end to end.
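For readers who think in SQL, the combined effect that the Alter Row rules plus an insert/update-enabled sink are meant to have on the dimension table is roughly a MERGE. A hedged T-SQL sketch with illustrative table and column names (the dataflow sink performs this for you, so you would not normally write it yourself):
MERGE dbo.DimOrder AS tgt
USING stg.OrderChanges AS src          -- hypothetical staging set produced by the dataflow
    ON tgt.OrderSK = src.OrderSK
WHEN MATCHED THEN
    -- rows marked for update: close the old version
    UPDATE SET Rec_end_dt      = src.Rec_end_dt,
               Rec_current_ind = src.Rec_current_ind
WHEN NOT MATCHED THEN
    -- rows marked for insert: brand new orders and new versions of changed orders
    INSERT (OrderSK, OrderID, RowHash, Rec_start_dt, Rec_end_dt, Rec_current_ind)  -- plus the business columns
    VALUES (src.OrderSK, src.OrderID, src.RowHash, src.Rec_start_dt, src.Rec_end_dt, src.Rec_current_ind);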
How about no RDBMS at all?
Working with mapping dataflows, the very first question that came to my mind (as a modern data platform advocate) was: "What if the solution did not have any RDBMSs?" In the current solution we rely on the RDBMS "upsert" capability to avoid doing a full refresh of the data set and needlessly pushing the old data through the logic.
The data set I have been experimenting with is roughly 2 million rows, which at real-world scale is considered small, so I could easily afford to run it as a full refresh on every run. But clearly, at real-world scale it would be a very heavy overhead to read the whole data set, apply the changes and re-write it to the target data set.
The imperfect solution
Full refresh to a Parquet data set with a Databricks table on top: To build your data warehouse in Blob Storage or Data Lake Storage (ADLS), we have to re-create the whole data set every time. As I am sure you are aware, files in Blob Storage or ADLS are immutable and provide no means of updating records inside a file (and even if they were not immutable, mapping dataflows has no way to locate records and change the contents of a file). For this to work we need to read in the whole set of data files from storage (with the Parquet source), run them through our ELT pipeline, generate a new data set and finally overwrite the old data set files. But remember, for this option to work we have to write the final data set into a staging directory and then, once the mapping dataflow is done, run a separate step in ADF to replace the permanent table with the staging one. You can then have Azure Databricks external (unmanaged) tables permanently defined on the directory and use Spark SQL (or an ODBC connection) to perform your analysis.
This is clearly not an ideal solution, although applying updates and keeping track of changes is mostly only required for dimensions (rather than fact tables). Dimension tables are by nature the smaller part of any data warehouse, so with proper planning of resources (and the advantage of cloud resource elasticity) performing a full refresh every day should not be a huge problem.
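Once the pipeline has swapped the Parquet files into the permanent directory, the Databricks side only needs an external table pointing at that location. A Spark SQL sketch with a made-up database name, table name and storage path:
-- Unmanaged (external) table over the Parquet files the dataflow produced
CREATE DATABASE IF NOT EXISTS dw;
CREATE TABLE IF NOT EXISTS dw.dim_orders
USING PARQUET
LOCATION 'wasbs://datalake@mystorageaccount.blob.core.windows.net/dw/dim_orders/';
-- Query it with Spark SQL (or over ODBC) as usual
SELECT COUNT(*) FROM dw.dim_orders WHERE Rec_current_ind = 'Y';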
The Ideal solution
The ideal solution would be one in which we can take advantage of the power of Spark to run our ELT pipeline (which mapping dataflows provides) on top of massive object storage (Blob or ADLS) without having to re-create our data set every time. Thinking about it, the answer would be Azure Databricks Delta tables! Delta tables store the data directly in massive cloud storage while also allowing updates (or upserts) to be applied to it.
Unfortunately, at the time of writing this article Databricks Delta tables are not a supported data source for mapping dataflows, so until then we are left with the less-than-ideal solution of re-creating our tables.
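To give an idea of what that enables, outside of mapping dataflows (e.g. from a Databricks notebook) an SCD-style upsert against a Delta table is a single statement. A sketch in Spark SQL, with illustrative table and column names:
-- Target Delta table stored directly on Blob/ADLS; staged_order_changes is a hypothetical
-- view over the changed/new rows produced by the pipeline
MERGE INTO dw.dim_orders AS tgt
USING staged_order_changes AS src
ON tgt.OrderSK = src.OrderSK
WHEN MATCHED THEN
  UPDATE SET Rec_end_dt = src.Rec_end_dt, Rec_current_ind = src.Rec_current_ind
WHEN NOT MATCHED THEN
  INSERT *;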
Schema drift and flexible modern data pipelines
One of the major features of Mapping Dataflows is its ability to deal with changing source/target data sources and handle schema drift gracefully. In the next post I will run through how this can benefit a modern data warehouse architecture and why it is so important.
Verdict and summary
I have spent the last two weeks working with ADF Mapping Dataflows and, apart from the usual breakdowns of beta software, my overall experience has been very positive.
- First and foremost, Mapping Dataflows is a work in progress, so I would recommend not building any production workloads on it yet. However, if you are in the planning or building phase of a project, be mindful of it: architect your project so that once the time is right you can slide it into your architecture. For instance, if you decide to perform your ETL process using SSIS, transitioning to mapping dataflows later would not be easy without major rearchitecting.
- It has almost all of the transformations required for building an effective ELT pipeline, and when we add the power of the expression language functions, the sky is the limit.
- The debug capability is priceless for performing preliminary tests of your development work against the production data set without affecting the actual data.
- The recently added Parquet and delimited-text data sources make building a modern data platform architecture a breeze.
- Mapping dataflows run entirely on Spark, which means taking advantage of Spark's lazy evaluation, DAG optimization and query pushdown. In other words, it doesn't matter whether you place your filter transformation before or after the aggregate transformation; the Spark optimizer will place it where it is most efficient.
Resources
- Microsoft Documentation: https://docs.microsoft.com/en-us/azure/data-factory/concepts-data-flow-overview
- Azure Data Factory YouTube channel: https://www.youtube.com/channel/UC2S0k7NeLcEm5_IhHUwpN0g/videos?view=0&sort=dd&shelf_id=1
- Early documentation of ADF Mapping Dataflows on the GitHub repo: https://github.com/kromerm/adfdataflowdocs
Originally published at www.mycloudypuzzle.com on March 3, 2019.