Data Reconciliation in Spark

Tharun Kumar Sekar
Published in Analytics Vidhya
Sep 20, 2020

Data Reconciliation is the process of verifying data during a data migration. In this process, the target data is compared against the source data to ensure that the migration happened as expected.

Need for Data Reconciliation

  • You cannot trust your data without data verification.
  • Comparing record counts and fill rates alone is not always enough to catch data issues.
  • Untrustworthy data leads to flawed insights.

Data Reconciler is a data reconciliation tool that checks the accuracy of your data. Before taking you through the technical implementation, I would like to show you the output of the reconciliation tool. You can run this code yourself by following the instructions in the next section.

The input dataset has 4 fields and about 50 million records, roughly 1 GB in Parquet format. After performing reconciliation on this dataset, we get the following output.

The above output provides the following information.

  • Matching Record Count (Record 6) — Number of records with matching primary keys in both datasets. In other words, the record count after performing an inner join between the datasets (see the sketch after this list). This value is used as the denominator when calculating the percentage of matching records for each column.
  • Dropped Records (Record 7) — Number of records that exist in the old table but not in the new one. In other words, the output of a left anti join.
  • New Records (Record 8) — Number of records that exist in the new table but not in the old one.
  • Old File Path (Record 9) — Actual number of records in the old table.
  • New File Path (Record 10) — Actual number of records in the new table.
  • Field Name (Column 1) — Contains each column available in the old table.
  • Matching Record Count (Column 2) — Number of records with the same value in both the old and the new column.
  • Mismatch Record Count (Column 3) — Number of records with different values in the old and new columns.
  • Matching Record Percentage (Column 4) — Matching Record Count of the individual column (Column 2) divided by the Matching Record Count between the datasets (Record 6).
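To make these record-level counts concrete, here is a minimal Spark (Scala) sketch of how they could be computed with joins. The paths, DataFrame names, and the key column "id" are illustrative assumptions, not the tool's actual code.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ReconciliationSketch").getOrCreate()

// Hypothetical input paths; the real tool takes these from --oldTable / --newTable
val oldDf = spark.read.parquet("s3://my-bucket/data/old/")
val newDf = spark.read.parquet("s3://my-bucket/data/new/")

// Hypothetical primary key; the tool accepts a comma-separated list of key columns
val keys = Seq("id")

// Matching Record Count (Record 6): primary keys present in both datasets (inner join)
val matchingRecords = oldDf.join(newDf, keys, "inner").count()

// Dropped Records (Record 7): keys present only in the old dataset (left anti join)
val droppedRecords = oldDf.join(newDf, keys, "left_anti").count()

// New Records (Record 8): keys present only in the new dataset
val newRecords = newDf.join(oldDf, keys, "left_anti").count()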

How to run the Data Reconciler?

The source code of the Data Reconciler is available on GitHub. For now, the tool only supports data in Parquet format, and the data should have a primary key or a combination of primary keys. You can download the code, add any customization you need, and then build it. Once you have the JAR, you can load it into your big data environment and trigger the job using the command below.

/usr/lib/spark/bin/spark-submit \
  --deploy-mode cluster \
  --executor-cores 5 \
  --name Data_Reconciliation \
  --class com.github.tharun.datareconciler.Pipeline \
  {JAR_PATH} \
  --qualityCheckType reconciler \
  --oldTable {PATH_OF_OLD_DATA} \
  --newTable {PATH_OF_NEW_DATA} \
  --outputPath {PATH_OF_OUTPUT_DATA} \
  --primaryKey {COMMA_SEPARATED_PRIMARY_KEYS}

Parameters:

  • Executor Cores — 5. Five cores per executor strikes a balance between parallelism and evenly distributed load.
  • Name — Data_Reconciliation. A name for your spark job.
  • Class — com.github.tharun.datareconciler.Pipeline. Main class, or entry point, for the Spark job.
  • JAR_PATH — path where you have placed the JAR.
  • Quality Check Type — reconciler. For triggering the reconciliation part of the code.
  • Old Table — Path of Old Data. Path where you have stored the old dataset.
  • New Table — Path of New Data. Path where you have stored the new dataset.
  • Output Path — Path of Output Data. Path where the reconciliation output will be written.
  • Primary Key — Comma-separated primary key column names.
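For example, with hypothetical S3 paths and a composite primary key (every value below is illustrative only), the invocation could look like this:

/usr/lib/spark/bin/spark-submit \
  --deploy-mode cluster \
  --executor-cores 5 \
  --name Data_Reconciliation \
  --class com.github.tharun.datareconciler.Pipeline \
  s3://my-bucket/jars/data-reconciler.jar \
  --qualityCheckType reconciler \
  --oldTable s3://my-bucket/data/customers_old/ \
  --newTable s3://my-bucket/data/customers_new/ \
  --outputPath s3://my-bucket/recon/customers/ \
  --primaryKey customer_id,order_date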

Technical Implementation

Once both datasets are fed in, they are joined on the primary keys. Then, for each record, if the value of a column in the old dataset matches the value of the corresponding column in the new dataset, a new column is created with the value "1"; if the values don't match, the new column is filled with the value "0". Summing this new column gives the total number of matching records for that column. Once we have the matching record count, the other values, such as the mismatch record count and the matching percentage, are calculated.
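Continuing the earlier sketch (same hypothetical oldDf, newDf, and keys), the per-column logic described above could look roughly like this. The renaming scheme and variable names are assumptions for illustration, not necessarily how the tool implements it.

import org.apache.spark.sql.functions.{col, sum, when}

// Rename non-key columns so the joined DataFrame keeps both versions side by side
val oldRenamed = oldDf.columns.filterNot(keys.contains)
  .foldLeft(oldDf)((df, c) => df.withColumnRenamed(c, s"${c}_old"))
val newRenamed = newDf.columns.filterNot(keys.contains)
  .foldLeft(newDf)((df, c) => df.withColumnRenamed(c, s"${c}_new"))
val joinedDf = oldRenamed.join(newRenamed, keys, "inner")

// For every non-key column, add a flag that is 1 when old and new values match, 0 otherwise
val columnsToCompare = oldDf.columns.filterNot(keys.contains).toSeq
val flagged = columnsToCompare.foldLeft(joinedDf) { (df, c) =>
  df.withColumn(s"${c}_match", when(col(s"${c}_old") === col(s"${c}_new"), 1).otherwise(0))
}

// Summing each flag column yields the matching record count per column; the mismatch count
// and the matching percentage follow from it and the overall matching count (Record 6)
val matchCounts = flagged.select(columnsToCompare.map(c => sum(col(s"${c}_match")).alias(c)): _*)

Note that === treats two nulls as a non-match here; a production implementation would likely want a null-safe comparison (<=> in Spark SQL), and the article does not say how the actual tool handles nulls.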

Runtime Stats

Dataset 1

  • 50 Million Records
  • 6 GB in Parquet
  • 170 columns
  • AWS r5.12xlarge — 5 nodes
  • 3 minutes runtime

Dataset 2

  • 350 Million Records
  • 30 GB in Parquet
  • 170 columns
  • AWS r5.12xlarge — 10 nodes
  • 6 minutes runtime

Github URL — https://github.com/tharun026/SparkDataReconciler

Got questions? Feel free to comment here.

If you liked this article, click the 👏 so other people will see it here on Medium.
