Orchestrate & Build ETL pipeline using Azure Databricks and Azure Data Factory v2 (Part — 2)

Pujith Vaddi
ETL with PySpark/SparkSQL
Oct 27, 2020

Hi there,

In the previous post, I described how to set up the resources required for our ETL pipeline: creating an Azure Databricks workspace, creating ADLS Gen2 storage for the data source and destination, and mounting ADLS Gen2 to DBFS.

In this post, let’s write some PySpark/Spark SQL code to extract and transform our data and finally load the result to the sink. I have loaded 6 tables from a SQL database to ADLS Gen2 using a generic pipeline that I created in Azure Data Factory.

Below is the list of tables that I loaded from the SQL database to the ADLS Gen2 container. We will apply our transformation logic to them and load the results to the sink.

payment_types_lookup
products
sellers
customers
orders
payment_information

Since we have extracted the data from the source and loaded it into our staging area (the ADLS Gen2 container), let’s go ahead and transform it. Before doing that, we have to mount our storage to the Databricks File System (DBFS), since our data lives on an external storage system. Please find the link below, which explains how to mount ADLS Gen2 to DBFS.
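Before running any notebook, it can help to confirm that the mount is actually in place. The snippet below is only a small sketch: it assumes the mount point is /mnt/salesanalytic (inferred from the file path used later in this post), and is_mounted is a hypothetical helper name, not part of Databricks.

```python
def is_mounted(dbutils, mount_point):
    """Return True if `mount_point` is already mounted on DBFS."""
    # dbutils.fs.mounts() lists the current mounts; each entry exposes
    # a `mountPoint` attribute such as "/mnt/salesanalytic".
    return any(m.mountPoint == mount_point for m in dbutils.fs.mounts())


# In a Databricks notebook, `dbutils` is predefined, so usage could be:
# if not is_mounted(dbutils, "/mnt/salesanalytic"):   # assumed mount point
#     raise RuntimeError("Mount ADLS Gen2 to DBFS before running this notebook")
```

Passing dbutils in as a parameter keeps the helper usable outside a notebook, for example in a quick local check with a stand-in object.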

Now that our data is ready, let’s start writing some code to transform it.

Assuming you have a good working knowledge of PySpark and Spark SQL, I will not go deep into explaining the code. Even if you are new to Spark, don’t worry: most of the code is Spark SQL, which is easy to understand, and there is very little PySpark code.

Note: If you are a beginner, Spark is easy to learn and implement, and more enjoyable to work with than traditional big data technologies. There are plenty of resources for learning Spark and its various APIs, including the official Apache Spark documentation.

PySpark/Spark SQL code implementation:

The image below shows how we will organize our notebooks to perform our transformations:

As mentioned above, we have 6 tables at the source, each corresponding to one entity. Assume the source data belongs to an e-commerce site. I have divided these tables into fact and dimension tables in the data warehouse and created one notebook per warehouse table. So I have 4 notebooks, each performing the transformations for one warehouse table.

Each table requires its own set of transformations and is independent of the other tables, so I will discuss one fact table and one dimension table in this post. You can get the transformation code (notebooks) from my GitHub repo link:

Dim_Customers_Info Notebook:

This notebook builds the customers dimension table, which stores each customer’s unique ID, name, pincode, address, and the number of orders the customer has placed to date. Our goal is to select these columns from the corresponding source data, which may come from one or more tables.

Steps involved:

  1. Create a data frame and read the data into it.
  2. Create a temporary view (in our case, a global temp view).
  3. Use the spark.sql method to run the Spark SQL code and store the result in a data frame.
  4. Write the results stored in the data frame to the sink store.

The steps above are generic and are followed in all the notebooks. Some notebooks have a few extra steps, which you can look up if needed.

Step 1:

First, read your data into a data frame using the spark.read method (in Scala, the same call also returns a DataFrame, which is an alias for Dataset[Row]). The syntax is as below:

df_customers = spark.read.csv(path='/mnt/salesanalytic/Incremental/Customers_ADLS_Stage1/customers.csv', inferSchema="false", header='true', schema=customers_schema)

If you want to apply a custom schema to your data, set the inferSchema option to false and pass your schema through the schema parameter. Now that the data frame is created, let’s create a temporary view on it. This registers the data frame as a temporary SQL table so that we can write and run SQL queries against it.
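The customers_schema object used above is not shown in this post. One way to define it is as a DDL-formatted string, which spark.read.csv accepts in place of a StructType. The column names and types below are assumptions made for illustration (the real ones depend on customers.csv), and read_customers is a hypothetical wrapper.

```python
# Hypothetical column names and types: adjust them to match customers.csv.
CUSTOMER_COLUMNS = [
    ("customer_id", "INT"),
    ("customer_name", "STRING"),
    ("pincode", "STRING"),
    ("address", "STRING"),
]

# spark.read.csv accepts either a StructType or a DDL-formatted string
# such as "col0 INT, col1 STRING" for its `schema` parameter.
customers_schema = ", ".join(f"{name} {dtype}" for name, dtype in CUSTOMER_COLUMNS)


def read_customers(spark, path):
    """Read the staged customers CSV with an explicit schema (no inference)."""
    return spark.read.csv(path=path, header=True, schema=customers_schema)


# In a Databricks notebook, `spark` is predefined:
# df_customers = read_customers(
#     spark, "/mnt/salesanalytic/Incremental/Customers_ADLS_Stage1/customers.csv")
```

Supplying the schema up front avoids the extra pass over the file that schema inference needs, and it keeps column types stable from run to run.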

Step 2:

Syntax: df_customers.createOrReplaceGlobalTempView('customer_info')

Generally, there are two types of views that can be created on a data frame.

  1. Temporary view => createOrReplaceTempView('name')
  2. Global temp view => createOrReplaceGlobalTempView('name')

The main difference between the two is scope. A temporary view is local to the Spark session that created it, which in Databricks means the notebook where it was created. A global temp view, as the name suggests, is shared across Spark sessions within the same Spark application, so it can be accessed from other notebooks attached to the same cluster. Global temp views are registered in the global_temp database, so they are queried as global_temp.customer_info rather than just customer_info. We can also register our data frame as a permanent table (refer to Apache spark documentation here).
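To make the scoping difference concrete, here is a minimal sketch. The helper names and the customer_info_local view name are made up for illustration; global_temp is Spark's default database name for global temp views.

```python
GLOBAL_TEMP_DB = "global_temp"  # Spark's default database for global temp views


def qualified_view_name(view_name, is_global):
    """Return the name to use in SQL for a temp view or a global temp view."""
    return f"{GLOBAL_TEMP_DB}.{view_name}" if is_global else view_name


def register_customer_views(df_customers):
    """Register the same data frame under both kinds of view."""
    # Session-scoped: visible only to the session (notebook) that runs this.
    df_customers.createOrReplaceTempView("customer_info_local")
    # Application-scoped: visible to other notebooks on the same cluster.
    df_customers.createOrReplaceGlobalTempView("customer_info")


# In a notebook, after register_customer_views(df_customers):
# spark.sql(f"SELECT * FROM {qualified_view_name('customer_info', True)}")
```

Forgetting the global_temp prefix is a common reason for a "table or view not found" error when a global temp view is queried from another notebook.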

Steps 3 & 4:

Now we can write some transformation logic using spark SQL:

Spark SQL Transformation code

From the above image you can see that I queried the required fields from the corresponding table views, i.e., customer_info and orders (the orders table view). I stored the result in a data frame and finally wrote it to the ADLS Gen2 container. Below is the image of the file in the sink and its contents:

Dim_Customer_Info result stored in ADLS Gen2 container
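For readers who prefer text to a screenshot, a query of the kind described above might look roughly like this. It is a sketch, not the exact query from the notebook: the column names (customer_id, customer_name, order_id, and so on) are assumptions, and it assumes both views were registered as global temp views named customer_info and orders.

```python
# Hypothetical column names; the two views are assumed to be global temp views.
DIM_CUSTOMERS_SQL = """
SELECT c.customer_id,
       c.customer_name,
       c.pincode,
       c.address,
       COUNT(o.order_id) AS order_count
FROM global_temp.customer_info AS c
LEFT JOIN global_temp.orders AS o
       ON o.customer_id = c.customer_id
GROUP BY c.customer_id, c.customer_name, c.pincode, c.address
"""


def build_dim_customers(spark):
    """Run the query and return the result as a data frame."""
    return spark.sql(DIM_CUSTOMERS_SQL)


# In a Databricks notebook, `spark` is predefined:
# df_dim_customers = build_dim_customers(spark)
```

The LEFT JOIN keeps customers who have never ordered, and COUNT(o.order_id) gives them an order count of 0 instead of dropping them.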

Fact_Sales_Orders Notebook:

The only change in this notebook is the SQL query that performs the required transformations. All the other steps remain the same. Please find the image of the Spark SQL query:

Once the results are stored in the data frame, we write them to our external storage, i.e., the ADLS Gen2 storage account. We can write a data frame to external storage systems such as Amazon S3, Azure Data Lake Storage (Gen1 and Gen2), and Azure Blob Storage using DataFrame.write, which returns a DataFrameWriter.
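As a rough sketch of that write step, the function below chains the usual DataFrameWriter calls. The function name, the CSV output format, and the overwrite mode are assumptions for illustration; the sink path is left for you to fill in with a folder under your own mount.

```python
def write_to_sink(df, sink_path, fmt="csv"):
    """Write a data frame to the sink folder, replacing any previous output."""
    (
        df.write                   # DataFrame.write returns a DataFrameWriter
        .format(fmt)               # output format, e.g. "csv" or "parquet"
        .mode("overwrite")         # replace earlier output in this folder
        .option("header", "true")  # keep column names in CSV output
        .save(sink_path)
    )


# Usage in a notebook (the path is a placeholder, not the one from this series):
# write_to_sink(df_dim_customers, "/mnt/<your-mount>/<your-sink-folder>")
```

Note that Spark writes a folder containing one or more part files rather than a single named file, so the sink path refers to a directory.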

That’s all!!!

Please refer to my GitHub repo for all the notebooks.

In the next part of this series, we will see how to organize notebooks, run all notebooks from a single notebook using dbutils, create jobs, and schedule them.

Thank you :)
