PySpark & Data Quality

“No data is clean, but most is useful.” ~ Dean Abbott

Sara M.
Towards Data Engineering
6 min read · Dec 1, 2023

Photo by Joshua Sortino on Unsplash

Data quality refers to the overall condition of data. It encompasses various aspects that determine the usefulness and accuracy of data for a particular purpose.

In a data-driven architecture, data quality is foundational. Poor data quality leads to inefficient business operations: inaccurate or incomplete data can result in misguided strategies, inefficient processes, and wasted resources.

So inaccurate data needs to be identified, documented, and fixed as soon as possible.

How can we tell whether our data is good or bad?

We rely on what are called “data quality dimensions”.

Data quality dimensions are measurement attributes of data, which you can individually identify, interpret, and improve. In the context of your organization, the aggregated scores across multiple dimensions indicate the overall quality of your data.

The exact set of dimensions varies with the source or framework used, but six to ten are commonly acknowledged. This article covers six of them:

  • Uniqueness: ensures that each record or data entity is distinct and has no duplicates within the dataset.
  • Accuracy: ensures that data correctly represents the real-world entities or events it is intended to describe. In other words, accurate data is free from errors and faithfully reflects the true values or conditions it is supposed to represent.
  • Completeness: ensures that all required data elements are present in a dataset, without any missing or omitted values.
  • Validity: ensures that data adheres to predefined rules, standards, or constraints. Valid data meets the established criteria for correctness and conforms to the expected format, range, or structure.
  • Consistency: ensures that data is uniform and coherent across various sources, databases, or time periods. Consistent data is free from discrepancies, contradictions, or conflicts, and maintains a standardized format or structure.
  • Timeliness: ensures that data is available within the expected or required time frame. It emphasizes the importance of data being up to date and relevant for its intended use.
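
As a rough illustration of how per-dimension scores can be aggregated into one figure, here is a minimal sketch in plain Python. The sample rows, the function names, and the equal weighting of the two dimensions are assumptions made for this example, not a standard formula.

```python
# Hypothetical sample rows; None marks a missing value.
rows = [
    {"Name": "John Doe", "Age": 25, "City": "New York"},
    {"Name": "Emily White", "Age": 22, "City": None},
    {"Name": "John Doe", "Age": 25, "City": "New York"},
    {"Name": "Amanda Davis", "Age": None, "City": "Dallas"},
]


def completeness_score(rows):
    """Share of cells that are not missing."""
    cells = [value for row in rows for value in row.values()]
    return sum(value is not None for value in cells) / len(cells)


def uniqueness_score(rows):
    """Share of rows that are distinct."""
    distinct = {tuple(sorted(row.items())) for row in rows}
    return len(distinct) / len(rows)


scores = {
    "completeness": completeness_score(rows),
    "uniqueness": uniqueness_score(rows),
}

# Equal weighting is an assumption; weight the dimensions to fit your context.
overall = sum(scores.values()) / len(scores)

print(scores)
print(f"overall: {overall:.2f}")
```

In practice each organization decides which dimensions to score and how much weight each one carries.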

In a Python environment, the PySpark API is a great tool for running a variety of data quality checks.

Set up PySpark

To set up PySpark, you can install Spark with docker-compose.

You can find more details in my previous article.

PySpark VS Uniqueness

With PySpark, you can check for duplicate values.

Suppose you have a CSV file containing information about employees, with some duplicate rows.

Name,Age,City,Occupation
John Doe,25,New York,Engineer
Jane Smith,30,Los Angeles,Teacher
Robert Johnson,28,Chicago,Doctor
Emily White,22,San Francisco,Software Developer
Michael Brown,35,Miami,Accountant
Amanda Davis,27,Dallas,Developer
Christopher Lee,32,Seattle,Graphic Designer
Sarah Taylor,29,Boston,Data Analyst
Daniel Miller,31,Atlanta,Lawyer
Olivia Clark,26,Austin,Architect
John Doe,25,New York,Engineer
Emily White,22,San Francisco,Software Developer
Amanda Davis,27,Dallas,Marketing Manager

Let’s load the file:

from pyspark.sql import SparkSession
# Note: this 'sum' is Spark's column aggregate and shadows Python's built-in sum
from pyspark.sql.functions import col, when, sum, current_date

spark = SparkSession.builder.appName("data-quality-checks").getOrCreate()

# Reduce logging
spark.sparkContext.setLogLevel("WARN")
# Define the path to the CSV file
file_path = "/data/employee.csv"

# Load the CSV file into a PySpark DataFrame
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Display the original DataFrame
df.show()

You can display rows that have duplicate values across all columns:

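# Display the extra copies of rows duplicated across all columns.
# exceptAll preserves duplicates, so subtracting the de-duplicated DataFrame
# leaves one row for each additional occurrence.
df.exceptAll(df.dropDuplicates()).show()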

+-----------+---+-------------+------------------+
|       Name|Age|         City|        Occupation|
+-----------+---+-------------+------------------+
|   John Doe| 25|     New York|          Engineer|
|Emily White| 22|San Francisco|Software Developer|
+-----------+---+-------------+------------------+

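Or display rows that have duplicate values across the ‘Name’ and ‘Age’ columns:

# Display rows with duplicate values across the 'Name' and 'Age' columns.
# dropDuplicates keeps an arbitrary row per key, so which duplicate is reported may vary.
df.exceptAll(df.dropDuplicates(['Name', 'Age'])).show()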

+------------+---+-------------+------------------+
|        Name|Age|         City|        Occupation|
+------------+---+-------------+------------------+
|    John Doe| 25|     New York|          Engineer|
| Emily White| 22|San Francisco|Software Developer|
|Amanda Davis| 27|       Dallas| Marketing Manager|
+------------+---+-------------+------------------+

PySpark VS Completeness

With PySpark, you can handle missing or null values.

Suppose that your CSV file contains information about employees, with some missing values.

Name,Age,City,Occupation
John Doe,25,New York,Engineer
Jane Smith,30,Los Angeles,Teacher
Robert Johnson,-5,Chicago,Doctor
Emily White,22,,Software Developer
Michael Brown,35,Miami,Accountant
Amanda Davis,,Dallas,Marketing Manager

The code below will allow you to count the number of missing or null values in each column.

# Check completeness: Count null values in each column
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

This will return:

+----+---+----+----------+
|Name|Age|City|Occupation|
+----+---+----+----------+
|   0|  1|   1|         0|
+----+---+----+----------+

PySpark VS Accuracy

With PySpark, you can validate and clean the data to eliminate errors or inconsistencies.

Suppose that your CSV file contains information about employees, and we want to ensure the accuracy of the age information by identifying and handling potential errors.

Name,Age,City,Occupation
John Doe,25,New York,Engineer
Jane Smith,30,Los Angeles,Teacher
Robert Johnson,-50,Chicago,Doctor
Emily White,22,New York,Software Developer
Michael Brown,35,Miami,Accountant
Amanda Davis,300,Dallas,Marketing Manager

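The code below identifies invalid age values (non-numeric, non-positive, or implausibly high) and replaces them with null.

# Accuracy check: Identify and handle errors in the 'Age' column
# max_age = 120 is an assumed plausibility limit; adjust it to your data
max_age = 120
age = col("Age").cast("int")
df_cleaned = df.withColumn(
    "Age",
    when(age.isNull() | (age <= 0) | (age > max_age), None).otherwise(col("Age"))
)
# Display the cleaned DataFrame
df_cleaned.show()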

This will return:

+--------------+----+-----------+------------------+
|          Name| Age|       City|        Occupation|
+--------------+----+-----------+------------------+
|      John Doe|  25|   New York|          Engineer|
|    Jane Smith|  30|Los Angeles|           Teacher|
|Robert Johnson|null|    Chicago|            Doctor|
|   Emily White|  22|   New York|Software Developer|
| Michael Brown|  35|      Miami|        Accountant|
|  Amanda Davis|null|     Dallas| Marketing Manager|
+--------------+----+-----------+------------------+

PySpark VS Validity

With PySpark, you can enforce data validity through various checks and constraints.

Suppose that your CSV file contains information about employees, and we want to ensure the validity of the ‘Occupation’ column. We’ll set a rule that each employee’s occupation must be one of a predefined set of valid occupations.

Name,Age,City,Occupation
John Doe,25,New York,Engineer
Jane Smith,30,Los Angeles,Teacher
Robert Johnson,50,Chicago,Doctor
Emily White,22,San Francisco,Software Developer
Michael Brown,35,Miami,Accountant
Amanda Davis,60,Dallas,Marketing Manager

We add the rule and keep only the rows whose “Occupation” value is part of the predefined set of valid occupations.

# Validity check: Ensure 'Occupation' is one of the valid occupations
valid_occupations = ["Engineer", "Teacher", "Software Developer", "Accountant", "Marketing Manager"]

# Keep only rows whose occupation is in the allowed set (null occupations are dropped too).
# show() returns None, so it is called separately from the assignment.
df_valid = df.filter(col("Occupation").isin(valid_occupations))
df_valid.show()

This will return:

+--------------+---+-------------+------------------+
|          Name|Age|         City|        Occupation|
+--------------+---+-------------+------------------+
|      John Doe| 25|     New York|          Engineer|
|    Jane Smith| 30|  Los Angeles|           Teacher|
|   Emily White| 22|San Francisco|Software Developer|
| Michael Brown| 35|        Miami|        Accountant|
|  Amanda Davis| 60|       Dallas| Marketing Manager|
+--------------+---+-------------+------------------+

PySpark VS Consistency

With PySpark, you can ensure that your Spark dataframes or RDDs have a consistent structure and format, especially when you’re working with multiple data sources or transformations.

Let’s go through a simple example:

Suppose you have two CSV files representing information about employees, but the column names are slightly different. The goal is to load these CSV files into PySpark DataFrames and ensure consistency in column names before further processing.

We load the two files:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("consistency").getOrCreate()

# Define paths
file_path1 = "/data/employee_data_1.csv"
file_path2 = "/data/employee_data_2.csv"

# Load the CSV files into PySpark DataFrames
df1 = spark.read.csv(file_path1, header=True)
df2 = spark.read.csv(file_path2, header=True)

To ensure consistency, we use the “withColumnRenamed” method to standardize the column names in both DataFrames.

# Ensure consistency in column names, rename columns
df1 = df1.withColumnRenamed("emp_id", "employee_id").withColumnRenamed("emp_name", "employee_name")
df2 = df2.withColumnRenamed("employeeID", "employee_id").withColumnRenamed("employeeName", "employee_name")

PySpark VS Timeliness

With PySpark, you can examine timestamps and ensure that the data is up-to-date based on a specified criterion.

Assuming you have a CSV file with the following content:

EventID,EventName,EventDate
1,Meeting,2023-10-15
2,Webinar,2023-12-18
3,Conference,2023-11-20
4,Training,2023-12-22
5,Workshop,2023-12-25

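The code below keeps the events whose date is at most 7 days before the current date. Note that events dated in the future also pass this filter, because their day difference is negative.

from pyspark.sql.functions import datediff

# Timeliness check: Filter events dated no more than 7 days before today
days_threshold = 7
# datediff(end, start) returns the number of days from start to end
df_timely = df.filter(datediff(current_date(), col("EventDate")) <= days_threshold)

# Display the DataFrame after handling timeliness issues
df_timely.show()

The result depends on the date on which the check is run. In the run shown here, it returned: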

+-------+---------+----------+
|EventID|EventName| EventDate|
+-------+---------+----------+
|      2|  Webinar|2023-12-18|
|      4| Training|2023-12-22|
|      5| Workshop|2023-12-25|
+-------+---------+----------+

If you have any questions, feedback, or would like to share your experiences, please feel free to reach out in the comments section.

Clap my article 50 times 👏, that will really help me out and boost this article to others ✍🏻❤️. Follow me on Medium to get my latest article.

Thank you 🫶!
