Medallion Architecture in Data Lakehouse with Delta Lake and Databricks

Valentin Loghin
9 min read · Feb 15, 2024


What is a medallion architecture?

The medallion architecture describes a series of data layers that denote the quality of data stored in the lakehouse. Databricks recommends taking a multi-layered approach to building a single source of truth for enterprise data products. This architecture guarantees atomicity, consistency, isolation, and durability as data passes through multiple layers of validations and transformations before being stored in a layout optimized for efficient analytics. The terms Bronze (raw), Silver (filtered, cleaned, augmented), and Gold (business-level aggregates) describe the quality of the data in each of these layers.
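As a rough illustration of the flow through the layers, here is a minimal PySpark sketch; the paths and column names are hypothetical and are not the dataset used later in this article:

# Hypothetical sketch of the Bronze -> Silver -> Gold flow (illustrative paths and columns only)
raw_df = spark.read.option("header", "true").csv("/mnt/lake/bronze/orders/")   # Bronze: raw files as ingested
clean_df = raw_df.dropDuplicates().na.drop(subset=["order_id"])                # Silver: cleaned and de-duplicated
clean_df.write.format("delta").mode("overwrite").save("/mnt/lake/silver/orders")
gold_df = (spark.read.format("delta").load("/mnt/lake/silver/orders")
           .groupBy("order_date").count())                                     # Gold: business-level aggregate
gold_df.write.format("delta").mode("overwrite").save("/mnt/lake/gold/orders_by_day")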

What is a lakehouse?

We can define a lakehouse as a system that merges the flexibility, low cost, and scale of a data lake with the data management and ACID (Atomicity, Consistency, Isolation, and Durability) transactions of data warehouses, addressing the limitations of both. Like data lakes, the lakehouse architecture leverages low-cost cloud storage systems with the inherent flexibility and horizontal scalability of those systems.
The goal of a lakehouse is to use existing high-performance data formats, such as Parquet, while also enabling ACID transactions (and other features). To add these capabilities, lakehouses use an open-table format, which adds features like ACID transactions, record-level operations, indexing, and key metadata to those existing data formats. This enables data assets stored on low-cost storage systems to have the same reliability that used to be exclusive to the domain of an RDBMS. Delta Lake is an example of an open-table format that supports these types of capabilities.
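As a minimal sketch of what this means in practice (the path is illustrative and assumes a Databricks cluster with Delta Lake available), writing data in Delta format gives you transactional writes, record-level deletes, and a queryable transaction history:

from delta.tables import DeltaTable

# Illustrative example: an atomic, transactional write in Delta format
spark.range(10).write.format("delta").mode("overwrite").save("/mnt/demo/delta_example")

dt = DeltaTable.forPath(spark, "/mnt/demo/delta_example")
dt.delete("id < 3")                                  # record-level operation
dt.history().select("version", "operation").show()   # transaction log metadata (enables time travel)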

Problem

In this article we will create a data lake in Delta format using the Sales dataset and build out a medallion architecture in Databricks using Python notebooks; at the end of the article, the notebooks will be orchestrated using Azure Data Factory pipelines.

Solution

The solution below assumes that you have access to a Microsoft Azure account with credits available for testing different services. Follow this link to create a free Azure trial account; a Storage account, Azure Data Factory, and Databricks service must be created before starting the implementation.

To use a free account to create the Azure Databricks cluster, go to your profile and change your subscription to pay-as-you-go before creating the cluster. For more information, see Azure free account.

Data Lake Structure

Located on Azure Data Lake Storage Gen2, our data lake has a folder for every layer of the medallion architecture.

Bronze: this layer contains only raw data, located in the csv subfolder.

Silver: the SALES table holds the cleansed and transformed data in Delta format. The table is partitioned by Order_Date to improve performance when queried. All the data transformation is done using Databricks notebooks.

Gold: contains a star-schema model (DIM_DATE, DIM_CUSTOMER, DIM_PRODUCT, FACT_SALES).

Sales conceptual data model

Building the tables structure

1. Logging in to Azure

2. Gathering Data Lake connection information from Azure Portal

2.1 The Storage account name (in my case medallionedw; you will use yours) and the Container (Sales).

2.2 Shared access signature (SAS) for the storage account.

3. Connect to the Databricks service

3.1 Create a Databricks cluster (when working with data in Databricks we need a compute resource).

!!!Note: From this point on, you will have to add multiple notebooks to your workspace. Each time, follow these steps: in the left corner choose (+) New -> Notebook; once the notebook is created, rename it, paste the code provided, and execute it. For every entity, an image and code are provided.

3.2 Mount an Azure Data Lake Storage Gen2 filesystem to DBFS (this allows Databricks to query the files from the storage account). Execute the notebook nb_Mount_Blob by selecting Run Cell from the right corner.

#notebook  'nb_Mount_Blob'
# Mount the Blob container in Databricks
container = "your container"
storageAccount = "your storage account"
accessKey = "your SAS"

# Spark configuration key for the storage account credentials
accountKey = "fs.azure.account.key.{}.blob.core.windows.net".format(storageAccount)

# Set the credentials in the Spark configuration
spark.conf.set(accountKey, accessKey)
spark._jsc.hadoopConfiguration().set(accountKey, accessKey)

# Mount the drive for native Python
inputSource = "wasbs://{}@{}.blob.core.windows.net".format(container, storageAccount)
mountPoint = "/mnt/" + container
extraConfig = {accountKey: accessKey}

print("Mounting: {}".format(mountPoint))

try:
    dbutils.fs.mount(
        source=inputSource,
        mount_point=str(mountPoint),
        extra_configs=extraConfig
    )
    print("=> Succeeded")
except Exception as e:
    if "Directory already mounted" in str(e):
        print("=> Directory {} already mounted".format(mountPoint))
    else:
        raise e
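If you want to confirm that the mount worked, a quick optional check can be run in the same notebook:

# Optional check: list the contents of the freshly mounted container
display(dbutils.fs.ls(mountPoint))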

3.3 Database and table structure creation (nb_Data_Structure_Creation).

%sql
CREATE DATABASE IF NOT EXISTS EDW_SILVER;
CREATE DATABASE IF NOT EXISTS EDW_GOLD;
%sql

CREATE OR REPLACE TABLE edw_silver.sales (
  Sales_Order_Number VARCHAR(16777216),
  Sales_Order_Linenumber INTEGER,
  Customer_Name VARCHAR(16777216),
  Email VARCHAR(16777216),
  Item VARCHAR(16777216),
  Quantity INTEGER,
  Unitprice DECIMAL(8,4),
  Tax DECIMAL(7,4),
  file_modification_time TIMESTAMP,
  file_name STRING,
  file_name_path STRING,
  Order_Date DATE
)
USING DELTA
PARTITIONED BY (Order_Date)
LOCATION '/mnt/sales/silver/sales';


CREATE OR REPLACE TABLE edw_gold.fact_sales (
  Customer_Id BIGINT,
  Item_Id BIGINT,
  Order_Date_Id BIGINT,
  Quantity BIGINT,
  Unitprice DECIMAL(8,4),
  Tax DECIMAL(7,4)
) USING DELTA LOCATION '/mnt/sales/gold/fact_sales';

CREATE OR REPLACE TABLE edw_gold.dim_customer (
  Customer_Id BIGINT GENERATED ALWAYS AS IDENTITY,
  Customer_Name VARCHAR(100),
  Email VARCHAR(50)
) USING DELTA LOCATION '/mnt/sales/gold/dim_customer';

CREATE OR REPLACE TABLE edw_gold.dim_date (
  Order_Date_Id BIGINT,
  Order_Date DATE,
  Day INT,
  Month INT,
  Year INT,
  YYYYMM VARCHAR(10)
) USING DELTA LOCATION '/mnt/sales/gold/dim_date';

CREATE OR REPLACE TABLE edw_gold.dim_product (
  item_id BIGINT GENERATED ALWAYS AS IDENTITY,
  item_name VARCHAR(200)
) USING DELTA LOCATION '/mnt/sales/gold/dim_product';
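As an optional check (a sketch, assuming the cell above ran without errors), you can confirm in a separate Python cell that the databases and tables were created:

# Optional check: confirm the tables exist in both databases
spark.sql("SHOW TABLES IN edw_silver").show()
spark.sql("SHOW TABLES IN edw_gold").show()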

3.4 Sales silver table creation (nb_Sales_Silver).

from pyspark.sql.functions import input_file_name

csvFile = '/mnt/sales/bronze/raw/csv/Orders*.csv'  # read the data from all 3 files into a DataFrame

df = (spark.read                        # The DataFrameReader
      .option("header", "false")        # The first line is not the header
      .option("sep", ";")
      .option("inferSchema", "true")    # Automatically infer data types
      .csv(csvFile)                     # Creates a DataFrame from the CSV files
      .select("*", "_metadata.file_modification_time", "_metadata.file_name")  # add file metadata columns
)


df = (df.withColumnRenamed("_c0", "Sales_Order_Number")
        .withColumnRenamed("_c1", "Sales_Order_Linenumber")
        .withColumnRenamed("_c2", "Order_Date")
        .withColumnRenamed("_c3", "Customer_Name")
        .withColumnRenamed("_c4", "Email")
        .withColumnRenamed("_c5", "Item")
        .withColumnRenamed("_c6", "Quantity")
        .withColumnRenamed("_c7", "UnitPrice")
        .withColumnRenamed("_c8", "Tax")
        .withColumn("file_name_path", input_file_name()))
df.createOrReplaceTempView("stg_sales_silver")
%sql
MERGE INTO edw_silver.sales as target
USING stg_sales_silver as source
ON target.Order_Date=source.Order_Date and target.Customer_Name=source.Customer_Name and target.Item=source.Item
WHEN NOT MATCHED
THEN INSERT *
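Since edw_silver.sales is partitioned by Order_Date, filters and groupings on that column only read the matching partition folders. An optional sanity check, run in a separate Python cell, could look like this:

# Optional check: row counts per Order_Date partition in the silver table
spark.sql("""
    SELECT Order_Date, COUNT(*) AS row_count
    FROM edw_silver.sales
    GROUP BY Order_Date
    ORDER BY Order_Date
""").show()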

3.5 Fact_Sales gold table creation (nb_Sales_Gold).


dfss = spark.sql("""
    SELECT coalesce(dp.item_id, -1) AS Item_Id, coalesce(dd.Order_Date_Id, -1) AS Order_Date_Id,
           coalesce(dc.Customer_Id, -1) AS Customer_Id, ss.Quantity, ss.UnitPrice, ss.Tax
    FROM edw_silver.sales ss
    LEFT JOIN edw_gold.dim_product dp ON dp.item_name = ss.Item
    LEFT JOIN edw_gold.dim_date dd ON dd.Order_Date = ss.Order_Date
    LEFT JOIN edw_gold.dim_customer dc ON dc.Email = ss.Email
""")
dfss.createOrReplaceTempView("stg_sales_gold")
%sql
MERGE INTO edw_gold.fact_sales AS target
USING stg_sales_gold AS source
ON target.Order_Date_Id = source.Order_Date_Id
AND target.Customer_Id = source.Customer_Id
AND target.Item_Id = source.Item_Id
WHEN NOT MATCHED
THEN INSERT *
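The coalesce(..., -1) calls above assign a default key of -1 whenever a fact row has no matching dimension record (for example, if this notebook runs before the dimension notebooks have loaded). If you want to see how many such rows exist, an optional check in a Python cell could be:

# Optional check: fact rows whose dimension lookups did not match get the -1 default key
spark.sql("""
    SELECT COUNT(*) AS unmatched_rows
    FROM edw_gold.fact_sales
    WHERE Customer_Id = -1 OR Item_Id = -1 OR Order_Date_Id = -1
""").show()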

3.6 Dim_Date gold table creation (nb_Dim_Date_Gold).

%sql
create or replace temporary view stg_dim_date
as
select distinct cast(replace(cast(Order_Date as string), '-', '') as bigint) as Order_Date_Id, Order_Date,
       day(Order_Date) as Day, month(Order_Date) as Month, year(Order_Date) as Year,
       substring(replace(cast(Order_Date as string), '-', ''), 1, 6) as YYYYMM
from edw_silver.sales;
%sql
MERGE INTO edw_gold.dim_date as target
USING stg_dim_date as source
ON target.Order_Date=source.Order_Date
WHEN NOT MATCHED
THEN INSERT *
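The surrogate key here is simply the order date with the dashes removed and cast to a number: for example, 2024-02-15 becomes Order_Date_Id 20240215 and YYYYMM 202402. An optional spot check, in a Python cell:

# Optional spot check of the date-key derivation (e.g. 2024-02-15 -> 20240215 / 202402)
spark.sql("""
    SELECT Order_Date, Order_Date_Id, YYYYMM
    FROM edw_gold.dim_date
    ORDER BY Order_Date
    LIMIT 5
""").show()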

3.7 Dim_Customer gold table creation (nb_Dim_Customer_Gold).

# Create a DataFrame on the silver sales table
dfs = spark.read.format("delta").load("/mnt/sales/silver/sales")
# Keep just the customer-related columns
dfc = dfs.select("Customer_Name", "Email")
# Remove the duplicated records on Customer_Name, Email
dfc = dfc.dropDuplicates(["Email", "Customer_Name"])
# Create a temporary view on the deduplicated customer data set
dfc.createOrReplaceTempView("stg_dim_customer")
%sql
MERGE INTO edw_gold.dim_customer as target
USING stg_dim_customer as source
ON target.email=source.email
WHEN NOT MATCHED
THEN INSERT (Customer_Name,Email) values (source.Customer_Name,source.Email)

3.8 Dim_Product gold table creation (nb_Dim_Product_Gold).

%sql 
create or replace temporary view stg_dim_product
as
select distinct item as item_name from edw_silver.sales;
%sql
USE edw_gold;

MERGE INTO edw_gold.dim_product AS target
USING stg_dim_product AS source
ON target.item_name = source.item_name
WHEN NOT MATCHED
THEN INSERT (item_name) VALUES (source.item_name);

All the Databricks elements of our solution were successfully deployed. As I mentioned at the beginning, the notebooks used to transform and load the data will be orchestrated by Azure Data Factory pipelines; to do that, I created an Azure Data Factory solution.

Create an Azure Data Factory (ADF) ETL solution to populate the Data Lakehouse tables.

These are the steps:

1. Start the Azure Data Factory service

2. Pipeline pl_sales_deltalake_master creation

Create the folder EDW_DATABRICKS inside the Pipelines section by clicking the three dots located on the right side of the section.

3. Add a new pipeline, pl_sales_deltalake_master, inside the EDW_DATABRICKS folder.

4. Add 6 Notebook activities to the pipeline canvas.

4.1 In the General tab of the first notebook activity, for Name enter: nb_Mount_Blob

4.2 In the Azure Databricks tab of the notebook activity, for Databricks linked service choose your instance.

4.3 In the Settings tab of the notebook activity, follow the steps from the picture.

Repeat those steps until the rest of the notebooks are added, and connect them as in the picture below.

Let’s run the ADF pipeline and populate the Delta tables

Choose the Debug button (this triggers the pipeline execution).

  • The ETL was successfully executed.
  • Let’s check the data in the tables in Databricks by running a validation script (in Databricks, on the left menu choose (+) New -> Notebook, paste the script, and Run all).
%sql
-- VALIDATION QUERY
SELECT 'SALES_SILVER - SOURCE ' AS TABLE_NAME ,count(*) as Count FROM EDW_SILVER.SALES
UNION
SELECT 'DIM_CUSTOMER' AS TABLE_NAME ,count(*) as Count FROM EDW_GOLD.DIM_CUSTOMER
UNION
SELECT 'DIM_PRODUCT' AS TABLE_NAME, count(*) as Count FROM EDW_GOLD.DIM_PRODUCT
UNION
SELECT 'DIM_DATE' AS TABLE_NAME,count(*) as Count FROM EDW_GOLD.DIM_DATE
UNION
SELECT 'FACT_SALES' AS TABLE_NAME,count(*) as Count FROM EDW_GOLD.FACT_SALES

Check the new hierarchy created in Azure Blob Storage, where the data is actually stored in Delta format. As we can see, for every table in the Gold layer a subfolder was created, and inside each subfolder are the Parquet files.

My presentation has reached its end. The data lakehouse can be accessed using technologies other than Databricks: we can use Microsoft Fabric (check the articles I have already written on the subject), Azure Synapse Analytics, or Snowflake.
