Data Lake in Azure — Part 2

Sarath Sasidharan
5 min read · Mar 12, 2019


This blog is a follow-up to the previous one, which focused on infrastructure as code.

This blog focuses on:

  • Data Loading / Extraction
  • Data Preparation / ETL
  • Data Sink

Prerequisites

This assumes that you have completed Part 1 of this blog series, which deployed the following resources:

  • Azure Data Lake Storage Gen2
  • Azure Data Factory (V2)
  • Azure Databricks
  • Azure SQL DB
  • Azure SQL Data Warehouse

Since this blog is focused on integration, we use a simple use case: joining a company's product data with product category data (assuming the category data comes from an external source). The data set used comes from the sample database which was created in SQL DB.

Data Loading / Extraction

This part of the blog focuses on:

  1. Dump dimension data from SQL DB onto Azure Data Lake Store using an orchestrator (Azure Data Factory).
  2. Dump data from an external data source into the same storage layer.

Click on the Resource groups blade in the left panel at portal.azure.com. This should list the resource group which was created in the previous blog.

Next, click on the Azure Data Factory instance which was created, and then click on Author & Monitor.

This opens up the new UI for Azure Data Factory. To learn more about this service, you can find extensive documentation here.

To simulate data in an external store, you need to create a new storage account (ADLS Gen2) and then use the Copy activity in Data Factory to copy the data from SQL DB to this new storage account. The table which needs to be copied here is the “Product Category” table.
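If you prefer to script this step instead of clicking through the portal, here is a rough sketch using recent versions of the azure-mgmt-storage Python SDK. The subscription, resource group, account name and region below are placeholders to adapt, and older SDK versions expose create instead of begin_create.

from azure.identity import DefaultAzureCredential
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import StorageAccountCreateParameters, Sku

client = StorageManagementClient(DefaultAzureCredential(), "<SUBSCRIPTION_ID>")

# Enabling the hierarchical namespace on a StorageV2 account is what makes it ADLS Gen2.
poller = client.storage_accounts.begin_create(
    "<RESOURCE_GROUP>",
    "<EXTERNAL_STORAGE_NAME>",
    StorageAccountCreateParameters(
        sku=Sku(name="Standard_LRS"),
        kind="StorageV2",
        location="westeurope",
        is_hns_enabled=True,
    ),
)
print(poller.result().provisioning_state)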

To create a new pipeline, click on the … beside the Pipelines blade; this should launch a new canvas.

Once this new canvas is launched, give the pipeline a name.

Add a new activity which will be the first step in the pipeline; this copies data from the external source to the central ADLS Gen2.

Drag the Copy Data activity onto the canvas and rename it.

Some work for you to explore; complete this exercise to:

  1. Copy the data from SQL DB (Product Category) to the external storage which was created in this blog, and then store it inside the central store under a product_category folder.
  2. Copy the data from SQL DB (Product table) to the central data lake store under a product folder.

This is the end state for the pipeline. Make sure that after each step of the pipeline you select Validate and then Publish; only then will the changes be saved.

Trigger the pipeline manually to see if it works. This pipeline:

  • copies data from the external store to the central data store
  • copies data from SQL DB (relational) to the central data store for processing.

ETL Processing

This step combines the external data and the SQL DB data which have been copied to the central data lake store.

Azure Databricks is used for this.

Click on the resource group which was created, and then on the Databricks instance.

After logging in, create a new cluster on Databricks.

Create a new notebook.

Once this is done, copy this snippet of code into your notebook.

# Authenticate to ADLS Gen2 with the storage account access key.
spark.conf.set(
    "fs.azure.account.key.<NAME_OF_STORAGE>.dfs.core.windows.net",
    "<ENTER_STORAGE_KEY>")

# Allow the filesystem to be created on first access, list its root, then switch the flag off again.
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://<NAME_OF_FILESYSTEM>@<NAME_OF_STORAGE>.dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")

This piece of code allows you to read from ADLS Gen2 using the storage account key. This is not a recommended approach, but in later blogs we will talk about best practices, security and secrets.
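As a small preview of that safer pattern, here is a minimal sketch which assumes a Databricks secret scope (hypothetically named adls-scope) has already been created and holds the storage key:

# Fetch the storage key from a secret scope instead of pasting it into the notebook.
storage_key = dbutils.secrets.get(scope="adls-scope", key="storage-account-key")
spark.conf.set(
    "fs.azure.account.key.<NAME_OF_STORAGE>.dfs.core.windows.net",
    storage_key)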

# Read the two folders which the Data Factory pipeline copied into the lake.
product          = spark.read.csv("abfss://<NAME_OF_FILESYSTEM>@<NAME_OF_STORAGE>.dfs.core.windows.net/product", header=True)
product_category = spark.read.csv("abfss://<NAME_OF_FILESYSTEM>@<NAME_OF_STORAGE>.dfs.core.windows.net/product_category", header=True)

This reads the data into two DataFrames, using the first row of each file as the header.
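If you want to sanity-check what was loaded before joining, you can print the schemas and peek at a few rows (display is a Databricks notebook helper):

# Optional: inspect the inferred schemas and a sample of the product data.
product.printSchema()
product_category.printSchema()
display(product.limit(5))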

# Register temporary views so the DataFrames can be queried with Spark SQL.
product.createOrReplaceTempView('product')
product_category.createOrReplaceTempView('product_category')
result_val = spark.sql('select a.*, b.ParentProductCategoryID from product a join product_category b on a.ProductCategoryID = b.ProductCategoryID')
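The same join can also be written with the DataFrame API instead of Spark SQL; this is purely an equivalent alternative, assuming the column names from the sample database used above:

# Equivalent join using the DataFrame API.
result_val = product.join(
    product_category.select("ProductCategoryID", "ParentProductCategoryID"),
    on="ProductCategoryID",
    how="inner")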

The next step is to store the result back into the data lake. This writes it to a folder called product_category_out.

result_val.write.csv("abfss://<NAME_OF_FILESYSTEM>@<NAME_OF_STORAGE>.dfs.core.windows.net/product_category_out", header=True)
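Note that this write fails if the product_category_out folder already exists. If you expect to re-run the notebook and overwriting the previous output is acceptable, a small variant is:

# Overwrite any previous output instead of failing when the folder already exists.
result_val.write.mode("overwrite").csv("abfss://<NAME_OF_FILESYSTEM>@<NAME_OF_STORAGE>.dfs.core.windows.net/product_category_out", header=True)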

Once this notebook is saved, it can be called from Azure Data Factory and appended to the pipeline which was created earlier.

Refer to this documentation to get this done.

Once it's complete, you should have a pipeline similar to this.

Trigger this pipeline and then go to the Monitor tab to see the progress of the job.
