
Comparing data between two databases


(GIF) Demonstrating the general flow of the code

Introduction

Migrating data between different databases is often a complex process, especially when dealing with large datasets and sensitive information. Ensuring the accuracy and integrity of migrated data is critical, requiring efficient methods to handle massive amounts of information. In this blog, we highlight the challenges we faced during a recent data migration project and the solutions we developed to overcome them.

In our latest project, we were tasked with migrating large datasets from a legacy Oracle database to a cloud-based Azure SQL Server instance.

The challenge

We needed to validate the migration of sensitive data by comparing large datasets ranging from 2 to 30 gigabytes, with historical data spanning around 20 years. Traditional methods, such as row-by-row comparison with Pandas, proved too slow for this purpose. Dedicated tools such as Toad for Oracle and SQL Compare are database-specific and were incompatible with the pair of databases we were working with. To address this, we devised a custom method built around chunking and hashing the data.

Our Approach

  • Chunking: We divided the tables into logical sections called ‘chunks’, each containing a set number of records (10,000 in our case).
  • Fetching and Processing: We fetched records from the database, chunk by chunk. Each chunk of records was converted into one large string.
  • Hashing: We then hashed this large string. Hashing converts an input of any length into a short, fixed-length digest, and hashing algorithms are deterministic, so the same input always produces the same output. By hashing a chunk of 10,000 rows at once, we replaced 10,000 row-wise comparisons across databases with a single string comparison, which dramatically sped up the entire process.
  • Comparison: By comparing these hash strings, we could efficiently check for differences between chunks. If the hashes matched, the data was identical; if they differed, we knew there was a discrepancy somewhere within that chunk. Once the process completes, a report is generated with the comparison result for every table. A minimal sketch of this chunk-and-hash idea follows this list.
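
To make the idea concrete, here is a minimal, self-contained sketch of chunk-wise hashing on two pandas DataFrames. It is illustrative only: the column names, values, chunk size, and the compare_frames helper are hypothetical, and the full script we actually used is shown later in this post.

import hashlib
import pandas as pd

def hash_chunk(chunk):
    # Serialise the whole chunk into one string, then hash that string
    return hashlib.sha256(chunk.to_string(index=False).encode()).hexdigest()

def compare_frames(source, target, chunksize=3):
    # Walk both frames in equal-sized chunks and compare one hash per chunk
    # instead of comparing every row individually
    for start in range(0, max(len(source), len(target)), chunksize):
        src_hash = hash_chunk(source.iloc[start:start + chunksize])
        tgt_hash = hash_chunk(target.iloc[start:start + chunksize])
        if src_hash != tgt_hash:
            print(f"Mismatch somewhere in rows {start} to {start + chunksize - 1}")
            return False
    return True

# Hypothetical data standing in for two database extracts, sorted identically
source = pd.DataFrame({'ID': [1, 2, 3, 4], 'AMOUNT': [10.0, 20.0, 30.0, 40.0]})
target = pd.DataFrame({'ID': [1, 2, 3, 4], 'AMOUNT': [10.0, 20.0, 35.0, 40.0]})
print(compare_frames(source, target))  # False: the third row differs

The important detail is that both sides must be fetched with the same column order and the same ORDER BY; otherwise identical data can still hash differently.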

What We Built

We built a few scripts that implement the ideas above to validate the data between the two databases.

The code below can be treated as a template. We have expanded this template and used it to successfully validate several systems.

import pandas as pd
import pyodbc
import time
import hashlib
from itertools import zip_longest
from tqdm import tqdm
import warnings

PLACEHOLDER_NAN = 'NAN'
chunksize = 1000

# change the start_date and end_date as needed
start_date = (pd.Timestamp.now() - pd.DateOffset(days=14)).strftime('%Y-%m-%d %H:%M:%S')
end_date = '2024-09-24 15:30:00'

sql_server_db = '<YOUR_SQL_SERVER_DB>'
oracle_schema = '<YOUR_ORACLE_SCHEMA>'
sql_server_schema = f'SRC_IMG_{oracle_schema}'

sql_server_query_suffix = f"WHERE INSERT_DATE BETWEEN '{start_date}' AND '{end_date}'"
oracle_query_suffix = f"WHERE INSERT_DATE BETWEEN TO_DATE('{start_date}', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('{end_date}', 'YYYY-MM-DD HH24:MI:SS')"

table_list = [
    'TABLE1',
    'TABLE2',
    '...'
]

# Suppress specific UserWarning from pandas about database connections
warnings.filterwarnings("ignore", message="pandas only supports SQLAlchemy connectable")
def get_sql_server_connection(environment='PROD'):
    if environment == 'TEST':
        return pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER=<>;DATABASE=<>;UID=<>;PWD=<>')
    elif environment == 'DEV':
        return pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER=<>;DATABASE=<>;UID=<>;PWD=<>')
    elif environment == 'PROD':
        return pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER=<>;DATABASE=<>;UID=<>;PWD=<>')

def get_oracle_connection():
    return pyodbc.connect('DRIVER={Oracle in OraClient12Home1};DBQ=<>;UID=<>;PWD=<>;DATABASE=<>')

def hash_chunk(chunk):
    # Convert the entire chunk to a string and then hash it
    chunk_str = chunk.to_string(index=False)
    return hashlib.sha256(chunk_str.encode()).hexdigest()

def calculate_max_chunk_count(table_name, chunksize):
    conn_sql_server = get_sql_server_connection()
    conn_oracle = get_oracle_connection()
    sql_server_query = f"SELECT COUNT(*) AS COUNT FROM {sql_server_db}.{sql_server_schema}.{table_name} {sql_server_query_suffix};"
    oracle_query = f"SELECT COUNT(*) AS COUNT FROM {oracle_schema}.{table_name} {oracle_query_suffix};"
    sql_server_count = pd.read_sql(sql_server_query, conn_sql_server)
    oracle_count = pd.read_sql(oracle_query, conn_oracle)
    conn_sql_server.close()
    conn_oracle.close()
    return (int(sql_server_count['COUNT'].iloc[0] / chunksize) + 1,
            int(oracle_count['COUNT'].iloc[0] / chunksize) + 1,
            sql_server_count['COUNT'].iloc[0],
            oracle_count['COUNT'].iloc[0])

def compare_table_data(table_name):
    start_time = time.time()
    try:
        max_chunk_count_sql_server, max_chunk_count_oracle, rows_sql_server, rows_oracle = calculate_max_chunk_count(table_name, chunksize)
        chunk_index = 0
        matched_chunks = 0
        if max_chunk_count_sql_server != max_chunk_count_oracle:
            print(f"Number of chunks do not match for table {table_name}. SQL Server: {max_chunk_count_sql_server}, Oracle: {max_chunk_count_oracle}")
            comparison_results.append({
                'Table Name': table_name,
                'Chunksize': chunksize,
                'Chunk Count': 'Mismatch in chunks.',
                'Matched Chunks': matched_chunks,
                'Time Taken': 0,
                'Result': 'SKIPPED'
            })
            return
        if chunk_index >= (max_chunk_count_sql_server or max_chunk_count_oracle):
            print("Chunk limit exceeded, exiting comparison.")
            comparison_results.append({
                'Table Name': table_name,
                'Chunksize': chunksize,
                'Chunk Count': max_chunk_count_sql_server,
                'Matched Chunks': matched_chunks,
                'Time Taken': 0,
                'Result': 'ERROR'
            })
            return
        conn_sql_server = get_sql_server_connection()
        conn_oracle = get_oracle_connection()
        print('Comparing table with chunk-wise hashing: ', table_name)

        # query to get the primary key columns for the table
        fetch_primary_key = f"""
        SELECT KU.COLUMN_NAME
        FROM {sql_server_db}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
        INNER JOIN {sql_server_db}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
            ON TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
            AND TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
            AND KU.TABLE_NAME = TC.TABLE_NAME
        WHERE KU.TABLE_NAME = '{table_name}'
            AND KU.TABLE_SCHEMA = '{sql_server_schema}'
            AND TC.CONSTRAINT_SCHEMA = '{sql_server_schema}'
            AND KU.COLUMN_NAME NOT IN ('OPERATIONAL_DATE')
        ORDER BY KU.ORDINAL_POSITION;
        """

        # fetch_primary_key = f"SELECT COLUMN_NAME FROM {sql_server_db}.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}' AND TABLE_SCHEMA = '{sql_server_schema}';"
        # certain columns are not needed for comparison as they might be constants/metadata columns or simply not present in one of the databases
        filter_column_list = ['COL1', 'COL2']
        primary_key_columns = pd.read_sql(fetch_primary_key, conn_sql_server)
        # convert to a string with , separator
        primary_key_columns = ','.join(primary_key_columns['COLUMN_NAME'].to_list())
        all_chunks_match = True
        # fetch all column names for the table
        fetch_column_names = f"SELECT COLUMN_NAME FROM {sql_server_db}.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}' AND TABLE_SCHEMA = '{sql_server_schema}';"
        column_names = pd.read_sql(fetch_column_names, conn_sql_server)
        column_names = column_names[~column_names['COLUMN_NAME'].isin(filter_column_list)]
        # this logic may look misplaced, but keeping it here means no further changes are needed to the column_names variable
        columns_to_cast_int = ['TABLE_ID']  # Add more columns as needed
        # Modify oracle_query to cast the specified columns to INT
        oracle_columns = []
        for col in column_names['COLUMN_NAME']:
            if col in columns_to_cast_int:
                oracle_columns.append(f"CAST({col} AS INT) AS {col}")
            else:
                oracle_columns.append(col)
        # Join the column names with typecasting
        oracle_columns = ','.join(oracle_columns)
        # convert to a string with , separator
        column_names = ','.join(column_names['COLUMN_NAME'].to_list())
        # Fetch and compare data in chunks
        sql_server_query = f"SELECT {column_names} FROM {sql_server_db}.{sql_server_schema}.{table_name} {sql_server_query_suffix} ORDER BY {column_names};"
        oracle_query = f"SELECT {oracle_columns} FROM {oracle_schema}.{table_name} {oracle_query_suffix} ORDER BY {column_names};"
        # Write the queries to a file, for debugging purposes. You can remove this if you want.
        with open('queries.sql', 'a') as file:
            file.write(f"-- SQL Server Query -- CREATED AT: {pd.Timestamp.now()}\n")
            file.write(sql_server_query + "\n\n")
            file.write(f"-- Oracle Query -- CREATED AT: {pd.Timestamp.now()}\n")
            file.write(oracle_query + "\n")
        sql_server_chunks = pd.read_sql(sql_server_query, conn_sql_server, chunksize=chunksize)
        oracle_chunks = pd.read_sql(oracle_query, conn_oracle, chunksize=chunksize)
        sql_server_hash_list = []
        oracle_hash_list = []
        for sql_server_chunk, oracle_chunk in tqdm(zip_longest(sql_server_chunks, oracle_chunks, fillvalue=None), total=max_chunk_count_sql_server):
            # Break if both chunks are None, indicating the end of both tables
            if sql_server_chunk is None and oracle_chunk is None:
                break
            # Compare chunks if both are present
            if sql_server_chunk is not None and oracle_chunk is not None:
                # replace NaN with a placeholder
                sql_server_chunk = sql_server_chunk.fillna(PLACEHOLDER_NAN)
                oracle_chunk = oracle_chunk.fillna(PLACEHOLDER_NAN)
                # replace blank strings with a placeholder
                sql_server_chunk = sql_server_chunk.replace(r'^\s*$', PLACEHOLDER_NAN, regex=True)
                oracle_chunk = oracle_chunk.replace(r'^\s*$', PLACEHOLDER_NAN, regex=True)
                # Remove .0 from integer columns by casting to int where applicable
                for col in columns_to_cast_int:
                    if col in sql_server_chunk.columns:
                        sql_server_chunk[col] = sql_server_chunk[col].apply(lambda x: int(x) if pd.notna(x) else x)
                    if col in oracle_chunk.columns:
                        oracle_chunk[col] = oracle_chunk[col].apply(lambda x: int(x) if pd.notna(x) else x)
                sql_server_hash = hash_chunk(sql_server_chunk)
                oracle_hash = hash_chunk(oracle_chunk)
                # append the hash to a list
                sql_server_hash_list.append(sql_server_hash)
                oracle_hash_list.append(oracle_hash)
                if sql_server_hash != oracle_hash:
                    all_chunks_match = False
                else:
                    matched_chunks += 1
            else:
                # Handle case where one table has more chunks than the other
                all_chunks_match = False
            chunk_index += 1
        if all_chunks_match:
            print(f"All chunks match for table {table_name}.")
            comparison_results.append({
                'Table Name': table_name,
                'Chunksize': chunksize,
                'Chunk Count': max_chunk_count_sql_server,
                'Matched Chunks': matched_chunks,
                'Time Taken': str(time.time() - start_time) + ' seconds',
                'Result': 'PASS'
            })
        else:
            print(f"There are differences in table {table_name}.")
            comparison_results.append({
                'Table Name': table_name,
                'Chunksize': chunksize,
                'Chunk Count': max_chunk_count_sql_server,
                'Matched Chunks': matched_chunks,
                'Time Taken': str(time.time() - start_time) + ' seconds',
                'Result': 'FAIL'
            })
        # conn_oracle.close()
        print(f"SQL Server chunk count: {max_chunk_count_sql_server}, Oracle chunk count: {max_chunk_count_oracle}, Chunks compared: {chunk_index}, Matched chunks: {matched_chunks}")
        # df_hash_list also records the starting and ending row number of each chunk
        df_hash_list = pd.DataFrame({'SQL Server Hash': sql_server_hash_list, 'Oracle Hash': oracle_hash_list})
        df_hash_list['Start Row'] = df_hash_list.index * chunksize
        df_hash_list['End Row'] = (df_hash_list.index + 1) * chunksize
        # fix the last row
        df_hash_list.loc[df_hash_list.index[-1], 'End Row'] = rows_sql_server
        df_hash_list.to_excel(f'chunks/{table_name}__TABLE_HASHES.xlsx', index=True, index_label='Chunk Index')
        print(f"Time taken to compare table {table_name} chunk-wise: {time.time() - start_time} seconds")
    except Exception as e:
        print(f"Error comparing table {table_name} chunk-wise: {e}")

comparison_results = []
for table_name in table_list:
    compare_table_data(table_name)

df_comparison_results = pd.DataFrame(comparison_results)
df_comparison_results.to_excel(f'comparison_report_{end_date}.xlsx', index=False)

Explanation

For instance, consider the dummy data in the screenshot.

Image Depicting Dataset Differences

Each row represents a data record, and mismatched rows are highlighted. In this case, differences in values, timestamps, or flags across the datasets are evident. Such discrepancies could arise during data migration or synchronization between systems.

The highlighted differences show how a systematic validation approach can efficiently pinpoint inconsistencies, ensuring data integrity. This process is essential when working with large datasets, as it helps isolate errors without requiring exhaustive, manual reviews. The script in this blog does not pinpoint the exact offending rows; it only detects that a chunk contains a mismatch, which is a good starting point (a sketch of a possible drill-down step follows).
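
As an illustration of what a follow-up step could look like (it is not part of the script in this post), the Start Row and End Row recorded for a flagged chunk can be used to re-fetch just that slice from both databases and diff it row by row with pandas. The frames and values below are hypothetical stand-ins for the two re-fetched slices.

import pandas as pd

# Hypothetical slices re-fetched for a flagged chunk (same ordering and columns on both sides)
sql_server_rows = pd.DataFrame({'ID': [2001, 2002, 2003], 'AMOUNT': [10.5, 99.9, 12.0]})
oracle_rows = pd.DataFrame({'ID': [2001, 2002, 2003], 'AMOUNT': [10.5, 42.0, 12.0]})

# DataFrame.compare returns only the cells that differ between the two frames
diff = sql_server_rows.compare(oracle_rows)
print(diff)  # shows row 1: AMOUNT 99.9 (self) vs 42.0 (other)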

More specifically, the script compares data between SQL Server and Oracle databases using a chunk-based approach to handle large datasets. It dynamically retrieves table schemas, hashes data chunks with SHA-256 for integrity checks, and identifies discrepancies. The report details matched and unmatched chunks, processing time, and errors. The script was built to run in a memory- and compute-constrained environment, so we skipped features such as parallel processing across tables.

After completion, the script creates an Excel sheet in the format below and records a result (PASS, FAIL, or ERROR) for each table.

  1. PASS indicates that all the chunks (the entire dataset) are identical between the two databases.
  2. FAIL indicates that a chunk mismatch was found while comparing the data.
    Note: The data comparison stops as soon as there's a chunk mismatch.
  3. ERROR, as the name suggests, indicates that an error occurred. The error message is displayed in the console.
Excel file: Comparison Report for Each Table

Room For Improvement

“Software development is never complete.” We used this script (or variations of it) for our use case; it is not designed to be universally applicable. Other things that could be improved in this code are:

  1. Adding parallel processing (see the sketch after this list).
  2. Adding more robust data casting.
  3. The primary key fetching logic relies on the assumption that SQL Server has its constraints correctly set up. If that is not the case, fetching the same information from Oracle instead may prove more robust.
  4. Database connection management can be improved.
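
For the first point, here is a rough sketch of what table-level parallelism could look like using Python's standard concurrent.futures module. It would replace the plain for-loop at the end of the script, assumes the environment has enough memory and database connections to run a few tables at once, and the worker count of 4 is arbitrary; compare_table_data and table_list are the names from the script above.

from concurrent.futures import ThreadPoolExecutor, as_completed

# Run a handful of table comparisons concurrently. Each worker opens its own
# database connections inside compare_table_data; console and file output
# from the workers may interleave, and appending to the shared
# comparison_results list is safe under CPython.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(compare_table_data, t): t for t in table_list}
    for future in as_completed(futures):
        print(f"Finished comparing {futures[future]}")

df_comparison_results = pd.DataFrame(comparison_results)
df_comparison_results.to_excel(f'comparison_report_{end_date}.xlsx', index=False)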

Conclusion

Our chunk-based hashing approach proved to be an efficient and scalable solution, significantly reducing processing time while maintaining accuracy. By focusing on hashing chunks of data instead of performing row-by-row comparisons, we were able to handle gigabytes of data efficiently within a constrained environment. In essence, this approach highlights how innovative thinking and a deep understanding of both databases and programming principles can tackle even the most daunting data migration challenges. With careful customization, this script can evolve into a universal tool for data validation across various platforms.

About Us:

Bi3 has been recognized for being one of the fastest-growing companies in Australia. Our team has delivered substantial and complex projects for some of the largest organizations around the globe, and we’re quickly building a brand that is well-known for superior delivery.

Website: https://bi3technologies.com/

Follow us on,
LinkedIn: https://www.linkedin.com/company/bi3technologies
Instagram: https://www.instagram.com/bi3technologies/
Twitter: https://twitter.com/Bi3Technologies
