Config-Driven Data Standardization Framework using Spark

Pallavi Sinha
8 min read · Jul 5, 2024


In today’s data-driven world, organisations collect vast amounts of raw data from various sources. This data is often inconsistent, poorly structured, and requires significant preprocessing before it can be used for analysis and decision-making. To address these challenges, a robust data standardization process is essential.
In this blog, we will first briefly cover the importance of data standardization, then the advantages of the config-driven approach, and finally build the framework step by step. We will also see how this framework can be extended to fit your own use case.

Data Standardization

Data standardization is the process of transforming data from diverse sources into a common format. This involves aligning naming conventions and data types, adding metadata, and similar steps to ensure uniformity across data products.

Here are the key reasons why standardizing raw data is important:

  • Consistency Across Data Products: Standardization ensures that data from different sources conform to the same structure, making it easier to integrate and analyze.
  • Reduced Errors: By applying consistent naming conventions, correct data types, and transformations, the chances of errors during data processing and analysis are significantly reduced.
  • Enhanced Data Usability: Standardized data is more accessible and usable for various analytical and reporting purposes, leading to better insights and business decisions.
  • Scalability: As the volume of data grows, standardized data products are easier to manage, scale, and extend with new data sources and columns.

Introduction to Config-Driven Data Standardization Framework

Our aim is to build a generic framework rather than creating custom ingestion and transformation scripts for each data product. This config-driven approach to data standardization uses configuration files to define rules and mappings, decoupling the standardization logic from the application code. This enhances the system’s flexibility and maintainability.

For this tutorial, the configuration file format will be JSON, and the code will be written in PySpark and Spark SQL. The data products will be stored in Delta format.

The major pros of the config-driven approach:

  • Flexibility: Easily modify standardization rules without changing the application code.
  • Scalability: Handles large datasets efficiently using Spark.
  • Maintainability: Centralizes transformation logic in a configuration file, simplifying maintenance and updates.

The capabilities that we will be building:

  1. Column name updates
  2. Data transformations
  3. Column data type updates
  4. Column description metadata updates
  5. New column additions

Before starting, I want to mention that data standardization can encompass a wide range of activities beyond what we are covering here, including data quality validations, additional metadata, data product versioning, and other processes that enhance data integrity and usability.

Setting up!

We will be using the Databricks Community Edition for writing the code, and the Databricks File System (DBFS), a distributed file system integrated into the Databricks platform, for data storage. Feel free to save your files elsewhere, such as S3 buckets or ADLS; the framework accepts a path, so it is quite generic.

Here is the config file structure we will be using in this demo —
Config File Structure

data_product_name: <Name to assign to the DP after standardization>
raw_data_product_name: <Source raw data product name>
schema:
    source_columns: (columns coming directly from the raw data product)
        raw_name: <Column name in the raw data product>
        standardized_name: <Standardized name for the raw column>
        data_type: <Data type the column data should be cast to>
        sql_transformation: <Transformation rule written in Spark SQL>
    new_columns: (columns obtained by performing a join with other DPs)
        name: <Name of the new column to be created>
        data_type: <Data type the column data should be cast to>
        sql_transformation: <Transformation rule written in Spark SQL>
column_sequence_order: <Ordered list of column names for the standardized DP>
metadata: (metadata to be assigned after all the columns are added)
    column_descriptions:
        <column_name>: <description>

Here is the raw data product, supplier, that we will be standardizing:

supplier (Raw data product)

Below is another standardized data product, Product, that we will use to bring in a new column.

Product (Other Standardized Data Product)

The JSON config file for supplier, based on the structure discussed above:

{
  "data_product_name": "Product_Supplier",
  "raw_data_product_name": "supplier",
  "schema": {
    "source_columns": [
      {
        "raw_name": "sup_id",
        "standardized_name": "Supplier_ID",
        "data_type": "string",
        "sql_transformation": "CONCAT('SUP', '-', sup_id)"
      },
      {
        "raw_name": "name",
        "standardized_name": "Supplier_Name",
        "data_type": "string",
        "sql_transformation": ""
      },
      {
        "raw_name": "price",
        "standardized_name": "Purchase_Price",
        "data_type": "int",
        "sql_transformation": ""
      },
      {
        "raw_name": "prod_name",
        "standardized_name": "Product_Name",
        "data_type": "string",
        "sql_transformation": ""
      },
      {
        "raw_name": "quantity",
        "standardized_name": "Purchase_Quantity",
        "data_type": "int",
        "sql_transformation": ""
      },
      {
        "raw_name": "",
        "standardized_name": "Total_Cost",
        "data_type": "int",
        "sql_transformation": "price * quantity"
      }
    ],
    "new_columns": [
      {
        "name": "Product_ID",
        "data_type": "string",
        "sql_transformation": "MERGE INTO delta.`{temp_std_dp_path}` dest USING delta.`dbfs:/FileStore/project/Product` src ON dest.Product_Name = src.Product_Name WHEN MATCHED THEN UPDATE SET dest.Product_ID = src.Product_ID"
      }
    ]
  },
  "column_sequence_order": [
    "Supplier_ID", "Supplier_Name", "Product_ID", "Product_Name", "Purchase_Price", "Purchase_Quantity", "Total_Cost"
  ],
  "metadata": {
    "column_descriptions": {
      "Supplier_ID": "Unique identifier for the supplier of a product",
      "Supplier_Name": "Name of the supplier",
      "Purchase_Price": "Price at which the supplier sells the product",
      "Product_Name": "Name of the product",
      "Purchase_Quantity": "Quantity of the product available with the supplier",
      "Total_Cost": "Total amount spent on purchasing a specific quantity of items at the given purchase price.",
      "Product_ID": "Unique identifier for the product"
    }
  }
}

Write the supplier raw DP and the Product DP in Delta format to your chosen location, and upload the JSON config file to the desired path.
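
If you do not have the original files handy, here is a minimal sketch for creating the two inputs. The column names come from the config above; the sample rows and Product_ID values are made up purely for illustration.

# Hypothetical sample rows for the supplier raw DP (values are illustrative only)
supplier_data = [
    (1, "ABC Traders", 20, "Notebook", 100),
    (2, "XYZ Supplies", 15, "Pen", 200),
]
supplier_df = spark.createDataFrame(supplier_data, ["sup_id", "name", "price", "prod_name", "quantity"])
supplier_df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/project/supplier")

# Hypothetical standardized Product DP used later for the Product_ID join
product_data = [("P-001", "Notebook"), ("P-002", "Pen")]
product_df = spark.createDataFrame(product_data, ["Product_ID", "Product_Name"])
product_df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/project/Product")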

We will be following a full-load (truncate-and-load) process. Therefore, all the steps will be performed in a temporary/staging area, and the result will then be overwritten to the actual standardized DP path.

Developing the Framework —

First, we will define the interface, or contract, of the config reader so that we can create our own config reader class if we have a different config structure. For example, our config here is in JSON, but if we wanted a YAML config, we could simply create our own config reader by extending the following abstract class —

from abc import ABC, abstractmethod
from pyspark.sql import DataFrame

class ConfigReaderContract(ABC):

    @abstractmethod
    def read_source_columns_schema(self) -> DataFrame:
        pass

    @abstractmethod
    def read_new_columns_schema(self) -> DataFrame:
        pass

    @abstractmethod
    def read_column_descriptions_metadata(self) -> dict:
        pass

    @abstractmethod
    def read_column_sequence_order(self) -> list[str]:
        pass

For our case, below is the implementation of the ConfigReader class —

from pyspark.sql.functions import explode

class ConfigReader(ConfigReaderContract):

    def __init__(self, config_path):
        self.config_df = spark.read.option("multiLine", True).json(config_path)

    def read_source_columns_schema(self):
        exploded_df = self.config_df.select(explode(self.config_df["schema"].source_columns).alias("source_columns"))
        source_columns_schema_df = exploded_df.selectExpr(
            "source_columns.raw_name as raw_name",
            "source_columns.standardized_name as standardized_name",
            "source_columns.data_type as data_type",
            "source_columns.sql_transformation as sql_transformation"
        )
        return source_columns_schema_df

    def read_new_columns_schema(self):
        exploded_df = self.config_df.select(explode(self.config_df["schema"].new_columns).alias("new_columns"))
        new_columns_schema_df = exploded_df.selectExpr(
            "new_columns.name as name",
            "new_columns.data_type as data_type",
            "new_columns.sql_transformation as sql_transformation"
        )
        return new_columns_schema_df

    def read_column_descriptions_metadata(self):
        metadata_df = self.config_df.select("metadata.column_descriptions").alias("column_descriptions")
        descriptions_row_obj = metadata_df.first()["column_descriptions"]
        return descriptions_row_obj.asDict()

    def read_column_sequence_order(self):
        return list(self.config_df.first()["column_sequence_order"])
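
As mentioned earlier, the same contract lets us plug in other config formats. Below is a minimal, hypothetical sketch of a YAML-based reader; it assumes PyYAML is available and that the YAML file mirrors the structure shown above.

import yaml

class YamlConfigReader(ConfigReaderContract):

    def __init__(self, config_path):
        # Plain open() shown for simplicity; adjust for DBFS/cloud storage as needed
        with open(config_path, "r") as f:
            self.config = yaml.safe_load(f)

    def read_source_columns_schema(self):
        # Produce the same columns the JSON-based ConfigReader returns
        return spark.createDataFrame(self.config["schema"]["source_columns"])

    def read_new_columns_schema(self):
        return spark.createDataFrame(self.config["schema"]["new_columns"])

    def read_column_descriptions_metadata(self):
        return dict(self.config["metadata"]["column_descriptions"])

    def read_column_sequence_order(self):
        return list(self.config["column_sequence_order"])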

Now, finally, let's see the implementation of our DataStandardizer class. Then we will understand each method step by step.

class DataStandardizer:

    def __init__(self, raw_dp_path, temp_std_dp_path, std_dp_path):
        self.raw_dp_path = raw_dp_path
        self.temp_std_dp_path = temp_std_dp_path
        self.std_dp_path = std_dp_path

    def create_temp_std_dp_with_source_columns(self, source_columns_schema_df):
        source_columns_schema_df.createOrReplaceTempView("source_columns_config_table")
        select_query_sql = f"""
            SELECT
                concat(
                    "SELECT ",
                    array_join(collect_list(select_expression), ", "),
                    " FROM delta.`{self.raw_dp_path}`"
                ) as select_query
            FROM (
                SELECT
                    CASE
                        WHEN sql_transformation = "" THEN concat("CAST(", concat("`", raw_name, "`"), " AS ", data_type, ") AS ", standardized_name)
                        ELSE concat("CAST(", sql_transformation, " AS ", data_type, ") AS ", standardized_name)
                    END as select_expression
                FROM source_columns_config_table
            )
        """
        df = spark.sql(select_query_sql)
        select_query = df.first()["select_query"]
        create_sql_query = f"CREATE OR REPLACE TABLE delta.`{self.temp_std_dp_path}` as ( " + select_query + ")"
        spark.sql(create_sql_query)

    def add_new_columns_in_temp_std_dp(self, new_columns_schema_df):
        new_columns_schema_df_rows = new_columns_schema_df.collect()
        for row in new_columns_schema_df_rows:
            add_new_columns_sql = f"ALTER TABLE delta.`{self.temp_std_dp_path}` ADD COLUMN {row['name']} {row['data_type']}"
            sql_transformation = row["sql_transformation"].replace("{temp_std_dp_path}", self.temp_std_dp_path)
            spark.sql(add_new_columns_sql)
            spark.sql(sql_transformation)

    def update_column_descriptions_metadata(self, column_descriptions_dict):
        for column_name, description in column_descriptions_dict.items():
            column_description_update_sql = f"ALTER TABLE delta.`{self.temp_std_dp_path}` CHANGE COLUMN {column_name} COMMENT '{description}';"
            spark.sql(column_description_update_sql)

    def move_data_to_std_dp(self, column_sequence_order):
        temp_std_df = spark.read.format("delta").load(self.temp_std_dp_path)
        temp_std_df = temp_std_df.select(column_sequence_order)
        temp_std_df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(self.std_dp_path)

    def run(self, config_reader):
        print("Raw df : ")
        raw_df = spark.read.format("delta").load(self.raw_dp_path)
        display(raw_df)

        source_columns_schema_df = config_reader.read_source_columns_schema()
        self.create_temp_std_dp_with_source_columns(source_columns_schema_df)

        new_columns_schema_df = config_reader.read_new_columns_schema()
        self.add_new_columns_in_temp_std_dp(new_columns_schema_df)

        column_descriptions_dict = config_reader.read_column_descriptions_metadata()
        self.update_column_descriptions_metadata(column_descriptions_dict)

        column_sequence_order = config_reader.read_column_sequence_order()
        self.move_data_to_std_dp(column_sequence_order)

        print("Standardized df : ")
        std_df = spark.read.format("delta").load(self.std_dp_path)
        display(std_df)

        print("Schema information for Standardized df : ")
        std_df.printSchema()
        display(spark.sql(f"DESCRIBE TABLE delta.`{self.std_dp_path}`"))

The DataStandardizer class takes three attributes: raw_dp_path (the path of the raw data product), temp_std_dp_path (the path that acts as the staging area during standardization), and std_dp_path (the path where the standardized DP is saved). The methods are described below —

1. create_temp_std_dp_with_source_columns — Creates the initial version of the standardized DP with the source columns (columns coming directly from the source raw data product); an example of the generated SQL appears after this list.
2. add_new_columns_in_temp_std_dp — Updates the temp standardized DP with the new columns (columns obtained by performing a join with other DPs).
3. update_column_descriptions_metadata — Adds the description to each column created.
4. move_data_to_std_dp — The final step, which copies the Delta table from the temp/staging area to the standardized data product path.
5. run — Orchestrates all the steps above. It accepts config_reader, an instance of a class implementing ConfigReaderContract.
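
To make the first two steps concrete, here is roughly what the framework generates for the supplier config above, with the paths from the run section below substituted in and the statements spread over multiple lines for readability:

-- Step 1: create_temp_std_dp_with_source_columns builds and runs a statement like this
CREATE OR REPLACE TABLE delta.`dbfs:/FileStore/project/Product_Supplier_temp` AS (
    SELECT
        CAST(CONCAT('SUP', '-', sup_id) AS string) AS Supplier_ID,
        CAST(`name` AS string) AS Supplier_Name,
        CAST(`price` AS int) AS Purchase_Price,
        CAST(`prod_name` AS string) AS Product_Name,
        CAST(`quantity` AS int) AS Purchase_Quantity,
        CAST(price * quantity AS int) AS Total_Cost
    FROM delta.`dbfs:/FileStore/project/supplier`
);

-- Step 2: add_new_columns_in_temp_std_dp adds the column, then runs the MERGE
-- from the config with {temp_std_dp_path} substituted
ALTER TABLE delta.`dbfs:/FileStore/project/Product_Supplier_temp` ADD COLUMN Product_ID string;

MERGE INTO delta.`dbfs:/FileStore/project/Product_Supplier_temp` dest
USING delta.`dbfs:/FileStore/project/Product` src
ON dest.Product_Name = src.Product_Name
WHEN MATCHED THEN UPDATE SET dest.Product_ID = src.Product_ID;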

Run the code to get the standardized DP —

Below is the code for running the framework on the supplier raw data product shown above.

# Define all the paths according to your use case
raw_dp_path = "dbfs:/FileStore/project/supplier"
std_dp_path = "dbfs:/FileStore/project/Product_Supplier"
temp_std_dp_path = "dbfs:/FileStore/project/Product_Supplier_temp"
config_path = "dbfs:/FileStore/project/supplier_config.json"

# Initialize the config reader and data standardizer
config_reader = ConfigReader(config_path)
data_standardizer = DataStandardizer(
    raw_dp_path=raw_dp_path,
    temp_std_dp_path=temp_std_dp_path,
    std_dp_path=std_dp_path
)

# Run the standardization
data_standardizer.run(config_reader)

The output is displayed below —

In the output above, we can see that the standardized DP is created with all the columns mentioned in the config, along with a description for each.

For example, the sup_id column from the raw DP is renamed to Supplier_ID, with the required prefix (SUP) added before each value, and its description metadata ("Unique identifier for the supplier of a product") is applied. The column Total_Cost is added by multiplying price and quantity from the raw DP, and the new column Product_ID is added by performing a join with the other data product, Product.

The code used in this blog can be found in this GitHub repo.

Conclusion

The Config-Driven Data Standardization Framework using Spark presents a robust and scalable approach for transforming raw data into standardized, high-quality data products. By using configuration files to dictate standardization rules, it offers flexibility, consistency, and ease of maintenance, allowing dynamic adjustments without altering the core codebase. It can also be extended with additional functionality such as data quality validations, data product versioning, and other metadata enhancements, further broadening its applicability.

I hope you enjoyed this blog. Feel free to ask questions and add your feedback in the comments, which will help me improve.
