Data Validation using DataComPy with PySpark: Data Comparison Series — Part 3

Ahmed Uz Zaman
4 min read · Jan 13, 2023


In my previous article, we compared two CSV files using various built-in PySpark functions. In this article, we are going to use an open-source Python library called DataComPy. It can be installed through pip, and its detailed documentation is listed here.

DataComPy is a Python library that lets you compare two Spark or pandas DataFrames and identify the differences between them. It can be used to compare two versions of the same DataFrame, or two different DataFrames. It reports in detail what has changed, including which rows have been added, modified, or deleted, as well as the specific values that differ. It also lets you customize how the comparison is performed and how the results are presented.

This article covers PySpark. For the pandas version, go here.


Once installed, you need to import the library into your Python script. We will use the same dataset as in my previous article, but this time we will pass everything to the datacompy library as input.
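For completeness, here is a minimal sketch of that setup, assuming the two CSV files from the previous article. The file names and the Spark session setup are placeholders from my environment, so adjust them to your own paths.

# Install the library first (from a shell): pip install datacompy
import datacompy
from pyspark.sql import SparkSession

# A local Spark session; reuse your existing one if you already have it.
spark = SparkSession.builder.appName("datacompy_demo").getOrCreate()

# Load the two versions of the store dataset.
# "Stores_v1.csv" and "Stores_v2.csv" are placeholder file names.
df1 = spark.read.csv("Stores_v1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("Stores_v2.csv", header=True, inferSchema=True)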

Defining DataFrames and Variables

You need to define which DataFrame is your base (the source of truth) and which is the compare (the one to be checked against it). In our example, df1 = base_df and df2 = compare_df. Let's look at the syntax below:

comparison = datacompy.SparkCompare(spark, base_df, compare_df,
                                    join_columns=[('col1', 'COL1')],
                                    column_mapping=[('col1', 'col0'), ('col2', 'col1')])

"datacompy.SparkCompare" - tells datacompy that the DataFrames are Spark DataFrames and that the comparison should use its Spark methodology.

"spark" - the active Spark session; required when using Spark DataFrames.

"base_df" - as discussed above, this is the main / source-of-truth DataFrame.

"compare_df" - the DataFrame that needs to be compared against the base.

"join_columns" - the primary key or composite primary key used to join the two DataFrames (a short sketch follows this list).

"column_mapping" - used when the two DataFrames have different column names that need to be mapped for comparison.
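As a quick illustration of join_columns, my understanding from the datacompy documentation is that each element can be either a plain string (when the key column has the same name in both DataFrames) or a (base_name, compare_name) tuple (when the names differ). The column names below are placeholders; verify the exact behaviour against your datacompy version.

# Same key name on both sides: a plain string should be enough
# (based on my reading of the docs; confirm for your version).
comparison = datacompy.SparkCompare(spark, base_df, compare_df,
                                    join_columns=['Store_ID'])

# Different key names: map them with (base_name, compare_name) tuples.
# 'STORE_ID' here is a hypothetical alternative column name.
comparison = datacompy.SparkCompare(spark, base_df, compare_df,
                                    join_columns=[('Store_ID', 'STORE_ID')])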

Now, let's look at our sample data and use datacompy to compare it.

import datacompy

base_df = df1
compare_df = df2

comparison = datacompy.SparkCompare(spark, base_df, compare_df,
                                    join_columns=[('Store_ID', 'Store_ID')])

Once the above cell runs successfully, call the report() function to generate the comparison report.

comparison.report()
****** Column Summary ******
Number of columns in common with matching schemas: 5
Number of columns in common with schema differences: 0
Number of columns in base but not compare: 0
Number of columns in compare but not base: 0

****** Row Summary ******

Number of rows in common: 896
Number of rows in base but not compare: 0
Number of rows in compare but not base: 3
Number of duplicate rows found in base: 0
Number of duplicate rows found in compare: 1

****** Row Comparison ******
Number of rows with some columns unequal: 5
Number of rows with all columns equal: 891

****** Column Comparison ******
Number of columns compared with some values unequal: 1
Number of columns compared with all values equal: 3

****** Columns with Unequal Values ******
Base Column Name Compare Column Name Base Dtype Compare Dtype # Matches # Mismatches
---------------- ------------------- ------------- ------------- --------- ------------
Store_Area Store_Area int int 891 5

As you can see in the output above, we compared df1 and df2 (base_df and compare_df), joining them on "Store_ID". The report gives us a detailed summary of what is present in the DataFrames, and of what is wrong with them as well!

Column Summary: details how many columns are present in each DataFrame and whether their schemas match.

Row Summary: gives a record-count summary. As you can see, "Number of rows in compare but not base" is 3, which is exactly how many additional records are in df2. It also captures the duplicates: "Number of duplicate rows found in compare" is 1.
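If you want to see which records those are, rather than just the counts, the SparkCompare object exposes the underlying rows as Spark DataFrames. The attribute name below is taken from the datacompy version I used, so double-check it against the documentation for your version.

# Rows present in compare_df (df2) but missing from base_df (df1) --
# these should be the 3 extra records counted in the Row Summary.
comparison.rows_only_compare.show()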

Row Comparison: summarizes how many records match exactly between df1 and df2 when joined on "Store_ID". As you can see, 5 records have some mismatches and the remaining 891 records match exactly.

Column Comparison: gives a column-level summary of which columns match and which do not. As you can see, it says 1 column has some unequal values, which means the issue is confined to a single column of data. Which column? We find out in the next section.

Columns with Unequal Values: names each column that has mismatches along with the number of mismatches (the number of rows where the data is incorrect for that column). In our case, the "Store_Area" column was mismatched in 5 records (rows).
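To inspect those 5 records themselves, the same object exposes (again, in the datacompy version I used) the joined rows that have at least one unequal column:

# Rows that exist in both DataFrames but disagree in at least one column --
# handy for eyeballing exactly how Store_Area differs between df1 and df2.
comparison.rows_both_mismatch.show()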

Bonus: the comparison also reports metadata issues, such as datatype differences or columns missing from one side, if any are present. Since our data has none, there is no such output here. I will try to capture this in a future article and link it here.

Conclusion

This is a sleek and easy way to compare your Spark DataFrames. One thing to note: if the dataset is huge, the comparison may run into memory errors. It has worked smoothly for me on a 16 GB RAM laptop with 4 million rows in each DataFrame; beyond that, it may start to have issues.
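One knob that may help on larger datasets, judging from the parameters listed in the datacompy documentation, is cache_intermediates, which asks Spark to cache the intermediate comparison DataFrames instead of recomputing them. Treat this as a hedged suggestion rather than a guaranteed fix for memory issues.

# Hypothetical tweak for larger datasets: cache intermediate comparison results.
comparison = datacompy.SparkCompare(spark, base_df, compare_df,
                                    join_columns=[('Store_ID', 'Store_ID')],
                                    cache_intermediates=True)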

Here is the GitHub link to the Jupyter notebook (DCS_Part3) and the data as well. Follow for more QA and data-quality-related articles. Happy reading.


Ahmed Uz Zaman

Lead QA Engineer | ETL Test Engineer | PySpark | SQL | AWS | Azure | Improvising Data Quality through innovative technologies | linkedin.com/in/ahmed-uz-zaman/