Medallion architecture in Microsoft Fabric

Valentin Loghin
11 min read · Feb 1, 2024


What is a medallion architecture?

Medallion architecture is a system for logically organising data within a data lakehouse. A standard medallion architecture consists of three main layers, in order: Bronze, Silver and Gold. The increasing value of the precious metals in the names is no accident: it represents the increasing level of structure and validation as data moves through the layers. This architecture is also known as a multi-hop architecture.

  • Bronze layer (raw data). The Bronze layer is where we deposit all data from external source systems. The table structures in this layer correspond to the source systems’ table structures “as is”, plus any additional metadata columns that capture the load date/time, process ID, and so on. The emphasis in this layer is on rapid capture of change data and on providing a historical archive of the source (cold storage), data tracing, auditability and the ability to reprocess if necessary without re-reading the source system.
  • Silver layer (validated, cleansed and conformed data). In the lakehouse’s Silver layer, data from the Bronze layer is mapped, merged, conformed and cleansed (“just enough”) so that the Silver layer can provide an “enterprise view” of all key business entities, concepts and transactions (e.g. key customers, stores, de-duplicated transactions and cross-reference tables).
  • Gold layer (enterprise-level, consumption-ready tables). Data in the lakehouse’s Gold layer is typically organized in “project-specific” lakehouses, ready for consumption. The Gold layer is intended for reporting and uses denormalized, read-optimized data models with fewer joins. The final layer of data transformations and data quality rules is applied here, and it serves as the presentation layer for projects such as customer analytics. In this Gold layer of the lakehouse we typically see data models based on Kimball-style star schemas. In this way, data is preserved, and progressively refined, as it moves through the various layers of a lakehouse. In some cases we also see a large number of data marts and EDWs from the traditional RDBMS technology stack being ingested into the lakehouse. (A minimal code sketch of this bronze-to-gold flow follows below.)
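
To make the layer responsibilities concrete, here is a purely illustrative PySpark sketch of the bronze → silver → gold flow. The file, table and column names are placeholders, not the ones used later in this walkthrough, and the code assumes it runs in a Spark notebook session.

# Illustrative only: a minimal bronze -> silver -> gold flow in PySpark
from pyspark.sql.functions import col, current_timestamp

# Bronze: land the raw file "as is" (placeholder path)
raw = (spark.read.format("csv").option("header", "true")
       .load("Files/landing/customers.csv"))

# Silver: enforce types, cleanse "just enough", add audit metadata, write a conformed Delta table
silver = (raw.withColumn("CustomerID", col("CustomerID").cast("int"))
             .withColumn("DateInserted", current_timestamp()))
silver.write.mode("overwrite").format("delta").save("Tables/Customers")

# Gold: reshape into a read-optimized, denormalized model for reporting
gold = silver.selectExpr("CustomerID AS CustomerKey", "FirstName", "LastName")
gold.write.mode("overwrite").format("delta").save("Tables/DimCustomer")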

Medallion architecture in Fabric

An example of a Microsoft Fabric-based lakehouse medallion architecture for the sales department is shown in the figure below. While it is only one of many lakehouse architecture patterns, it is a great fit for modern data warehouses, data marts, and other analytical solutions.

In this example we will build a medallion architecture in a Fabric lakehouse using notebooks. We will create a workspace and three lakehouses, connect the bronze layer to the landing zone using shortcuts, transform the data and load it into silver Delta tables, transform it further and load it into gold Delta tables, and then explore the semantic model and create relationships.

As the data source I used the Microsoft AdventureWorks Sales CSV files, located outside Fabric in ADLS Gen2 blob storage at the following location:

Create a workspace

1. From the Microsoft Fabric home page, select Synapse Data Engineering.

2. From the left navigation bar, select Workspaces, then New workspace, enter Sales_Fabric as the name and click Apply.

3. Let’s add three lakehouses (Sales_Bronze, Sales_Silver, Sales_Gold) inside the newly created Sales_Fabric workspace.

4. Enable the Data model editing preview feature in the workspace settings.

5. Prerequisite: in Azure, generate a SAS token for the storage account where the source CSV files are located (keep the SAS token value for later use).
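
If you prefer to script this prerequisite rather than use the Azure portal, a sketch using the azure-storage-blob Python SDK could look like the following. The account, container and key values are placeholders, and the permissions and expiry are only examples; adjust them to your needs.

# Hypothetical sketch: generate a read/list SAS for the container that holds the CSV files
# Requires: pip install azure-storage-blob
from datetime import datetime, timedelta, timezone
from azure.storage.blob import generate_container_sas, ContainerSasPermissions

sas_token = generate_container_sas(
    account_name="mystorageaccount",          # placeholder storage account name
    container_name="landingzone",             # placeholder container holding the CSV files
    account_key="<storage-account-key>",      # keep this secret; do not hard-code it in real use
    permission=ContainerSasPermissions(read=True, list=True),
    expiry=datetime.now(timezone.utc) + timedelta(days=7),
)
print(sas_token)  # keep this value for the shortcut connection step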

6. Connect the Sales_Bronze lakehouse to the landing zone by adding a shortcut to the location of the CSV source files.

  • Inside the Sales_Fabric workspace, select the Sales_Bronze lakehouse.
  • In the Files section of Sales_Bronze, create the subfolder raw.
  • Right-click the raw subfolder, select New shortcut and follow steps 1 to 8.
  • With the new Sales shortcut created, we can now access the files located in the landing zone from inside the Sales_Bronze lakehouse (a quick verification sketch follows below).
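
As a quick check that the shortcut works, you can list its contents from a notebook attached to Sales_Bronze. This is a minimal sketch that assumes the shortcut is named Sales and sits under Files/raw, as created above.

# List the CSV files exposed through the new "Sales" shortcut
from notebookutils import mssparkutils  # usually pre-loaded in Fabric notebooks

files = mssparkutils.fs.ls("Files/raw/Sales")
for f in files:
    print(f.name, f.size)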

Data movement and transformation to silver Delta table

Now that we have data in the bronze layer of our lakehouse, we can use notebooks to transform it and load it into Delta tables in the silver layer. There are 8 source files in Sales_Bronze, and we create one notebook per file; the language used is PySpark. It is good practice to use a naming convention for the notebook names. In our case we use the prefix nb_, and all notebook names follow the pattern nb_TableName_SourceLayer_TargetLayer, so the notebook for the Customer table is nb_Customer_Bronze_Silver. This keeps the notebooks grouped together and easy to find.

Data in the Silver layer will be cleaned, conformed and normalized; the tables are in Third Normal Form (3NF).

1. From the Fabric workspace, open the Sales_Silver lakehouse.

2. On the Home page while viewing the contents of the Sales_Silver lakehouse, in the Open notebook menu, select New notebook.

After a few seconds, a new notebook containing a single cell will open. Notebooks are made up of one or more cells that can contain code or markdown (formatted text).

3. When the notebook opens, rename it to nb_Customer_Bronze_Silver by selecting the Notebook xxxx text at the top left of the notebook and entering the new name.

4. Paste the following code into the cell:

# Notebook nb_Customer_Bronze_Silver
from pyspark.sql.types import *
from pyspark.sql.functions import *

# define the schema for Customers table

table_name = 'Customers'
Customers_schema = StructType([
    StructField('CustomerID', IntegerType(), True),
    StructField('FirstName', StringType(), True),
    StructField('LastName', StringType(), True),
    StructField('FullName', StringType(), True),
    StructField('DateInserted', StringType(), True)])

df = spark.read.format("csv").schema(Customers_schema).option("header","true").load("abfss://Sales_Fabric@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/raw/Sales/customers.csv")
# DateInserted holds the timestamp when the record was inserted;
# SourceFilename captures the path of the source file
df = df.withColumn("DateInserted", current_timestamp())
df = df.withColumn("SourceFilename", input_file_name())

df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/"+table_name)

5. Use the Run cell button on the left of the cell to run the code.

Note: Since this is the first time you’ve run any Spark code in this notebook, a Spark session must be started.

Steps 2, 3, 4 and 5 are repeated for the rest of the tables; the code is provided below.

# nb_Employees_Bronze_Silver
from pyspark.sql.types import *
from pyspark.sql.functions import *
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Fabric@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/raw/Sales/employees.csv")
df = df.withColumn("EmployeeID", df["EmployeeID"].cast("integer"))
df = df.withColumn("ManagerID", df["ManagerID"].cast("integer"))
df = df.withColumn("FirstName", df["FirstName"].cast("string"))
df = df.withColumn("LastName", df["LastName"].cast("string"))
df = df.withColumn("FullName", df["FullName"].cast("string"))
df = df.withColumn("JobTitle", df["JobTitle"].cast("string"))
df = df.withColumn("OrganizationLevel", df["OrganizationLevel"].cast("integer"))
df = df.withColumn("MaritalStatus", df["MaritalStatus"].cast("string"))
df = df.withColumn("Gender", df["Gender"].cast("string"))
df = df.withColumn("Territory", df["Territory"].cast("string"))
df = df.withColumn("Country", df["Country"].cast("string"))
df = df.withColumn("Group", df["Group"].cast("string"))
df = df.withColumn("DateInserted", current_timestamp())
df = df.withColumn("SourceFilename",input_file_name())
df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/Employees")

# nb_Orders_Bronze_Silver
from pyspark.sql.types import *
from pyspark.sql.functions import *
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Fabric@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/raw/Sales/orders.csv")
df = df.withColumn("SalesOrderID", df["SalesOrderID"].cast("integer"))
df = df.withColumn("SalesOrderDetailID", df["SalesOrderDetailID"].cast("integer"))
df = df.withColumn('OrderDate', to_date(col('OrderDate'), 'M/d/yyyy').alias('OrderDate').cast('date'))
df = df.withColumn('DueDate', to_date(col('DueDate'), 'M/d/yyyy').alias('DueDate').cast('date'))
df = df.withColumn('ShipDate', to_date(col('ShipDate'), 'M/d/yyyy').alias('ShipDate').cast('date'))
df = df.withColumn("EmployeeID", df["EmployeeID"].cast("integer"))
df = df.withColumn("CustomerID", df["CustomerID"].cast("integer"))
df = df.withColumn("SubTotal", df["SubTotal"].cast("double"))
df = df.withColumn("TaxAmt", df["TaxAmt"].cast("double"))
df = df.withColumn("Freight", df["Freight"].cast("double"))
df = df.withColumn("TotalDue", df["TotalDue"].cast("double"))
df = df.withColumn("ProductID", df["ProductID"].cast("integer"))
df = df.withColumn("OrderQty", df["OrderQty"].cast("double"))
df = df.withColumn("UnitPrice", df["UnitPrice"].cast("double"))
df = df.withColumn("UnitPriceDiscount", df["UnitPriceDiscount"].cast("double"))
df = df.withColumn("LineTotal", df["LineTotal"].cast("double"))
df = df.withColumn("DateInserted", current_timestamp())
df = df.withColumn("SourceFilename",input_file_name())
df = df.withColumn("AuditId",lit(parAuditId).cast("integer"))
df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/Orders")

# nb_Product_Bronze_Silver
from pyspark.sql.types import *
from pyspark.sql.functions import *
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Fabric@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/raw/Sales/products.csv")
df = df.withColumn("ProductID", df["ProductID"].cast("integer"))
df = df.withColumn("ProductNumber", df["ProductNumber"].cast("string"))
df = df.withColumn("ProductName", df["ProductName"].cast("string"))
df = df.withColumn("ModelName", df["ModelName"].cast("string"))
df = df.withColumn("MakeFlag", df["MakeFlag"].cast("smallint"))
df = df.withColumn("StandardCost", df["StandardCost"].cast("double"))
df = df.withColumn("ListPrice", df["ListPrice"].cast("double"))
df = df.withColumn("SubCategoryID", df["SubCategoryID"].cast("smallint"))
df = df.withColumn("DateInserted", current_timestamp())
df = df.withColumn("SourceFilename",input_file_name())
df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/Products")

# nb_ProductCategories_Bronze_Silver
from pyspark.sql.types import *
from pyspark.sql.functions import *
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Fabric@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/raw/Sales/productcategories.csv")
df = df.withColumn("CategoryID", df["CategoryID"].cast("integer"))
df = df.withColumn("CategoryName", df["CategoryName"].cast("string"))
df = df.withColumn("DateInserted", current_timestamp())
df = df.withColumn("SourceFilename",input_file_name())
df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/ProductCategories")

# nb_ProductSubCategories_Bronze_Silver
from pyspark.sql.types import *
from pyspark.sql.functions import *
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Fabric@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/raw/Sales/productsubcategories.csv")
df = df.withColumn("SubCategoryID", df["SubCategoryID"].cast("integer"))
df = df.withColumn("CategoryID", df["CategoryID"].cast("integer"))
df = df.withColumn("DateInserted", current_timestamp())
df = df.withColumn("SourceFilename",input_file_name())
df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/ProductSubCategories")

# nb_VendorProduct_Bronze_Silver
from pyspark.sql.types import *
from pyspark.sql.functions import *
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Fabric@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/raw/Sales/vendorproduct.csv")
df = df.withColumn("VendorID", df["ProductID"].cast("integer"))
df = df.withColumn("VendorName", df["VendorID"].cast("integer"))
df = df.withColumn("DateInserted", current_timestamp())
df = df.withColumn("SourceFilename",input_file_name())
df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/VendorProduct")

# nb_Vendors_Bronze_Silver
from pyspark.sql.types import *
from pyspark.sql.functions import *
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Fabric@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/raw/Sales/vendors.csv")
df = df.withColumn("VendorID", df["VendorID"].cast("integer"))
df = df.withColumn("VendorName", df["VendorName"].cast("string"))
df = df.withColumn("AccountNumber", df["AccountNumber"].cast("string"))
df = df.withColumn("CreditRating", df["CreditRating"].cast("integer"))
df = df.withColumn("ActiveFlag", df["ActiveFlag"].cast("integer"))
df = df.withColumn("DateInserted", current_timestamp())
df = df.withColumn("SourceFilename",input_file_name())
df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/Vendors")

We now have the Delta tables in Sales_Silver, ready for further transformation and modeling.

Explore data in the silver layer using the SQL endpoint

Now that we have data in the silver layer, we can use the SQL endpoint to explore it and perform some basic analysis. This is a nice option if you’re familiar with SQL and want to do some basic exploration of your data. In this demo we’re using the SQL endpoint view in Fabric, but note that you can also use other tools such as SQL Server Management Studio (SSMS) and Azure Data Explorer.

1. Navigate back to the workspace and notice that you now have a few new assets listed. Select the SQL endpoint of the Sales_Silver lakehouse to open it in the SQL endpoint view.

2. Select New SQL query from the ribbon, which will open a SQL query editor. Note that you can rename your query using the menu item next to the existing query name in the lakehouse explorer pane.

3. Paste the following query into the query editor and select Run:

SELECT YEAR(OrderDate) AS Year
, CAST(SUM(OrderQty * (UnitPrice + TaxAmt)) AS DECIMAL(12, 2)) AS TotalSales
FROM Orders
GROUP BY YEAR(OrderDate)
ORDER BY YEAR(OrderDate)

This query calculates the total sales for each year in the Orders table. Your results should look like this:

Data exploration at the silver layer is useful for basic analysis, but you’ll need to transform the data further and model it into a star schema to enable more advanced analysis and reporting. You’ll do that in the next section.

Transform data for gold layer

We have successfully taken data from the bronze layer, transformed it, and loaded it into silver Delta tables. Now we’ll use notebooks to transform the data further, model it into a star schema, and load it into gold Delta tables.

Gold layer data will be denormalized (star schema) and ready for consumption.

1. From the Fabric workspace, open the Sales_Gold lakehouse.

2. On the Home page while viewing the contents of the Sales_Gold lakehouse, in the Open notebook menu, select New notebook.

3. When the notebook opens, rename it to nb_DimCustomer_Silver_Gold by selecting the Notebook xxxx text at the top left of the notebook and entering the new name.

4. Paste the following code into the cell:

df = spark.sql("SELECT CustomerID as CustomerKey,FirstName,LastName ,FullName ,DateInserted FROM Sales_Silver.Customers")
df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/DimCustomer")

5. Use the Run cell button on the left of the cell to run the code.

Steps 2, 3, 4 and 5 are repeated for the rest of the tables (the same as for the Sales_Silver notebooks); the code for the gold layer is provided below.

Note: the notebooks for the gold layer have two sections each: a Spark SQL cell that builds a temporary view, and a PySpark cell that writes it to a Delta table.

%%sql
--nb_DimEmployee_Silver_Gold
CREATE OR REPLACE TEMPORARY VIEW tmpEmployee
AS
SELECT EmployeeID as EmployeeKey
,ManagerID
,FirstName
,LastName
,FullName
,JobTitle
,OrganizationLevel
,MaritalStatus
,Gender
,Territory
,Country
,Group
FROM Sales_Silver.Employees
df = spark.sql("SELECT * FROM tmpEmployee")
df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/DimEmployee")

%%sql
--nb_DimVendor_Silver_Gold
CREATE OR REPLACE TEMPORARY VIEW tmpVendors
AS
SELECT VendorID as VendorKey
,VendorName
,AccountNumber
,CreditRating
,ActiveFlag
,DateInserted
FROM Sales_Silver.Vendors
df = spark.sql("SELECT * FROM tmpVendors")
df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/DimVendor")

%%sql
--nb_DimProductSubCategories_Silver_Gold
CREATE OR REPLACE TEMPORARY VIEW tmpProductSubCategory
AS
SELECT SubCategoryID as SubCategoryKey
,CategoryID as CategoryKey
,SubCategoryName
FROM Sales_Silver.ProductSubCategories
df = spark.sql("SELECT * FROM tmpProductSubCategory")
df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/DimProductSubCategory")

%%sql
--nb_DimProductCategory_Silver_Gold
CREATE OR REPLACE TEMPORARY VIEW tmpProductCategory
AS
SELECT CategoryID as CategoryKey
,CategoryName
,DateInserted
FROM Sales_Silver.ProductCategories
df = spark.sql("SELECT * FROM tmpProductCategory")
df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/DimProductCategory")

%%sql
--nb_DimProduct_Silver_Gold
CREATE OR REPLACE TEMPORARY VIEW tmpProducts
AS
SELECT ProductID as ProductKey
,ProductNumber
,ProductName
,ModelName
,MakeFlag
,StandardCost
,ListPrice
,SubCategoryID
,DateInserted
FROM Sales_Silver.Products
df = spark.sql("SELECT * FROM tmpProducts")
df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/DimProduct")

%%sql
--nb_FactSales_Silver_Gold
CREATE OR REPLACE TEMPORARY VIEW tmpSales
AS
SELECT SalesOrderID AS SalesOrderKey
,ifnull(e.EmployeeKey,-1) as EmployeeKey
,ifnull(c.CustomerKey,-1) as CustomerKey
,ifnull(p.ProductKey,-1) as ProductKey
,ifnull(vp.VendorID,-1) as VendorKey
,ifnull(psc.CategoryKey,-1) as CategoryKey
,ifnull(psc.SubCategoryKey,-1) as SubCategoryKey
,SalesOrderDetailID
,OrderDate
,DueDate
,ShipDate
,SubTotal
,TaxAmt
,Freight
,TotalDue
,OrderQty
,UnitPrice
,UnitPriceDiscount
,LineTotal
,o.DateInserted
FROM Sales_Silver.Orders o
LEFT JOIN Sales_Gold.DimCustomer c ON c.CustomerKey =o.CustomerID
LEFT JOIN Sales_Gold.DimEmployee e ON e.EmployeeKey= o.EmployeeID
LEFT JOIN Sales_Gold.DimProduct p ON p.ProductKey = o.ProductID
LEFT JOIN
(
SELECT
ProductID,
VendorID
from
Sales_Silver.VendorProduct
group by
ProductID,
VendorID
) vp ON vp.ProductID = p.ProductKey
LEFT JOIN Sales_Gold.DimProductSubCategory psc ON psc.SubCategoryKey=p.SubCategoryID
df = spark.sql("SELECT * FROM tmpSales")
df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/FactSales")

We now have a curated, modeled gold layer that can be used for reporting and analysis.
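
Before handing the gold tables over to reporting, a quick sanity check can be useful. Since the fact load substitutes -1 for keys that fail to match a dimension, a small check like the sketch below counts how many FactSales rows ended up with unmatched keys; it reuses the Sales_Gold.FactSales reference style from the notebooks above and assumes a notebook attached to the gold lakehouse.

# Sanity-check sketch: count FactSales rows whose dimension lookups fell back to -1
from pyspark.sql.functions import col, count, when

fact = spark.sql("SELECT * FROM Sales_Gold.FactSales")
fact.select(
    count(when(col("CustomerKey") == -1, True)).alias("unmatched_customers"),
    count(when(col("ProductKey") == -1, True)).alias("unmatched_products"),
    count(when(col("EmployeeKey") == -1, True)).alias("unmatched_employees"),
    count(when(col("VendorKey") == -1, True)).alias("unmatched_vendors"),
).show()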

Semantic model

Power BI semantic models represent a source of data ready for reporting, visualization, discovery, and consumption. When a SQL analytics endpoint is created, a default Power BI semantic model is created along with it. The default semantic model is shown with the (default) suffix.

Note that we can’t use the default semantic model that is automatically created when we create a lakehouse. We must create a new semantic model that includes the gold tables we created in this exercise, from the lakehouse explorer.

  1. In our workspace, navigate to the Sales_Gold lakehouse.
  2. Select New semantic model from the ribbon of the lakehouse explorer view.
  3. Assign the name Sales_Gold to the new semantic model.
  4. Select the gold tables to include in the semantic model and select Confirm.

This opens the semantic model in Fabric. As we can see, the relationships between the tables were added automatically, and we can also create additional relationships and measures.

The reporting team can now create reports and dashboards using the new model based on the data in the Sales_Gold lakehouse, and the reports will always reflect the latest data.

If you liked this post, please show your support by clapping 👏 for this story and following me!

The source files can be downloaded from this location : https://github.com/valentinlog/adventureworksdataset
