Data Quality Framework in Databricks

Gunanithi CS
5 min read · Feb 13, 2024

Introduction:

In today’s data-driven landscape, the reliability and accuracy of data are critical for informed decision-making and business success. Establishing a robust Data Quality framework is paramount to address the challenges of data inconsistency, incompleteness, and inaccuracies. This article dives into the intricacies of building such a framework within the Databricks environment.

We’ll start by reading the files from the Raw layer and transforming them. Once transformed, the files will be moved to the Processed layer. From there, they’ll be ingested into the presentation layer. Any error files encountered during this process will be moved to the Error layer for appropriate handling.

Step 1

All data quality (DQ) rules are stored in tables within a PostgreSQL database. Five tables hold the rules and their logs:

  1. The Screen table maps each file to its associated screens, defining the specific DQ rules applied to each file during processing.
  2. The dq_rules table holds the row and profile rules. Rules are filtered by the screen ID from the Screen table, so that only the rules matching the file name are applied.
  3. The dq_scr_run_log table logs the status of each screen run, capturing start and end times for each execution.
  4. The dq_row_alert_log table records all error records generated by row rules, associating them with their respective rule IDs from the dq_rules table.
  5. The dq_profile_alert_log table records all error records generated by profile rules, associating them with their respective rule IDs from the dq_rules table.

To validate the rule setup:

select s.screen_id,filename,screen_ds,rule_id,rule_name,rule_ds,rule_type 
from screen s
join dq_rules dqr on s.screen_id=dqr.screen_id
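
To make the relationship between the two tables concrete, here is a minimal sketch that runs the same join against an in-memory SQLite database standing in for PostgreSQL. Only the column names come from the queries in this article; the column types, the sample rows, and the rule text are assumptions for illustration.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL here; column types are assumed.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE screen (
    screen_id INTEGER PRIMARY KEY,
    filename  TEXT,   -- file name (or pattern) the screen applies to
    screen_ds TEXT    -- screen description
);
CREATE TABLE dq_rules (
    rule_id    INTEGER PRIMARY KEY,
    screen_id  INTEGER REFERENCES screen(screen_id),
    rule_name  TEXT,
    rule_ds    TEXT,
    rule_type  TEXT,  -- 'row' or 'profile' (assumed values)
    rule_query TEXT
);
""")

# Hypothetical sample data
conn.execute(
    "INSERT INTO screen VALUES (1, 'customers.csv', 'Customer file checks')"
)
conn.executemany(
    "INSERT INTO dq_rules VALUES (?, ?, ?, ?, ?, ?)",
    [
        (10, 1, "null_email", "Email must not be null", "row",
         "SELECT * FROM source WHERE email IS NULL"),
        (11, 1, "empty_file", "File must not be empty", "profile",
         "SELECT * FROM (SELECT COUNT(*) AS n FROM source) WHERE n = 0"),
    ],
)

# Same join as the validation query above
rules_for_screens = conn.execute("""
    SELECT s.screen_id, filename, screen_ds, rule_id, rule_name, rule_ds, rule_type
    FROM screen s
    JOIN dq_rules dqr ON s.screen_id = dqr.screen_id
    ORDER BY rule_id
""").fetchall()

for r in rules_for_screens:
    print(r)
```

Each output row pairs a file with one rule that will run against it, which is a quick way to confirm that a new screen has rules attached.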

To check the error logs:

select distinct q.rule_id,q.rule_ds,error_record_count,rule_name,rule_query from dq_row_alert_log a
inner join dq_rules q on q.rule_id=a.rule_id

select distinct a.rule_id,a.rule_ds,error_record_count,rule_name,rule_query from dq_profile_alert_log a
join dq_rules q on q.rule_id=a.rule_id

The SQL file for the above model is attached; please download it from SQLFile.

Step 2

We also have a pipeline dedicated to loading DQ rules into the database. To use it, place the rules and screen CSV files in the designated rules folder; the pipeline detects these files and loads them into the appropriate tables. The database connection details are kept in a file in the setup folder, which the pipeline reads before processing the load.

# Read the connection details from the setup folder (a single-row CSV)
df = spark.read.option('header',True).option('inferschema','true').csv(f'{setup_folder_path}/user.csv')
username = df.select('username').take(1)[0]['username']
database = df.select('database').take(1)[0]['database']
password = df.select('pass').take(1)[0]['pass']
host = df.select('host').take(1)[0]['host']
port = df.select('port').take(1)[0]['port']

# Include the port that was read above in the JDBC URL
JDBC_Url = "jdbc:postgresql://{}:{}/{}".format(host,port,database)

connProperties = {
    'user':username,
    'password':password
}

def select_table(query):
    # `query` is a table name, or a subquery written as "(select ...) as alias"
    df = spark.read.jdbc(url=JDBC_Url ,table=query,properties=connProperties)
    return df

def insert_by_df(table,df):
    # Append the DataFrame rows to the target table
    df.write.mode('append').jdbc(url=JDBC_Url,table=table,properties=connProperties)

def execute_query(query):
    # `con` is assumed to be a JDBC connection opened elsewhere in the setup notebook
    con.prepareCall(query).execute()
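
Since select_table passes its argument straight through as the JDBC table, a filtered read needs the query wrapped as a subquery with an alias. The following is a minimal sketch of two small helpers for that; the helper names, the host, the port, and the database name are hypothetical and not part of the original project.

```python
def build_jdbc_url(host, port, database):
    """Build a PostgreSQL JDBC URL with an explicit port."""
    return f"jdbc:postgresql://{host}:{port}/{database}"


def as_jdbc_table(query, alias="q"):
    """Wrap a SELECT statement so it can be used as the JDBC `table` argument.

    Spark's JDBC reader accepts anything that is valid in a FROM clause,
    so a query has to be parenthesised and given an alias.
    """
    return f"({query.strip().rstrip(';')}) AS {alias}"


# Hypothetical connection values, for illustration only
url = build_jdbc_url("localhost", 5432, "dq")
table_arg = as_jdbc_table("select * from dq_rules where screen_id = 1;")

print(url)
print(table_arg)
```

With this in place, `select_table(table_arg)` would read only the rules for one screen instead of the whole dq_rules table.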

Step 3

We have four layers in our data processing pipeline: Raw, Error, Processed, and Presentation.

  • We will read all files from the Raw layer and process them accordingly. After reading each file, we’ll check the Screen table to see if the filename matches any screen. If a match is found, we’ll initiate the DQ check for that file.
import re
from pyspark.sql.functions import col

def readAllFIles(path):
    # dbutils.fs.ls returns FileInfo entries: i[0] is the path, i[1] is the name
    files = []
    for i in dbutils.fs.ls(path):
        files.append(i)
    return files

## reading all the files from path
files = readAllFIles(raw_folder_path)

def validate_files():
    for i in files:
        for row in scr_df.rdd.collect():
            ## Checking DQ Screen for input file
            if(re.search(row['filename'], i[1])):
                scr_id_dq = scr_df.filter(col('filename')==row['filename']).rdd.collect()[0]
                if(scr_id_dq is not None):
                    source_file_df = apply_dq_rules(scr_id_dq,i)
                else:
                    print(f'No Rules for this file {i[1]}')
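
The matching step can be tried without Spark. Below is a minimal sketch, assuming the filename column of the Screen table holds a regular expression; the screen rows, the file names, and the match_screens helper are hypothetical.

```python
import re

# Hypothetical screen rows; in the article these come from the Screen table.
screens = [
    {"screen_id": 1, "filename": r"customers.*\.csv"},
    {"screen_id": 2, "filename": r"orders.*\.csv"},
]


def match_screens(file_names, screens):
    """Return {file_name: screen_id or None}.

    The first screen whose `filename` pattern is found in the file name
    (via re.search) wins; files without a screen map to None.
    """
    matches = {}
    for name in file_names:
        matches[name] = next(
            (s["screen_id"] for s in screens if re.search(s["filename"], name)),
            None,
        )
    return matches


result = match_screens(
    ["customers_2024.csv", "orders_jan.csv", "inventory.csv"], screens
)
print(result)
```

Because the stored value is treated as a regular expression, a plain name such as `data.csv` would also match `dataXcsv`. If literal matching is intended, wrapping the value in `re.escape` avoids that.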
  • Once the DQ checks are initiated, the system first processes row rules, followed by profile rules. For row rules, the system filters on the rule types specified in the dq_rules table, generates an error file with the same name as the source file in the Error layer, and updates the error logs in the dq_row_alert_log table.
from datetime import datetime

def apply_row_rules(source_file_df,row_rules_df,TEMPORARY_TARGET,screen_id):
    try:
        count = 0
        ## start time
        start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        ## Create an empty DataFrame with the source file's schema
        filter_df = spark.createDataFrame([],
                                          schema = source_file_df.schema)

        ## Reading all error records
        for row in row_rules_df.rdd.collect():
            temp_df = spark.sql(row['rule_query'])
            ## Count once and reuse, instead of triggering the query three times
            error_count = temp_df.count()

            if(error_count):

                ## Updating the log table
                error_log_query = f"""insert into dq_row_alert_log (rule_id,screen_id,rule_ds,error_record_count,rec_ins_ts)
                VALUES ({row['rule_id']}, {row['screen_id']}, '{row['rule_ds']}',{error_count},current_timestamp);"""

                count = count+error_count

                create_log(error_log_query)

                ## Append error records
                filter_df = filter_df.union(temp_df)

        ## Creating error file in temp path
        filter_df.repartition(1).write.mode("append").option("header", "true").option("delimiter", ",").csv(TEMPORARY_TARGET)

        ## End time
        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        ## update log table with screen status
        update_scr_log(screen_id,start_time,end_time,'SUCCESS','null',)

        return count
    except Exception as e:
        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('Failed')
        update_scr_log(screen_id,start_time,end_time,'Failed',str(e))
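
To see the row-rule loop in isolation, here is a minimal sketch that uses an in-memory SQLite table in place of the Spark temp view. It assumes, as the function above suggests, that each rule_query selects the rows that violate the rule. The source table, the three rules, and the function name are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (id INTEGER, email TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO source VALUES (?, ?, ?)",
    [(1, "a@x.com", 30), (2, None, 25), (3, "c@x.com", -4), (4, None, 41)],
)

# Hypothetical row rules: each rule_query selects the rows that violate it.
row_rules = [
    {"rule_id": 10, "rule_ds": "Email must not be null",
     "rule_query": "SELECT * FROM source WHERE email IS NULL"},
    {"rule_id": 11, "rule_ds": "Age must be non-negative",
     "rule_query": "SELECT * FROM source WHERE age < 0"},
    {"rule_id": 12, "rule_ds": "Id must not be null",
     "rule_query": "SELECT * FROM source WHERE id IS NULL"},
]


def apply_row_rules_sketch(conn, rules):
    """Run every rule and return (alert_log, error_rows, total_error_count)."""
    alert_log, error_rows, total = [], [], 0
    for rule in rules:
        bad = conn.execute(rule["rule_query"]).fetchall()
        if bad:  # only rules with violations are logged
            alert_log.append((rule["rule_id"], rule["rule_ds"], len(bad)))
            error_rows.extend(bad)
            total += len(bad)
    return alert_log, error_rows, total


alert_log, error_rows, total = apply_row_rules_sketch(conn, row_rules)
print(alert_log)
print(total)
```

Here alert_log plays the role of dq_row_alert_log and error_rows the role of the error file. A record that breaks two rules would appear twice in error_rows, just as it would after the union in the Spark version.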
  • For profile rules, we process the info rules specified in the dq_rules table and store the resulting error logs in the dq_profile_alert_log table.
from datetime import datetime

def apply_profile_rules(source_file_df,profile_rules_df,TEMPORARY_TARGET,screen_id):
    try:
        count = 0
        ## start time
        start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        ## Reading all error records
        for row in profile_rules_df.rdd.collect():
            temp_df = spark.sql(row['rule_query'])
            ## Count once and reuse, instead of triggering the query three times
            error_count = temp_df.count()

            if(error_count):

                ## Updating the log table
                error_log_query = f"""insert into dq_profile_alert_log (rule_id,screen_id,rule_ds,error_record_count,rec_ins_ts)
                VALUES ({row['rule_id']}, {row['screen_id']}, '{row['rule_ds']}',{error_count},current_timestamp);"""

                count = count+error_count

                create_log(error_log_query)

        ## end time
        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        ## update log table with screen status
        update_scr_log(screen_id,start_time,end_time,'SUCCESS','null',)

        return count
    except Exception as e:
        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('Failed')
        update_scr_log(screen_id,start_time,end_time,'Failed',str(e))
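
The article does not spell out what a profile rule looks like, so the following sketch assumes they are file-level aggregate checks whose query returns a row only when the check fails. SQLite stands in for the Spark temp view; the table, the rules, and the thresholds are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (id INTEGER, email TEXT)")
conn.executemany(
    "INSERT INTO source VALUES (?, ?)",
    [(1, "a@x.com"), (2, None), (2, "b@x.com"), (3, None)],
)

# Hypothetical profile rules: aggregate checks over the whole file.
profile_rules = [
    {"rule_id": 20, "rule_ds": "At most 25% of emails may be null",
     "rule_query": """
        SELECT * FROM (
            SELECT COUNT(*) AS n,
                   SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) AS nulls
            FROM source
        ) WHERE nulls * 1.0 / n > 0.25"""},
    {"rule_id": 21, "rule_ds": "Ids must be unique",
     "rule_query": """
        SELECT id, COUNT(*) AS n FROM source
        GROUP BY id HAVING COUNT(*) > 1"""},
    {"rule_id": 22, "rule_ds": "File must not be empty",
     "rule_query": "SELECT * FROM (SELECT COUNT(*) AS n FROM source) WHERE n = 0"},
]

alert_log = []  # stands in for dq_profile_alert_log
for rule in profile_rules:
    findings = conn.execute(rule["rule_query"]).fetchall()
    if findings:
        # error_record_count is the number of rows the rule query returned
        alert_log.append((rule["rule_id"], len(findings)))

print(alert_log)
```

Unlike row rules, nothing is written to an error file here: a failed profile rule only produces a log entry, which matches the function above.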

Step 4

After applying the DQ rules, the source files with valid records will be moved to the Processed layer.

import os

def fiterErrorRecords(scrDF,file):

    ## Reading all error records from the Error layer
    error_file_df = spark.read.option('header','true') \
        .option('inferschema',True).csv(f"{error_folder_path}/{file[1]}")

    ## Keep only the records that do not appear in the error file
    filtered_df = scrDF.subtract(error_file_df)
    display(filtered_df)
    ## Temp path to hold the valid records
    TEMPORARY_TARGET = f"{temp_folder_path}/{file[1]}/filtered".replace('.','_')

    ## Creating the valid-records file in temp path
    filtered_df.repartition(1).write.mode("append").option("header", "true").option("delimiter", ",").csv(TEMPORARY_TARGET)

    convert_csv_from_part(TEMPORARY_TARGET,f"{processed_folder_path}/{file[1]}")

    ## Remove the source file from the Raw layer (dbfs:/... becomes /dbfs/...)
    os.remove(f"/{raw_folder_path}/{file[1]}".replace(':',''))
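
One detail of this step is worth illustrating: DataFrame.subtract behaves like SQL's EXCEPT DISTINCT, so exact duplicate rows in the source collapse into one in the output. The sketch below shows the same semantics with SQLite's EXCEPT; the tables and rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (id INTEGER, email TEXT)")
conn.execute("CREATE TABLE errors (id INTEGER, email TEXT)")

# The valid row (1, 'a@x.com') appears twice in the source file.
conn.executemany(
    "INSERT INTO source VALUES (?, ?)",
    [(1, "a@x.com"), (1, "a@x.com"), (2, None), (3, "c@x.com")],
)
conn.execute("INSERT INTO errors VALUES (2, NULL)")

# EXCEPT removes the error rows and also de-duplicates what is left.
valid_rows = conn.execute(
    "SELECT * FROM source EXCEPT SELECT * FROM errors ORDER BY id"
).fetchall()
print(valid_rows)
```

If duplicates in the source file are meaningful and should reach the Processed layer, PySpark's `exceptAll` keeps them, so it may be a better fit than `subtract` in that case.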

Step 5

After the files are moved to the Processed layer, we’ll read and convert them into the Parquet format. Subsequently, the source files will be relocated to an archive path.

import os

def csvToParquet():

    for i in files:
        ## reading source file
        src_df = spark.read \
            .option('header',True) \
            .option('inferschema','true') \
            .csv(i[0])
        ## Converting to Parquet
        src_df.write.mode("overwrite").parquet(f"{presentation_folder_path}/{i[1]}")
        display(src_df.count())
        ## Copying the source file to the archive path, then deleting the original
        dbutils.fs.cp(i[0], f'{archive_folder_path}/{i[1]}')
        os.remove('/'+i[0].replace(':',""))
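
The os.remove calls in Steps 4 and 5 work because DBFS is also exposed on the driver's local file system under /dbfs, so stripping the colon from a dbfs:/ URI and prefixing a slash gives a local path. Here is a minimal sketch of that conversion as an explicit helper; the function name and the sample path are hypothetical.

```python
def to_driver_path(dbfs_uri):
    """Convert 'dbfs:/mnt/raw/a.csv' to the local path '/dbfs/mnt/raw/a.csv'."""
    prefix = "dbfs:/"
    if not dbfs_uri.startswith(prefix):
        raise ValueError(f"not a dbfs:/ URI: {dbfs_uri}")
    return "/dbfs/" + dbfs_uri[len(prefix):]


uri = "dbfs:/mnt/raw/customers.csv"   # hypothetical file
local = to_driver_path(uri)
print(local)

# The inline expression used in the article gives the same result for this input.
inline = "/" + uri.replace(":", "")
print(inline == local)
```

The helper is slightly stricter than the inline replace, which would also strip any other colon in the file name. Another option is to stay within dbutils and use `dbutils.fs.mv` to move the file to the archive path in one call.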

Step 6

The project setup is done. Now we’ll build the Databricks workflow. Start with the setup for the Screen and dq_rules tables, followed by executing the apply-dq pipeline, and conclude with the CSV to Parquet conversion process.
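
The task ordering can be written down as data. The sketch below describes the three tasks using the task_key, depends_on, and notebook_task fields of a Databricks job definition, and checks the resulting run order with a small helper. The task keys, the notebook paths, and the run_order function are hypothetical placeholders, not taken from the repository.

```python
# Task keys and notebook paths are hypothetical placeholders.
job_tasks = [
    {"task_key": "load_dq_rules",
     "notebook_task": {"notebook_path": "/Repos/dq/setup/load_rules"}},
    {"task_key": "apply_dq",
     "depends_on": [{"task_key": "load_dq_rules"}],
     "notebook_task": {"notebook_path": "/Repos/dq/apply_dq"}},
    {"task_key": "csv_to_parquet",
     "depends_on": [{"task_key": "apply_dq"}],
     "notebook_task": {"notebook_path": "/Repos/dq/csv_to_parquet"}},
]


def run_order(tasks):
    """Return task keys in an order that respects every depends_on entry."""
    remaining = {
        t["task_key"]: {d["task_key"] for d in t.get("depends_on", [])}
        for t in tasks
    }
    ordered = []
    while remaining:
        # A task is ready once all of its dependencies have been scheduled.
        ready = sorted(k for k, deps in remaining.items() if deps <= set(ordered))
        if not ready:
            raise ValueError("circular dependency between tasks")
        ordered.extend(ready)
        for k in ready:
            del remaining[k]
    return ordered


order = run_order(job_tasks)
print(order)
```

Keeping the dependencies explicit means the Parquet conversion never starts before the DQ checks have moved the valid records into the Processed layer.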

Project-related files are uploaded to the GitHub repository named data_quality_framework. You can also download the DBC and source files from the repository.
