Leveraging Python’s OOP in Medallion Architecture for Enhanced Data Pipelines
When building pipelines, for instance in Azure Databricks, we often focus on processing the data rather than on writing maintainable code. When requirements suddenly change, we then struggle to modify that code.
In this article, I will address this issue by showing how we can incorporate Object-Oriented Programming (OOP) concepts from the very beginning of building our pipeline, specifically within the Medallion architecture. This will help us in making our code more scalable and adaptable.
What is the Medallion architecture?
Medallion architecture is a smart way to organize data in a data pipeline, and it’s split into three main layers: Bronze, Silver, and Gold.
- Bronze Layer: This is where all raw data, unprocessed and in its original format, is initially collected.
- Silver Layer: Here, the data is cleaned and processed: errors are corrected and the data is formatted for better usability.
- Gold Layer: Here, the data is refined for specific business uses, summarized and enriched to provide actionable insights.
Having understood the layers of Medallion architecture, I’ll focus on leveraging object-oriented programming (OOP) principles to build and manage this pipeline effectively.
Streamlining Data Sources: OOP Strategies and Bronze Layer Integration
In this section, we’ll see how Object-Oriented Programming (OOP) simplifies managing source data. I’ll discuss abstract classes, the factory pattern for creating data objects, and the Bronze layer for storage. This strategy promotes organized, scalable data handling, illustrated through a code demonstration.
Source Data
For this project, I’ve created a set of sample source data and stored it in Azure Data Lake Storage (ADLS) as a Parquet file. Its columns are id, sku, outlet_id, quantity, and date, matching the schema used for validation later in this article.
Using Abstract Classes
I’ve used “Abstract Classes” to handle the different data sources. An abstract class is like a blueprint: it declares the methods a class must have but doesn’t provide their implementation. It’s up to the subclasses to fill in the details.
Here’s a quick look at how I used these concepts:
- Abstract Class — DataSource: I created an abstract class named DataSource. This class has two methods, read_data and save_data, but they don’t contain any actual code. Think of these as guidelines or rules that any data source must follow.
- Inheritance: I then created two classes, ADLSDataSource and SQLServerDataSource, both inheriting from DataSource. This is like saying ADLSDataSource and SQLServerDataSource are types of DataSource.
- Method Overriding: In both ADLSDataSource and SQLServerDataSource, I provided specific details on how to read and save data. This is called method overriding. It’s like filling in the blanks left by the abstract class. For reading data, ADLSDataSource uses a file path to access data in Parquet format, while SQLServerDataSource establishes a connection to a SQL Server database. For saving, both classes write data back in Parquet format.
from abc import ABC, abstractmethod
import json
# Abstract base class for data sources
class DataSource(ABC):
@abstractmethod
def read_data(self, **params):
pass
@abstractmethod
def save_data(self, dataframe, **params):
pass
# ADLS Data Source
class ADLSDataSource(DataSource):
def read_data(self, **params):
path = params.get('path')
        return spark.read.parquet(path)  # parquet is self-describing, so no header option is needed
def save_data(self, dataframe, **params):
path = params.get('path')
        dataframe.write.format("parquet").mode("overwrite").save(path)
#SQL Server Data Source
class SQLServerDataSource(DataSource):
def read_data(self, **params):
return (spark.read
.format("sqlserver")
.option("host", params.get('host'))
.option("port", params.get('port', '1433'))
.option("user", params.get('user'))
.option("password", params.get('password'))
.option("database", params.get('database'))
.option("dbtable", params.get('dbtable'))
.load())
def save_data(self, dataframe, **params):
path = params.get('path')
        dataframe.write.format("parquet").mode("overwrite").save(path)
The benefits of this approach are organization and consistency. By using abstract classes and inheritance, I ensure that all data sources I might create in the future will follow the same basic rules. It makes the code easier to manage and understand.
Using the Factory Pattern for Data Source Creation
Here, I’ve employed the factory design pattern to create data source objects. The factory pattern is a way to create objects without specifying the exact class of object that will be created.
Here’s how I used the factory pattern:
- I created a class named DataSourceFactory. This factory has a method get_data_source, which takes a parameter called source_type. Based on this source_type, the factory decides which kind of data source object to create.
- For example, if source_type is "adls", the factory returns an ADLSDataSource object. Similarly, if source_type is "sql_server", an instance of SQLServerDataSource is created.
# Factory for creating data source objects
class DataSourceFactory:
def get_data_source(self, source_type):
if source_type == "adls":
return ADLSDataSource()
elif source_type == "sql_server":
return SQLServerDataSource()
else:
raise ValueError("Unknown data source type")
The key advantage of using the factory pattern here is it simplifies object creation. Instead of having object creation logic scattered throughout the code, we centralize it within the factory. This makes the code cleaner, easier to understand, and more maintainable. Additionally, it simplifies future updates. For example, if a new data source type needs to be added, it can be added to the factory with minimal impact on the rest of the code.
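For instance, here is a minimal sketch of how a hypothetical new source type might be plugged in (the CSVDataSource class and the "csv" type are illustrative assumptions, not part of the original pipeline):
# Hypothetical extension: a new CSV source type. Only the factory gains a branch;
# the ingestion loop later in the article keeps calling factory.get_data_source().
class CSVDataSource(DataSource):
    def read_data(self, **params):
        return spark.read.csv(params.get('path'), header=True, inferSchema=True)
    def save_data(self, dataframe, **params):
        dataframe.write.format("parquet").mode("overwrite").save(params.get('path'))

class DataSourceFactory:
    def get_data_source(self, source_type):
        if source_type == "adls":
            return ADLSDataSource()
        elif source_type == "sql_server":
            return SQLServerDataSource()
        elif source_type == "csv":  # the only change needed for the new source
            return CSVDataSource()
        else:
            raise ValueError("Unknown data source type")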
Using configuration files
I’ve also utilised configuration ("config") files to streamline the process further. Config files are like instruction manuals that tell the code what to do without hardcoding every detail into the program. Here’s what I did:
Reading the Config File: First, I read a config file named ‘source_config.json’. This file contains instructions on handling different data sources, specifying what type of data source to use and its corresponding read and save parameters. Here’s a snapshot of the ‘source_config.json’ structure:
{
"data_sources": [
{
"type": "adls",
"read_params": {
"path": "abfss://source@arvind20.dfs.core.windows.net/sample_data.parquet"
},
"save_params": {
"path": "abfss://medallion@arvind20.dfs.core.windows.net/Bronze/adls_bronze_data.parquet"
}
},
{
"type": "sql_server",
"read_params": {
"host": "your_sql_server_host",
"port": "1433",
"user": "your_username",
"password": "your_password",
"database": "your_database_name",
"dbtable": "your_schema.your_table_name"
"save_params": {
"path": "abfss://medallion@arvind20.dfs.core.windows.net/Bronze/sqlserver_bronze_data.parquet"
}
}
]
}
This JSON snippet defines two data sources, ADLS and SQL Server, each with specific parameters for reading and saving data. For the ADLS source, the read parameter contains the URL pointing to our sample file. For the SQL Server source, the read parameters include the necessary credentials to connect to the database. The save parameters for both sources point to the Bronze layer, where the data is stored in its raw, unprocessed form.
Let's read this file:
# Loading configuration from a file
with open('/Workspace/Users/arvind.yekkirala@globant.com/Config/source_config.json') as config_file:
config = json.load(config_file)
Using the Factory Pattern: Following the configuration, I instantiate the DataSourceFactory we talked about earlier. This factory creates the correct data source type based on the config file's instructions.
# instantiate the class
factory = DataSourceFactory()
Looping Through Configurations: The code below processes each data source listed in the JSON configuration. It first determines the source type (ADLS or SQL Server), obtains the matching data source object from the factory, reads the data using ‘read_params’ (e.g., from a specified path for ADLS, or using database credentials for SQL Server), and then uses ‘save_params’ to store the fetched data in the Bronze layer, where raw, unprocessed data is kept. The ingestion logic stays the same regardless of how many, or which kinds of, sources are configured.
for source_config in config['data_sources']:
source_type = source_config['type']
source = factory.get_data_source(source_type)
if 'read_params' in source_config:
df = source.read_data(**source_config['read_params'])
if 'save_params' in source_config:
# Save data to bronze layer
source.save_data(df, **source_config['save_params'])
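As an optional check in a Databricks notebook (not part of the original walkthrough; the container path comes from the config above), the files written by this loop can be listed directly:
# Optional: list what landed in the Bronze container after the loop runs.
display(dbutils.fs.ls("abfss://medallion@arvind20.dfs.core.windows.net/Bronze/"))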
Efficient Data Processing: Utility Classes, Static Methods, and Silver Layer Integration
In this section, we’ll explore using utility classes and static methods for data processing, focusing on building a DataValidator class for data integrity and efficient validation. I’ll show how these methods facilitate easy validation tasks and the role of decorators in enhancing functionality. This setup simplifies validations and supports efficient data merging into the Silver layer, illustrating a streamlined approach to maintaining a scalable data architecture.
The DataValidator Class
I created a DataValidator class, which is a type of utility class. Utility classes are useful because they group a set of methods related to a specific functionality. They're like toolkits, providing methods that can be used across different parts of your program.
The methods in a utility class are usually static. Static methods are unique because they don't need an instance of the class to be used. You can think of them as methods that belong to the class itself, not to any particular object created from the class. This means you can call these methods directly using the class name, like DataValidator.check_unique(...), without needing to create an object of the DataValidator class.
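As a quick illustration (a minimal sketch; check_unique and the DataFrame df are assumed to exist as in the code that follows), the call goes straight through the class name:
# Static methods are called on the class itself; no instance is required.
DataValidator.check_unique(df, ["id"])

# An instance-based call would also work, but creating the object adds nothing:
# validator = DataValidator()
# validator.check_unique(df, ["id"])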
Let's have a close look at the key methods within the DataValidator class and their functionalities:
- check_unique: This method verifies the uniqueness of values in specified columns within a DataFrame. It iterates through each column, checking for duplicate entries. If duplicates are found, it logs an error and raises a ValueError, specifying the affected column. This functionality is vital for maintaining the uniqueness of identifiers or key fields in the data.
- check_nulls: Aimed at data quality assurance, this method identifies null or missing values in specified columns. Should null values be detected, it raises a ValueError to highlight the presence of incomplete data. This step is crucial for datasets where completeness is necessary for accurate analysis or further processing.
- validate_schema: This method ensures that the schema of the DataFrame matches a predefined schema, raising a ValueError in case of any discrepancies. It's essential for verifying that the data structure conforms to expected standards, facilitating reliable data manipulation and analysis.
- parse_schema: This method converts a schema configuration, defined in a JSON-like structure, into a Spark StructType. This schema is then used to validate or enforce the structure of a DataFrame, ensuring it aligns with expected formats. The method dynamically constructs the schema based on field names and types specified in the configuration, providing flexibility in defining data structures.
These methods are marked as @staticmethod to allow direct access from the class, simplifying data validation tasks.
from pyspark.sql.functions import col, count
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DoubleType, DateType)

# Note: logger and the log_function_call decorator used below are defined in the
# "Decorators: Enhancing Functionality" section that follows.
class DataValidator:
@staticmethod
@log_function_call
def check_unique(df, columns):
        for col_name in columns:  # renamed from col to avoid shadowing the imported col() function
            duplicates = df.groupBy(col_name).agg(count(col_name).alias("count")).filter("count > 1")
            if duplicates.count() > 0:
                logger.error("Duplicate values found in column: {}".format(col_name))
                raise ValueError(f"Duplicate values found in column: {col_name}")
            logger.info("No duplicate values found in column: {}".format(col_name))
@staticmethod
def check_nulls(df, columns):
for col_name in columns:
null_count = df.filter(col(col_name).isNull()).count()
if null_count > 0:
raise ValueError(f"Null values found in column: {col_name}")
@staticmethod
def validate_schema(df,schema):
if not df.schema == schema:
raise ValueError("DataFrame schema does not match the expected schema.")
@staticmethod
def parse_schema(schema_config):
field_types = {
"string": StringType(),
"integer": IntegerType(),
"double": DoubleType(),
"date": DateType()
# Add other data types as necessary
}
fields = [StructField(field["name"], field_types[field["type"]]) for field in schema_config["fields"]]
return StructType(fields)
# ... other static validation methods ...
Decorators: Enhancing Functionality
If you observe the code snippet above, you’ll notice I’ve implemented a Python feature called decorators. Decorators are like special add-ons placed above a function. They enhance or modify the function’s behavior without altering its primary purpose.
Take the @log_function_call decorator, for instance, used on my DataValidator class methods. This decorator adds logging functionality: it automatically logs when a function starts and ends, and records any errors that occur during its execution.
Here’s how it works:
Initialization of Logger: Before defining the decorator, a logger instance is created using Python's built-in logging module. This logger is configured to capture debug-level messages and output them to a file named 'my_log_file.log', ensuring that all log messages generated by the decorator are stored for review.
import logging

logger = logging.getLogger('log4j')
logger.setLevel(logging.DEBUG)
logFileHandler = logging.FileHandler('/dbfs/temp/my_log_file.log' , mode = 'w')
logger.addHandler(logFileHandler)
Decorator Definition (log_function_call): The decorator itself is defined as a function that takes another function (func) as its argument. This structure allows @log_function_call to be applied to any function or method, enhancing it with additional logging functionality.
from functools import wraps

def log_function_call(func):
    @wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
logger.info(f"Entering function: {func.__name__}")
try:
result = func(*args, **kwargs)
logger.info(f"Exiting function: {func.__name__}")
return result
except Exception as e:
logger.error(f"Exception in {func.__name__}: {e}")
raise # Re-raise the exception to propagate it
return wrapper
Wrapper Function: Inside the decorator, a wrapper function is defined. This function is what gets called in place of the original function. It performs the following actions:
- Logs an entry message stating which function is being entered.
- Executes the original function (func) with its arguments (*args and **kwargs), capturing the result.
- Logs an exit message upon successful completion of the function.
- In case of an exception, logs an error message including the exception details, then re-raises the exception to ensure the decorator does not suppress it.
We can see that applying decorators significantly aids in monitoring and debugging the validation processes, especially during complex data validations.
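As a small, hedged illustration (the function below is made up for demonstration), the decorator can wrap any function, and the entries it writes to my_log_file.log follow the messages defined in wrapper:
# Hypothetical example: decorating an arbitrary function.
@log_function_call
def count_rows(df):
    return df.count()

# A successful call produces log entries like:
#   Entering function: count_rows
#   Exiting function: count_rows
# If the function raises, an "Exception in count_rows: ..." entry is logged
# and the exception is re-raised.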
Implementing the Data Validator Class
I used the DataValidator class to validate data in the Bronze layer, driven by a JSON config file, as shown in the earlier examples.
The JSON snippet outlines the validation rules for data stored in the Bronze layer. It specifies:
- Data Source Information: Each entry under bronze_layer_validations contains details about a data source, including its name and the path to its stored data.
- Schema Definition: The schema object describes the expected structure of the data, listing each field and its data type. This ensures the data adheres to a predefined format, crucial for downstream processing.
- Validation Rules: Under validations, specific rules like check_unique and check_duplicates are defined for various fields. These rules dictate which validations the DataValidator class should perform, such as ensuring no duplicate values exist for specified columns.
{
"bronze_layer_validations": [
{
"name": "ADLS Data",
"path": "abfss://medallion@arvind20.dfs.core.windows.net/Bronze/adls_bronze_data.parquet",
"schema": {
"fields": [
{
"name": "id",
"type": "integer"
},
{
"name": "sku",
"type": "string"
},
{
"name": "outlet_id",
"type": "integer"
},
{
"name": "quantity",
"type": "integer"
},
{
"name": "date",
"type": "date"
}
]
},
"validations": {
"check_unique": [
"id"
],
"check_duplicates": [
"sku",
"outlet_id"
]
}
}
]
}
The Python code snippet demonstrates how the configuration file drives the data validation process:
- Loading the Configuration: The JSON configuration file is initially loaded to dictate the validation logic.
- Reading Data: For each dataset defined, the code reads the corresponding data from the specified path using Spark's read.parquet method.
- Schema Construction and Validation: The parse_schema method constructs a schema from the configuration, which is then used to validate the data's structure with validate_schema.
- Applying Specific Validations: Based on the validations defined in the JSON, specific checks like uniqueness and null value presence are performed on the data.
#Validate Data
config = json.load(open('/Workspace/Users/arvind.yekkirala@globant.com/Config/config_val.json'))
for dataset in config['bronze_layer_validations']:
    df = spark.read.parquet(dataset['path'])
# Construct schema from config
schema = DataValidator.parse_schema(dataset['schema'])
# Apply validations
for validation, params in dataset['validations'].items():
if validation == "check_unique":
DataValidator.check_unique(df, params)
elif validation == "check_nulls":
DataValidator.check_duplicates(df, params)
elif validation == "validate_schema":
DataValidator.validate_schema(df, schema)
Project-Specific Validation
Following that, we introduce a ProjectSpecificValidator class created specifically for handling project-specific validation. This type of validation caters to rules that are unique to the project and based on specific requirements from stakeholders. These aren't generic validations; they are tailored to the particular needs and conditions of this project.
For instance, in the class below, a method such as check_date_ranges, which verifies that date fields fall within stipulated ranges, ensures the data adheres to business-specific criteria. Similarly, check_string_patterns is designed to validate text fields against predefined patterns.
class ProjectSpecificValidator:
def validate(self, df, validation_rules):
# Loop through and apply each project-specific validation rule
for validation in validation_rules:
rule = validation.get("rule")
params = validation.get("params", {})
validation_method = getattr(self, rule, None)
if callable(validation_method):
validation_method(df, **params)
def check_value_ranges(self, df, **params):
# Project-specific value range validation
pass
def check_date_ranges(self, df, column, min_date, max_date):
invalid_dates = df.filter((df[column] < min_date) | (df[column] > max_date))
# If there are rows with dates outside the valid range, raise an exception
if invalid_dates.count() > 0:
raise ValueError(f"Found rows in column '{column}' with dates outside the range {min_date} to {max_date}")
def check_string_patterns(self, df, **params):
# Validate string patterns (e.g., regex validation for text fields)
pass
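The two pass stubs above are intentionally left open. As one possible way to fill them in (an illustrative sketch with assumed parameters, not the project's actual rules), both can follow the same filter-and-raise pattern as check_date_ranges; a subclass keeps the original class untouched:
# Illustrative implementations of the two stubs, shown as a subclass.
class ExtendedProjectValidator(ProjectSpecificValidator):
    def check_value_ranges(self, df, column, min_value, max_value):
        out_of_range = df.filter((df[column] < min_value) | (df[column] > max_value))
        if out_of_range.count() > 0:
            raise ValueError(f"Found rows in column '{column}' outside the range {min_value} to {max_value}")

    def check_string_patterns(self, df, column, pattern):
        # rlike applies the regular expression to every value in the column
        invalid = df.filter(~df[column].rlike(pattern))
        if invalid.count() > 0:
            raise ValueError(f"Found rows in column '{column}' not matching pattern '{pattern}'")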
I used this ProjectSpecificValidator class for handling project-specific validations and used a config file to drive this, as shown in the earlier examples.
#Project Specific Validation
with open('/Workspace/Users/arvind.yekkirala@globant.com/Config/project_validation_config.json') as config_file:
config = json.load(config_file)
for dataset in config['bronze_layer_validations']:
    df = spark.read.parquet(dataset['path'])
project_validator = ProjectSpecificValidator()
if 'project_specific_validations' in dataset:
project_validator.validate(df, dataset['project_specific_validations'])
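The article doesn't show project_validation_config.json itself; based on how validate() reads the rule and params keys, it could look something like this (an assumed structure with illustrative values):
{
  "bronze_layer_validations": [
    {
      "name": "ADLS Data",
      "path": "abfss://medallion@arvind20.dfs.core.windows.net/Bronze/adls_bronze_data.parquet",
      "project_specific_validations": [
        {
          "rule": "check_date_ranges",
          "params": { "column": "date", "min_date": "2023-01-01", "max_date": "2023-12-31" }
        }
      ]
    }
  ]
}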
Data Merging with the DataMerger Class
Next, I created a utility class called DataMerger to handle the merging of data into the Silver and Gold layers of the Medallion architecture. It is designed as a utility class because it can be reused for both layers.
The DataMerger class includes the following methods:
- Public Method for Merging: The main method, merge_into_delta, either merges data into an existing Delta table or creates a new one if it doesn't exist. It checks whether the table exists and then calls the appropriate protected method.
- Protected Methods for Specific Tasks: Inside the class, I've used protected methods: _delta_table_exists, _create_new_delta_table, and _merge_delta_table. The single leading underscore marks them as internal implementation details, which keeps the public interface small and the code easier to maintain, in line with object-oriented best practices.
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

class DataMerger:
@staticmethod
def merge_into_delta(target_table_path, dataframe, merge_key):
if DataMerger._delta_table_exists(target_table_path):
DataMerger._merge_delta_table(target_table_path, dataframe, merge_key)
else:
DataMerger._create_new_delta_table(target_table_path, dataframe)
@staticmethod
def _delta_table_exists(target_table_path):
# Check if the Delta table exists
spark = SparkSession.builder.getOrCreate()
try:
DeltaTable.forPath(spark, target_table_path)
return True
except Exception as e:
# Handle specific exceptions if necessary
return False
@staticmethod
def _create_new_delta_table(target_table_path, dataframe):
# Create a new Delta table
dataframe.write.format("delta").save(target_table_path)
@staticmethod
def _merge_delta_table(target_table_path, dataframe, merge_key):
# Merge the DataFrame into the existing Delta table
spark = SparkSession.builder.getOrCreate()
delta_table = DeltaTable.forPath(spark, target_table_path)
# Define merge logic
merge_condition = f"source.{merge_key} = target.{merge_key}"
update_mapping = {col: f"source.{col}" for col in dataframe.columns if col != merge_key}
# Perform merge
delta_table.alias("target") \
.merge(
dataframe.alias("source"),
merge_condition
) \
.whenMatchedUpdate(set=update_mapping) \
.whenNotMatchedInsertAll() \
.execute()
This code snippet loads the configuration for merging data into the Silver layer (the silver_config.json shown just below) and executes the merge if the target table path and merge key are specified in the config. The df being merged here is the validated DataFrame read from the Bronze layer.
#Merge with silver layer
with open('/Workspace/Users/arvind.yekkirala@globant.com/Config/silver_config.json') as file:
config = json.load(file)
# Extract the data merger configuration
data_merger_config = config.get("data_merger_config", {})
target_table_path = data_merger_config.get("target_table_path")
merge_key = data_merger_config.get("merge_key")
if target_table_path and merge_key:
DataMerger.merge_into_delta(target_table_path, df, merge_key)
else:
print("Target table path or merge key is missing in the configuration.")
{
"data_merger_config": {
"target_table_path": "abfss://medallion@arvind20.dfs.core.windows.net/Silver/adls_silver_data",
"merge_key": "id"
}
}
Building Data Features: Encapsulation and Gold Layer Integration
Continuing onward, I’ve set up a FeatureGenerator class to create new features from existing data. This class takes in data and a configuration of which features to generate.
In this class, I have used private methods (indicated by a double-underscore prefix), which offer a higher level of encapsulation than protected methods (single underscore). Unlike the single-underscore convention, double underscores trigger Python's name mangling: the method is stored as _ClassName__method_name, which discourages (though does not strictly prevent) access from outside the class.
The FeatureGenerator Class
In the FeatureGenerator class, methods like __add_month_feature and __add_day_feature append month and day columns, respectively, while generate_features applies the methods specified in the configuration, retrieving their names via __get_feature_methods_from_config.
from pyspark.sql.functions import month, dayofmonth

class FeatureGenerator:
def __init__(self, data, feature_config):
self.data = data
self.feature_config = feature_config
def generate_features(self):
feature_methods = self.__get_feature_methods_from_config()
for method_name in feature_methods:
method = getattr(self, method_name, None)
if callable(method):
self.data = method(self.data)
def __get_feature_methods_from_config(self):
# Extract the list of feature methods from the configuration
return self.feature_config.get('features_to_generate', [])
def __add_month_feature(self, df):
# Logic to calculate the month from the date
df = df.withColumn("month", month("date"))
return df
def __add_day_feature(self, df):
# Logic to calculate the day from the date
df = df.withColumn("day", dayofmonth("date"))
return df
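Because of name mangling, these double-underscore methods are only reachable from outside the class under their mangled names. The short sketch below (an illustration, assuming the class above and the variables created in the implementation snippet that follows) shows why the feature-generation config later in the article lists the mangled names:
# Name mangling in practice (silver_data and feature_config come from the next snippet).
fg = FeatureGenerator(silver_data, feature_config)

# The double-underscore methods are stored under mangled attribute names:
# getattr(fg, "__add_month_feature", None)                   -> None
# getattr(fg, "_FeatureGenerator__add_month_feature", None)  -> the bound method

# This is why the config lists "_FeatureGenerator__add_month_feature" and
# "_FeatureGenerator__add_day_feature"; generate_features() resolves those names
# with getattr(self, method_name). A hypothetical new feature would follow the same
# pattern: define __add_year_feature on the class and list it in the config as
# "_FeatureGenerator__add_year_feature".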
Implementing Feature Generation
To show how the FeatureGenerator is used, the following code demonstrates a possible implementation.
- Loading Configuration Data: The process begins by loading the feature generation configuration from a JSON file.
- Extracting Feature Configuration: Once the configuration is loaded, the specific section pertaining to feature generation is extracted. This subset contains all the necessary details, such as the path to the source data in the Silver layer, which are crucial for guiding the feature generation process.
- Reading Source Data: The source data is then read from the specified path in the Silver layer. This data, typically stored in a ‘delta’ format, represents the intermediate data that will be used for feature generation.
- Initializing the FeatureGenerator: With the source data and feature configuration at hand, the FeatureGenerator object is instantiated. This object is responsible for applying the defined transformations and generating the new features specified in the configuration.
- Executing Feature Generation: Finally, the generate_features() method of the FeatureGenerator is called. This method carries out the actual process of transforming the source data according to the predefined rules and configuration, resulting in a dataset with new features ready for further analysis.
#Generate Features
with open('/Workspace/Users/arvind.yekkirala@globant.com/Config/feature_generation_config.json') as file:
config = json.load(file)
feature_config = config['feature_generation']
silver_data = spark.read.format('delta').load(feature_config['silver_data_path'])
feature_generator = FeatureGenerator(silver_data, feature_config)
feature_generator.generate_features()
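After generate_features() runs, the enriched DataFrame is held on the object itself (generate_features reassigns self.data), so a quick check confirms the new columns:
# The transformed DataFrame lives on the generator; verify the appended
# "month" and "day" columns before merging onward.
feature_generator.data.printSchema()
feature_generator.data.show(5)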
Configuration for Feature Generation
The feature generation process, as observed above, is driven by a configuration JSON. This configuration specifies the source data's location and the specific features to be generated by the FeatureGenerator.
{
"feature_generation": {
"silver_data_path": "abfss://medallion@arvind20.dfs.core.windows.net/Silver/adls_silver_data",
"features_to_generate": ["_FeatureGenerator__add_month_feature", "_FeatureGenerator__add_day_feature"]
}
}
Merging Data with the Gold Layer
I used the existing DataMerger class to merge the enhanced data into the Gold layer. Just like with the Silver layer, I read the configuration for the Gold layer from a file and then called DataMerger.merge_into_delta. This method merged my data into the specified Gold layer location, using the merge key defined in the config.
The JSON snippet below defines the configuration for the data merge operation into the Gold layer:
- target_table_path: Specifies the location within the Gold layer where the data should be merged. This path points to our target Delta table in Azure Data Lake Storage (ADLS), ensuring that the enhanced data is stored in the correct place for further analysis and reporting.
- merge_key: Identifies the key column used to merge the data. In this case, id is used as the unique identifier to determine how data rows are combined or updated, maintaining data integrity throughout the merge process.
{
"data_merger_config": {
"target_table_path": "abfss://medallion@arvind20.dfs.core.windows.net/Gold/adls_gold_data",
"merge_key": "id"
}
}
Following the configuration, the Python code snippet executes the data merge:
- Loading Configuration: The process starts by loading the gold_config.json file, which contains the necessary parameters for the data merge operation, as defined above.
- Executing the Merge: With the configuration loaded, the DataMerger.merge_into_delta method is called, passing the target table path, the DataFrame containing the enhanced data (feature_generator.data), and the merge key. This method handles the complexity of merging data, ensuring that new records are inserted and existing records are updated based on the merge key.
#Merge with gold layer
with open('/Workspace/Users/arvind.yekkirala@globant.com/Config/gold_config.json') as file:
config = json.load(file)
gold_layer_config = config.get("data_merger_config", {})
DataMerger.merge_into_delta(gold_layer_config['target_table_path'], feature_generator.data, gold_layer_config['merge_key'])
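As an optional sanity check (not part of the original walkthrough), the merged Gold table can be read straight back from its Delta path:
# Optional verification: read the Gold Delta table back and inspect a few rows.
gold_df = spark.read.format("delta").load(gold_layer_config['target_table_path'])
gold_df.show(5)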
Summary
To sum up, the combination of Object-Oriented Programming (OOP) and config-driven architecture was key in making my data pipeline project efficient and manageable. OOP provided a clear, organized structure, and the config-driven approach added essential flexibility. These elements together formed a solid foundation that greatly enhanced the pipeline’s functionality and adaptability.