4 Tips for Data Quality Validations with Pytest and PySpark

Testing transformed data to yield a high-quality and dependable result

Taylor Wagner
Slalom Build
11 min read · Jun 3, 2024


This article is written in collaboration with Likitha Lokesh.


Background

I recently contributed to a data software project as a quality engineer, which required extensive testing of transformed data. In this project, AWS Glue was used to transform data from one Amazon S3 bucket to another. The data was transformed using Python, specifically PySpark; thus, the test automation framework for testing these transformations leaned on the same tech stack, with the addition of Pytest, to keep the tooling cohesive.

To learn more about getting started with Pytest, PySpark, and AWS, check out this awesome blog written by my colleague, Likitha Lokesh, here.

Introduction

For the project mentioned above, my team was transforming data to be ingested by a third-party software tool. To ensure the data imported successfully, each target file had a list of requirements it had to meet; otherwise, the software wouldn't ingest the data, and the data wouldn't be accessible for analysis.

The outlined requirements helped our team determine what the scripts should look like to transform the data from source files to target files, but this didn’t guarantee that all of the data in each of the target files would meet every requirement.

Discrepancies in data can cause skewed results when analyzing or using the data for other purposes. In the case of this project, which used financial data, a high level of confidence in the data was an absolute must.

The following question then arises:

How do we know the transformed data meets expectations?

Not all data software project solutions are one-size-fits-all; however, there are a few techniques I picked up on this project that lend themselves to efficient engineering and thoughtful test design across a variety of data-specific projects.

I’ve created a list of the four key takeaways from this experience for quality data testing:

  1. Wrap assertions as conditionals for logging invalid data
  2. Determine common data tests and parametrize
  3. Leverage Pytest warnings and XFail for known data concerns
  4. Manage environment variables with the -E flag

I will go through each of the four takeaways in this article, explaining what each one means in more detail and highlighting why it made the list.

Wrap Assertions as Conditionals

When performing automated testing on a traditional software project, visibility into a bug is somewhat clearer than when debugging data. With a traditional software project or application, I can pull up the app to inspect it or review the API, but data can be exhausting to comb through. In PySpark, the magnifying glass for gaining visibility into data issues is the DataFrame.

In my project, I was testing data in .csv files. Many of the tests had the same outline:

  1. Read the .csv file and create a DataFrame
  2. Use a DataFrame method to analyze the data (as applicable to the requirements)
  3. Wrap the assertion as a conditional

Let’s say that for one column in the file, the data is ZIP codes and the requirement is that each value should be a length of exactly 5. This is how that test could look:

import logging

from pyspark.sql.functions import length

def test_zipcode_data_length(spark_source, csv_file_path: str):

    ## Read the .csv file and Create a DF
    ## (header=True so the 'Zipcode' column can be referenced by name)
    dataframe = spark_source.read.csv(csv_file_path, header=True)

    ## Use the filter method on the DF to analyze column values
    ## (and create another DF)
    invalid_rows = dataframe.filter(length(dataframe['Zipcode']) != 5)

    ## Wrap Assertion in a Conditional for Logging/Debugging
    if invalid_rows.count() == 0:
        logging.info("All of the values in the 'Zipcode' column are equal "
                     "to a length of 5 as expected!")
        assert True
    else:
        logging.error("All of the values in the 'Zipcode' column should be "
                      "equal to a length of 5 and there are some values "
                      "present that don't meet those expectations!")
        ## Print the filtered DF with the rows that don't meet the requirements
        invalid_rows.show(truncate=False)
        assert False

Please note: There are many code snippets throughout the article. Keep in mind that it is best practice to use test helper functions to reduce code duplication among test methods, but that is out of scope for the purposes of this article.

As you can see in the example above, the assertion is not just a flat True/False assertion; it is placed within a conditional so that the appropriate logging can take place upon a failure, making it easier to debug and locate exactly where the data is not meeting expectations. The DataFrame of invalid rows could tell me specifically where to find the data that needed addressing before the software would ingest the file.

While I showed an example where a specific column needed an exact length of 5 for every value, the general outline/flow of the test remains the same. Without wrapping the assertion in a conditional, what would the other options be? Downloading transformed .csv files and manually reviewing/filtering them to find the problem rows? That is not a very appealing option.

Regardless of what you're testing, the need to quickly assess the root cause of a failure will always be a component of a quality testing strategy. Wrapping assertions in conditionals answers that need: it is efficient, fails fast, removes a lot of the guesswork, and provides the specifics needed to maintain and improve the quality of a system/pipeline.
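
For reference, the examples in this article assume a spark_source fixture that provides a SparkSession. A minimal conftest.py sketch of such a fixture could look like the following; the fixture name matches the examples, while the local session options are illustrative:

## conftest.py
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark_source():
    ## Build a local SparkSession shared across the whole test session
    spark = (
        SparkSession.builder
        .master("local[*]")
        .appName("data-quality-tests")
        .getOrCreate()
    )
    yield spark
    ## Tear down the session once all tests have finished
    spark.stop()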

Parametrize Common Data Tests

While data can be intimidating because it is so vast, the upside is that it is typically less ambiguous when it comes to testing. The requirements are pretty straightforward and, in my experience, easier to gather than on more traditional software projects. Common data requirements often overlap between different sets of data.

In the case of my project, many of the target files to be created had similar requirements, and even different columns within the same file carried the same requirements. To keep it simple, one requirement was that each file contained data. While that may seem like an obvious requirement, it is a quick and easy automated test that can be run on every file, which comes in handy for unexpected, edge-case data transformation scenarios.

But what is the best approach here so that the same simple test isn’t written for every single target file? How can code reusability best be leveraged? The answer was to parametrize the test using Pytest’s Parametrize marking.

import pytest

@pytest.mark.parametrize("csv_file_path", [
    ## List the target file paths here
])
def test_data_present(spark_source, csv_file_path: str):

    ## Read the .csv file and Create a DF
    dataframe = spark_source.read.csv(csv_file_path)

    ## Assert that the DF is not empty
    assert dataframe.first() is not None

Through the parametrize marking in Pytest, all target files can be run through this same test to ensure that each file being created contains some kind of data at a minimum. There were several instances where this became important in my previous project, especially when it came to checking for additional comma delimiters within the data or columns requiring unique data.
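
As a sketch of that kind of reuse, the same uniqueness check could be parametrized over several file/column combinations; the file names and column names below are hypothetical placeholders:

import pytest

@pytest.mark.parametrize("csv_file_path, column_name", [
    ## Hypothetical file/column pairs that share the uniqueness requirement
    ("customers.csv", "Email"),
    ("customers.csv", "CustomerId"),
    ("accounts.csv", "AccountNumber"),
])
def test_column_values_unique(spark_source, csv_file_path: str, column_name: str):

    ## Read the .csv file and create a DF (header=True to reference columns by name)
    dataframe = spark_source.read.csv(csv_file_path, header=True)

    ## The column is unique when dropping duplicates removes no rows
    assert dataframe.select(column_name).distinct().count() == dataframe.count()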

The caveat here is that there may be a little more effort up front when defining the test scenarios, to look for those common requirements and strategize how to best reuse code, but test development will be much faster over the long term. What I learned from this experience was to understand all of the requirements and map out the commonalities in those requirements first, before developing tests.

Create a plan for the different types of tests needed based on the requirements, such as data types, uniqueness, formatting, proper column separation (no additional delimiters), etc. Analyze the plan for patterns and consolidate tests as much as possible. Understanding these patterns in the requirements and creating a stronger plan before developing tests speeds up both development and test execution.

Leverage Warnings and XFail

When it comes to interacting with data, and more specifically developing new software that uses sensitive data (such as financial data), development is often scaffolded in lower environments first before interacting with real data. This process helps protect real (production/prod) data while all the kinks are worked out in development; however, a problem that often occurs with non-production environment data, such as QC/QE/DEV data, is that it can become unmanaged or mismanaged.

Mismanagement of non-production environment data can be tricky to work around and cause skewed results. Luckily, there are two capabilities embedded into Pytest that I would recommend leveraging when testing known data issues: Pytest Warnings and Pytest XFail. I don’t necessarily have a preference or recommendation between these two options. Choose the tool that you feel works best for your situation.

If you are already implementing, or plan to implement, my first suggestion in this article of wrapping your assertions in conditionals, you can easily embed Pytest warnings into that same pattern. The difference is that instead of a failed assertion, there will be a warning. It will look something like this:

import logging
import warnings

def test_email_data_unique(spark_source, csv_file_path: str):

    ## Read the .csv file and Create a DF
    dataframe = spark_source.read.csv(csv_file_path, header=True)

    ## Use the count method on the DF to capture the number of total rows
    num_rows = dataframe.count()

    ## Use the select method - paired with the distinct and count methods -
    ## on the DF to analyze column values for uniqueness
    num_unique_rows = dataframe.select(dataframe['Email']).distinct().count()

    ## Wrap Assertion in a Conditional and Leverage WARNINGS
    if num_rows == num_unique_rows:
        logging.info("All of the values in the 'Email' column are unique "
                     "as expected!")
        assert True
    else:
        ## Print the rows that don't meet the requirements
        dataframe.groupBy(dataframe['Email']).count().where("count > 1").drop(
            "count").show(truncate=False)
        ## Warn instead of fail
        warnings.warn(UserWarning("Some of the data in the 'Email' column is "
                                  "not meeting the uniqueness requirement!"))

As a quality engineer, the color red is a sign that something needs attention. The best thing about using warnings for known issues is that they print in yellow, which conveys a message of "this is something known and doesn't require immediate attention or concern." That message is also conveyed to other contributors when they execute the test suite, which puts the test team at ease because effort can go toward the team's velocity instead of repeatedly playing messenger about known failures.

Another solid option for handling known data issues is the Pytest XFail marker. Similar to Pytest warnings, if the test fails, the result comes out yellow instead of red. I can't overstate how helpful it is to be greeted by a color other than red, when applicable.

Terminal Screenshot of a Pytest Test Execution with Xfail
Image provided by the author

As you can see in the screenshot above, the test execution indicates the XFail test with a yellow 'x'. Unlike the warnings approach, however, the XFail indication is not placed in the assertion section of the test. XFail is marked at the top of the function, similar to how we marked a test for parametrization in the previous section. Check out the XFail implementation below:

import pytest

@pytest.mark.xfail(reason="Known data issue, expected to fail temporarily")
def test_date_format():

    ## Rest of the test here
    ...

Again, like the warning option explored above, the XFail marker allows the quality engineer to document the known issue and leave contextual information for other teammates via the reason argument. A noteworthy additional benefit of XFail is that when a test that was expected to fail starts passing (i.e., the bug has been fixed), Pytest reports it as an unexpected pass (XPASS), or fails it outright if strict=True, letting the quality engineer know it is time to fix/alter the test.
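
If you want an unexpected pass to surface as a failure rather than an XPASS, a minimal sketch with strict=True (the test name here is a placeholder) could look like this:

import pytest

## With strict=True, an unexpected pass (XPASS) is reported as a failure,
## which acts as a reminder to remove or update the xfail marker once the
## underlying data issue has actually been fixed
@pytest.mark.xfail(reason="Known data issue, expected to fail temporarily",
                   strict=True)
def test_date_format_strict():

    ## Rest of the test here
    ...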

Use with caution: Please consider when/how/why these options would best be used for your specific project needs. It's important to look into any unexpected discrepancies in the data, and only after

  1. determining the root cause of the failure
  2. assessing the priority/severity of the failure with your team

should you explore employing these capabilities of Pytest.

On the other side of the coin, having the option to flag certain tests yellow means the necessary automated tests remain documented, present in the test suite, and accessible as the project progresses toward production, where hopefully the data issues are no longer present.

Dynamically Manage Environment Variables

To wrap up the four key takeaways, I want to continue to address the nuances of testing data between different environments. At the beginning of the article, I mentioned how the data that I was testing was hosted in Amazon S3 buckets. The bucket names and paths were listed in an INI configuration file in the repository and fed to the various tests; however, the bucket names were altered slightly based on the specified environment.

[BUCKET]
S3 = my-dev-environment-bucket

[PATH]
FILE-PATH = pathway/to/dev/environment

The variations in bucket names and associated paths were something I wanted to manage dynamically from the terminal for each test session, and that's where a custom Pytest -E flag came in handy. By using fixtures and the pytest_addoption hook, I was able to indicate the desired environment with the -E flag alongside the standard Pytest command and flip-flop between environments with each test execution.
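
Here is a minimal conftest.py sketch of how that option could be wired up; the default value and the env fixture name are illustrative:

## conftest.py
import pytest

def pytest_addoption(parser):
    ## Register a custom -E option on the Pytest command line
    parser.addoption(
        "-E",
        action="store",
        default="dev",
        help="environment to run the tests against (e.g., dev, qc, prod)",
    )

@pytest.fixture(scope="session")
def env(request):
    ## Expose the chosen environment to tests and other fixtures
    return request.config.getoption("-E")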

The INI file changed only slightly from its original, hard-coded state, and the variable names fed into the tests stayed the same, so the adjustment had very low impact on the suite. The INI file itself then became a temporary template during each test run and was restored once the run completed. The variation looked like this:

[BUCKET]
S3 = my-{env}-environment-bucket

[PATH]
FILE-PATH = pathway/to/{env}/environment

With this adjustment to the INI file, along with some methods to manage the file during test sessions, the command became pytest -E=dev, pytest -E=qc, or pytest -E=prod based on the desired environment. This made switching between environments with varying bucket names extremely simple. There was no longer a dependency on a human remembering to change the variable names in the INI file each time tests needed to run in a different environment; any team member with access could switch environments from the command line.
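
As one simple sketch of the idea, the {env} placeholders could also be resolved in memory with configparser and str.format instead of rewriting the file; the file name below is a placeholder, and the keys mirror the template above:

import configparser

def load_config(env: str, path: str = "config.ini") -> configparser.ConfigParser:
    ## Read the templated INI file
    config = configparser.ConfigParser()
    config.read(path)

    ## Substitute the {env} placeholder in every value, e.g.
    ## "my-{env}-environment-bucket" -> "my-dev-environment-bucket"
    for section in config.sections():
        for key, value in config[section].items():
            config[section][key] = value.format(env=env)
    return config

## Example usage inside a fixture or test:
## bucket_name = load_config("dev")["BUCKET"]["S3"]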

To learn more about how to implement this flexibility in INI files with Pytest, you can check out my how-to article here.

Parting Thoughts

As a quick refresher, the four key takeaways for data quality testing that I highlighted in this article were:

  1. Wrap Assertions as Conditionals for Logging Invalid Data
  2. Determine Common Data Tests and Parametrize
  3. Leverage Pytest Warnings and XFail for Known Data Concerns
  4. Manage Environment Variables with -E Flag

The overall goal of these strategies is to increase confidence in the quality of the transformed data. I hope these takeaways were enlightening and that you picked up some new tips and strategies that will come in handy for your next data project.

I would love to know how these ideas were received, so feel free to drop some feedback in the comments!

#QE4DE
