Azure Data Factory Pipeline

Amit Naik
6 min read · Mar 22, 2020


Brief introduction on Data Pipelines in Azure Data Factory.

Introduction

We all know that data in its raw, native form is mostly stored in relational and non-relational storage systems. In that form, the data has no contextual meaning and can’t provide insights to analysts and business decision makers.

Data Pipelines are used to process this complex, large data into actionable business insights. Azure Data Factory (ADF) is a cloud service that is built for handling these operations.

The use case I have been working on relates to Machine Learning prediction of our company’s incurred but not reported (IBNR) reserve. This is done using the current reserve’s granularity as a baseline, with the goal of eventually getting to more detailed reserves (e.g., more frequent reserves, reserves for certain medical conditions, etc.).

This initiative involved migrating the data sources from the relational storage systems to Azure Storage accounts, with the data stored both in Blob Storage and in Azure Data Lake Storage Gen2 (ADLS Gen2).

Once the data was stored in these storage systems, it had to be processed. This processing involved feature generation and predictive modeling steps.

The pipeline was divided into 3 stages: Landing Zone (LZ), Protected Zone (PZ), and Consumption Zone (CZ). The LZ was the initial step, storing the data from the on-premises system in Azure. The PZ was the second step, which transferred the data from the LZ (Blob Storage) to the PZ (ADLS Gen2). The third stage stores the feature-engineered data read in from the PZ into the Databricks File System (DBFS).

Data Factory Pipeline

Every ADF pipeline can have a set of pipeline parameters. These parameters can be used anywhere else in the pipeline. For every parameter, we have to define a name, a type, and a default value. The available parameter types are String, Int, Float, Bool, Array, Object, and SecureString. Pipeline parameters are useful because we don’t have to define the same values over and over throughout the pipeline. We will see how this works in later sections.

Pipeline Parameter Examples
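The screenshot above comes from the ADF authoring UI; the same parameters also show up in the pipeline’s JSON definition. As a rough sketch, it looks something like the snippet below. The pipeline name, parameter names, and default values here are placeholders rather than the exact ones used in this project:

{
    "name": "LZ_to_PZ_pipeline",
    "properties": {
        "parameters": {
            "PZ_RECENT_DATASET_CONTAINER": { "type": "String", "defaultValue": "protected-zone" },
            "LZ_CONTAINER": { "type": "String", "defaultValue": "landing-zone" },
            "TABLE_NAME": { "type": "String", "defaultValue": "claims" }
        },
        "activities": []
    }
}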

Let’s dive into these pipeline steps in more detail!

Landing Zone to Protected Zone

Getting the data from an on-premises system to the cloud environment is surprisingly not as complicated as people might think! A simple DistCp command from the Hadoop Distributed File System (HDFS) allows us to migrate the data from our big data systems to the desired location on Azure. Once the data is in the cloud, it opens up new avenues, since the data can be accessed by all services on Azure.
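As a rough illustration (the cluster address, container, and storage account names below are placeholders, and the hadoop-azure/WASB connector is assumed to be configured on the cluster), such a DistCp run into the Blob Storage landing zone could look something like:

hadoop distcp hdfs://namenode:8020/warehouse/claims wasbs://landing-zone@mystorageaccount.blob.core.windows.net/claims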

From an ADF perspective, we can set up pipeline parameters. The first step was to get the metadata of all the tables. This was done because we wanted to check whether the most recent data was already present in the ADLS Gen2 storage account. If the data was present, we deleted it and loaded the most recent data from the LZ into the PZ.

Dynamic Datasets: Dynamic datasets can be created by making use of the pipeline parameters. We only need to define which pipeline parameters should be associated with that dataset, and then set their values once when we first build the pipeline. This proves useful because we don’t have to define different parameter names for every table we run the pipeline on.

Dynamic Dataset Properties

The parameter names in the picture above have exactly the same names as the parameters defined for the overall pipeline. The default values are then assigned by invoking the @pipeline().parameters expression.

The ‘Get Metadata’ activity needs to point to a dataset. For this, we have to create a dataset of type ‘Parquet’ and assign it a file path so that the pipeline knows which data to look at. We also need to create a ‘Linked Service’ that refers to the storage account the dataset is stored in. This is where we make use of the pipeline parameters. A pipeline parameter can be referenced using the following format:

@pipeline().parameters.PARAMETER_NAME

Using pipeline parameters

An important point to note is that the ‘Get Metadata’ activity also has a ‘field list’, which acts as its arguments. This enables the user to use these values in the next step. The field list options include Child Items, Exists, Item Name, Item Type, and Last Modified.
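Putting these pieces together, the Get Metadata activity might look roughly like this in the pipeline JSON. The dataset name PZRecentParquet is a placeholder for whatever the Parquet dataset is called, and the field list here asks only for ‘exists’:

{
    "name": "Get PZRecent Metadata",
    "type": "GetMetadata",
    "typeProperties": {
        "fieldList": [ "exists" ],
        "dataset": {
            "referenceName": "PZRecentParquet",
            "type": "DatasetReference",
            "parameters": {
                "PZ_RECENT_DATASET_CONTAINER": {
                    "value": "@pipeline().parameters.PZ_RECENT_DATASET_CONTAINER",
                    "type": "Expression"
                }
            }
        }
    }
}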

When a pipeline parameter is passed into the dataset properties this way, it is referenced as a dataset parameter. This can then be used when defining the path of the dataset:

@dataset().DATASET_REFERENCE_NAME

Dataset Parameters

As you can see, PZ_RECENT_DATASET_CONTAINER contains the value of the pipeline parameter. This allows the value to be treated as a dataset parameter and used as needed.
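A parameterized Parquet dataset along these lines might be defined roughly as follows. The dataset name, linked service reference, and folder path are placeholders; in exported ADF JSON, dynamic values show up as Expression objects:

{
    "name": "PZRecentParquet",
    "properties": {
        "type": "Parquet",
        "linkedServiceName": { "referenceName": "ADLSGen2LinkedService", "type": "LinkedServiceReference" },
        "parameters": {
            "PZ_RECENT_DATASET_CONTAINER": { "type": "String" }
        },
        "typeProperties": {
            "location": {
                "type": "AzureBlobFSLocation",
                "fileSystem": { "value": "@dataset().PZ_RECENT_DATASET_CONTAINER", "type": "Expression" },
                "folderPath": "recent"
            }
        }
    }
}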

Hitting Alt+P opens a window where ADF expressions can be written, and this window also provides the list of user-defined parameters.

The next step is to check whether the dataset is actually present in the location provided. This can be done using the ‘If Condition’ activity. Each If Condition activity needs an expression to evaluate, and the expression should return a True or False value. This can be done in the following manner:

@activity('Name of activity to be evaluated on').output.field_list

If Condition Expression

In our case, the name of the activity is ‘Get PZRecent Metadata’ and field_list is ‘exists’. This is because in the ‘Get Metadata’ activity, we specified that we should check whether data exists in the given location. Based on the value of the condition, we can define two sets of activities, one for True and one for False. Pipeline parameters can then be used in those activities as needed.
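Roughly, the If Condition activity could be defined like this. The activity name ‘If PZRecent Exists’ is illustrative, and the delete and copy steps described above would go inside the True/False branches:

{
    "name": "If PZRecent Exists",
    "type": "IfCondition",
    "typeProperties": {
        "expression": {
            "value": "@activity('Get PZRecent Metadata').output.exists",
            "type": "Expression"
        },
        "ifTrueActivities": [ ],
        "ifFalseActivities": [ ]
    }
}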

Every activity can transition to another activity based on four different conditions: Success, Failure, Completion, and Skipped. There may be cases where we want the pipeline to continue even on failure. Or sometimes we may not care whether an activity succeeds or fails; it only needs to have completed for the pipeline to move on to the next step.

Types of Conditions
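In the underlying JSON, these transitions appear as dependency conditions (Succeeded, Failed, Completed, Skipped) on the downstream activity. For example, the If Condition activity above could depend on the Get Metadata activity as sketched below; swapping ‘Succeeded’ for ‘Completed’ would let the pipeline continue regardless of the outcome:

{
    "name": "If PZRecent Exists",
    "type": "IfCondition",
    "dependsOn": [
        {
            "activity": "Get PZRecent Metadata",
            "dependencyConditions": [ "Succeeded" ]
        }
    ]
}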

The ‘Copy Data’ activity performs the actual operation of moving the data from source to destination. One point to note here is the ‘File path type’ setting. If we want our file path to be dynamic, we can make use of the ‘Wildcard file path’ option. This allows us to include all the files in a folder that have a certain extension. For example, *.parquet will match all the files to be copied that have a ‘.parquet’ extension. This can always be adjusted to suit your use case.
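On the source side of the Copy activity, this setting roughly translates to the following JSON snippet (the folder path here is a placeholder):

"source": {
    "type": "ParquetSource",
    "storeSettings": {
        "type": "AzureBlobStorageReadSettings",
        "recursive": true,
        "wildcardFolderPath": "claims/recent",
        "wildcardFileName": "*.parquet"
    }
}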

Linked Service: This defines which service you want to connect to your ADF. For example, if you want to connect a Blob Storage account to your ADF, you will need to provide details like the Storage Account Name, Access Key, Container Name, etc.
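A Blob Storage linked service using an account key can be defined roughly as below. The linked service name is a placeholder, and in practice the account name and key would normally come from Azure Key Vault rather than being pasted in directly:

{
    "name": "LZBlobStorageLinkedService",
    "properties": {
        "type": "AzureBlobStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storage account name>;AccountKey=<access key>;EndpointSuffix=core.windows.net"
        }
    }
}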

Protected Zone to Consumption Zone

The last stage involves pushing the data from the PZ to the CZ, which in this case is DBFS. The ‘Notebook’ activity allows us to trigger a Databricks notebook. We need to create a Linked Service to connect our ADF pipeline to the Databricks resource. Once this activity is triggered, it starts the notebook and executes all the cells in it. The notebook contains code for feature engineering and predictive modeling, after which the output is stored in DBFS.
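A sketch of such a Notebook activity is shown below. The linked service name, notebook path, and base parameter are illustrative; the base parameter shows one way a pipeline parameter could be handed to the notebook:

{
    "name": "Run Feature Engineering Notebook",
    "type": "DatabricksNotebook",
    "linkedServiceName": {
        "referenceName": "DatabricksLinkedService",
        "type": "LinkedServiceReference"
    },
    "typeProperties": {
        "notebookPath": "/Shared/feature_engineering",
        "baseParameters": {
            "table_name": {
                "value": "@pipeline().parameters.TABLE_NAME",
                "type": "Expression"
            }
        }
    }
}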

This was a very brief look at ADF. I tried to cover as many points as possible without getting into too much detail. Hope this helps anyone working with Azure Data Factory!

Clap! Share! Follow!
