Metadata-driven ETL Framework for Azure Synapse Analytics

Robert Caspari
7 min read · May 23, 2023


Using Metadata to create and manage lakehouse database objects in Azure Synapse Analytics.

In this short article, I will show how you can use metadata not only to create and update tables for the lake database within Azure Synapse Analytics, but also to load source data into them. I will use Synapse native Spark notebooks, scripts and pipelines to achieve this. But first, why would you do this in the first place?

Metadata — what is it good for?

Introduction

Often we are confronted with ever-changing schemas in our source systems and evolving business requirements. The following reflects my personal opinions, but I hope they can serve as a basis for discussion. The traditional way of handling such new demands is to discuss them with your team and the business side. After that, you start your work and manually edit your pipeline, for example to include or exclude new columns or to load an entirely new table. In my experience, it can take a few iterations between discussion and development before you reach your “final” version. This can be due to a few reasons, but manually editing individual pipelines is one of them. Manually editing pipelines and creating new tables is not only prone to errors, it also produces pipelines that are very different from each other, even when in theory they accomplish similar things. This can also make it hard to explain the sometimes deeply nested and individual processes to managers or new hires.

A metadata-driven and parametrized approach helps to address this issue. While it is not, in my opinion, a cure-all, it certainly helps to keep your pipelines clean and your data warehouse (lakehouse, …) concept sound and professional.

Structure and contents

  • Identifying and describing your challenge
  • Designing your Metadata Tables
  • Create and Update Tables dynamically using Spark Notebooks
  • Parametrize Synapse Pipelines
  • Conclusion and next steps

Identifying and describing your challenge

Before diving into the technical nitty-gritty, one should first think as broadly as possible about what the actual challenge is. Let's assume the following business requirement: “We need the Account Dimension for reporting and analysis”. Our task as data engineers and analysts is to be proactive and to clarify whether the goal is to load this one new source or table into the existing data model, or rather to handle new sources and tables in general. So maybe the question is not how to load a new ‘Account’ dimension into your data model, but how to handle new and changing dimensions in a general and uniform approach that requires only minimal manual editing and is highly reusable. Such an approach probably takes longer to develop at first, but it can pay off later because new requests are handled much more quickly. In our example, we decide to develop a general approach that lets us load new tables and columns by specifying only a few parameters in a single metadata table.

Designing your Metadata Tables

To keep this article simple, I will present only one metadata table, which defines the tables to be created and their columns. In a production scenario, you would also want additional metadata tables to describe your pipelines and to keep logs of activities and changes. (Table definition from [1])
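To make this concrete, a few hypothetical rows of the metadata table could look like the following sketch (source, table and column names are made up; an empty type defaults to STRING in the notebook below):

# illustrative contents of db_objects_meta.csv
# columns: db_object_source, db_object_name, db_object_attribute, db_object_type
sample_rows = [
    ("salesforce", "account", "account_id",     None),             # None -> STRING by default
    ("salesforce", "account", "account_name",   None),
    ("salesforce", "account", "created_date",   "TIMESTAMP"),
    ("salesforce", "account", "annual_revenue", "DECIMAL(18,2)"),
]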

Meta Layer — Table and Pipeline characteristics

While a lakehouse architecture is often structured in a medallion architecture (think bronze -> silver -> gold, with the highest aggregation and quality in the gold serving layer), an additional meta-layer is a worthwhile consideration too.

Create and Update Tables dynamically using Spark Notebooks

Using Synapse native Spark notebooks we can first create our databases or schemas (both terms are used interchangeably within Spark). This could be done in an initialising notebook that you only run once or when recreating objects.

# Set up all databases / schemas (both terms are used interchangeably in Spark)
spark.sql("""CREATE DATABASE IF NOT EXISTS stage
LOCATION 'abfss://stage@storageaccount.dfs.core.windows.net/'
""")
spark.sql("""CREATE DATABASE IF NOT EXISTS bronze
LOCATION 'abfss://bronze@storageaccount.dfs.core.windows.net/'
""")
spark.sql("""CREATE DATABASE IF NOT EXISTS silver
LOCATION 'abfss://silver@storageaccount.dfs.core.windows.net/'
""")
spark.sql("""CREATE DATABASE IF NOT EXISTS gold
LOCATION 'abfss://gold@storageaccount.dfs.core.windows.net/'
""")
spark.sql("""CREATE DATABASE IF NOT EXISTS meta
LOCATION 'abfss://meta@storageaccount.dfs.core.windows.net/'
""")

Afterwards we can use Spark (certainly overkill for this amount of data) to read our meta table describing our tables, columns and data types from a .csv file in ADLS Gen2 storage. I chose a .csv for this since it is easy to edit (e.g. to add columns) and its schema is clear and unlikely to change often. The created table is a Delta table; you could opt for a regular .parquet table instead. The Delta format, however, lets you recreate older versions of the table through its time travel features. Since most of the columns in my data are strings, I chose to specify data types only for columns that are not strings and to use string as the default (this is why I used coalesce here).

# Create / fill t_db_objects_meta
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

deltaPath = 'abfss://meta@storageaccount.dfs.core.windows.net/t_db_objects_meta'

db_objects_schema = StructType([
    StructField('db_object_source', StringType(), False),
    StructField('db_object_name', StringType(), False),
    StructField('db_object_attribute', StringType(), False),
    StructField('db_object_type', StringType(), True)  # nullable: missing types default to STRING below
])

db_objects_from_file = spark.read.load(
    'abfss://meta@storageaccount.dfs.core.windows.net/src_db_objects_meta/db_objects_meta.csv',
    format='csv',
    header=True,
    schema=db_objects_schema
)

db_objects_from_file \
    .withColumn('db_object_hash', F.md5(F.concat_ws('_', 'db_object_source', 'db_object_name', 'db_object_attribute'))) \
    .withColumn('db_object_type', F.coalesce(F.col('db_object_type'), F.lit('STRING'))) \
    .write \
    .format('delta') \
    .save(deltaPath)

spark.sql("""CREATE TABLE IF NOT EXISTS meta.t_db_objects_meta
USING DELTA LOCATION 'abfss://meta@storageaccount.dfs.core.windows.net/t_db_objects_meta'""")

The following function creates our target tables as Parquet. While I would prefer to use the Delta format that sits on top of Parquet, Synapse notebooks do not yet allow us to specify the actual schema this way (it might be possible soon through the DeltaTableBuilder API that is in preview [2]).
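For reference, once the DeltaTableBuilder API is available in your Synapse Spark runtime, creating such a table in Delta format with an explicit schema could look roughly like this sketch (table, column and path names are illustrative, not part of the original setup):

from delta.tables import DeltaTable

# hypothetical example: build a Delta table with an explicit schema
DeltaTable.createIfNotExists(spark) \
    .tableName("bronze.t_salesforce_account") \
    .addColumn("account_id", "STRING") \
    .addColumn("created_date", "TIMESTAMP") \
    .location("abfss://bronze@storageaccount.dfs.core.windows.net/t_salesforce_account") \
    .execute()

For now, however, we stick with dynamically built CREATE TABLE … USING PARQUET statements: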

# Prepare creation / updating of other tables based on t_db_objects_meta
# the generated table name includes the db_object_source (check if you actually want / need this)
def create_table_stmt(db_object_source, table_name, layer_name, prefix):

    # using Spark SQL to dynamically build our CREATE TABLE command as a string
    stmt = (
        spark.sql("""
            SELECT concat_ws(''
                ,'CREATE TABLE IF NOT EXISTS '
                ,'{0}'   -- target layer / schema
                ,'.'
                ,'{3}'   -- table prefix, e.g. 't'
                ,'_'
                ,'{2}'   -- source system name
                ,'_'
                ,'{1}'   -- table name
                ,' ('
                ,array_join(collect_list(db_objects_column_definition), ", ")
                ,') USING PARQUET'
            ) AS create_table_stmt
            FROM (
                SELECT concat_ws(' '
                    ,db_object_attribute
                    ,db_object_type
                ) AS db_objects_column_definition
                , db_object_name
                FROM meta.t_db_objects_meta
                WHERE db_object_name = '{1}'
            )
        """.format(layer_name, table_name, db_object_source, prefix)).first()["create_table_stmt"]
    )
    spark.sql(stmt)

For updating these tables, a similar function can be created (a possible sketch is shown at the end of this section). After defining our table creation function, we loop over the distinct entries in our previously created `t_db_objects_meta` table and call it for each table:

tableAttributes = spark.sql("""SELECT DISTINCT db_object_source, db_object_name FROM meta.t_db_objects_meta""").collect()

# looping through tables
for item in tableAttributes:
    # create (or update) each table in the bronze layer, with table prefix 't'
    create_table_stmt(item['db_object_source'], item['db_object_name'], "bronze", "t")
    # update_table(item['db_object_source'], item['db_object_name'], "bronze", "t")

This can create all of the tables we have defined across all layers (bronze, silver, gold) and ensures correct data types for later conversions.
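The update_table helper that is referenced (commented out) in the loop above is not shown in this article. As a minimal sketch under the same naming convention, it could add any columns that exist in the metadata table but are missing from the physical table:

def update_table(db_object_source, table_name, layer_name, prefix):
    # hypothetical helper: add columns present in t_db_objects_meta but not yet in the table
    full_name = "{0}.{1}_{2}_{3}".format(layer_name, prefix, db_object_source, table_name)
    existing_cols = {field.name.lower() for field in spark.table(full_name).schema.fields}
    meta_cols = spark.sql("""SELECT db_object_attribute, db_object_type
        FROM meta.t_db_objects_meta WHERE db_object_name = '{0}'""".format(table_name)).collect()
    missing = [(r['db_object_attribute'], r['db_object_type'])
               for r in meta_cols if r['db_object_attribute'].lower() not in existing_cols]
    if missing:
        column_definitions = ", ".join("{0} {1}".format(name, dtype) for name, dtype in missing)
        spark.sql("ALTER TABLE {0} ADD COLUMNS ({1})".format(full_name, column_definitions))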

Parametrize Synapse Pipelines

A pipeline that utilizes such a metadata framework could look like this. It uses two lookups (implemented as scripts querying our Synapse serverless SQL endpoint on the lake database). The first lookup returns the tables belonging to the source that this pipeline handles (one pipeline for Salesforce as the source in this case). The second lookup checks the already existing bronze tables for the last load date, which enables incremental (delta) loads.

Synapse Pipeline — Integrating a metadata driven approach
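The exact lookup queries depend on your naming conventions. Roughly, they could look like the following (shown as Spark SQL for illustration; in the pipeline they run as script activities against the serverless SQL endpoint, and the watermark column modified_date is an assumption):

# first lookup: which tables belong to the source handled by this pipeline?
tables_for_source = spark.sql("""
    SELECT DISTINCT db_object_name
    FROM meta.t_db_objects_meta
    WHERE db_object_source = 'salesforce'
""").collect()

# second lookup: last load date of an existing bronze table, enabling incremental loads
last_load_date = spark.sql("""
    SELECT MAX(modified_date) AS last_load_date
    FROM bronze.t_salesforce_account
""").first()["last_load_date"]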

Such a pipeline template creates a reusable pattern where you create only as many pipelines as necessary. The pipeline above, for example, queries all data specified in the csv from the beginning of this article and loads it with the correct types into the bronze layer. This reduces the overhead required to fix errors across individual source tables. In my opinion, one pipeline per source system or source type can be a good rule of thumb. Everything else can be handled by parametrizing the pipeline and specifying individual pipeline parameters in a separate pipeline meta table that lists each pipeline instance together with a struct of parameters like “parameter1”: ”value2".
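As a hypothetical illustration, one row in such a pipeline metadata table could carry its parameters as a small struct or JSON document (all names here are made up):

# one entry per pipeline instance, with its parameters stored as a struct / JSON string
pipeline_instance = {
    "pipeline_name": "pl_load_salesforce_bronze",
    "db_object_source": "salesforce",
    "target_layer": "bronze",
    "parameters": {"parameter1": "value2", "incremental_load": "true"},
}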

Conclusion and next steps

This article introduced a metadata-driven, parametrized approach for creating and updating tables for the lake database within Azure Synapse Analytics. I hope I was able to show how this approach helps to keep pipelines clean and the data warehouse well structured. We also walked through identifying and describing a challenge, designing metadata tables accordingly, and creating and updating the lake database tables dynamically using Spark notebooks. By specifying only a few parameters in a single metadata table, it becomes possible to load new tables and columns in a uniform approach that requires only minimal manual editing and is highly reusable. Overall, this approach can save development time and helps handle new and changing dimensions in a general and uniform way.

The next step for completing the metadata framework in Synapse is to create and fill the other meta tables. Further parametrizing pipelines, logging runs and documenting the source systems makes our data warehouse even more adaptable while the structure remains stable.
