Comprehensive Data Analysis of a Large-Scale 12-Lead Electrocardiogram Database

Muhammad Wahid
11 min readMar 16, 2024

--

This article provides a comprehensive analysis of the “a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0” dataset, including detailed explanations of the code used for data extraction, analysis, and visualization.

Understanding the Dataset

The dataset comprises approximately 46,000 patient records, each containing 12-lead ECG data and associated metadata. The data is stored in .hea and .mat files, necessitating extraction and conversion to a more manageable format like CSV. Each lead captures 10 seconds of ECG data, sampled at 5000 data points per lead, enabling detailed visualization of the heart’s electrical activity over time.

Data Extraction

The extraction process involved two key steps:

1. Extracting ECG Signals:

This script utilizes the wfdb library to read the .hea and .mat files and extract the ECG signals and metadata. It then writes the extracted data to CSV files.

import os
import wfdb
import csv
from multiprocessing import Pool, freeze_support

# Function to process a single directory
def process_folder(directory, base_dir, result_base_dir):
# Create result directory
result_dir = os.path.join(result_base_dir, directory)
os.makedirs(result_dir, exist_ok=True)

# Path to the directory containing records
records_dir = os.path.join(base_dir, directory.replace('/', os.sep)) # Normalize path separator
records_path = os.path.join(records_dir, 'RECORDS')

# If RECORDS file exists, process the files listed
if os.path.exists(records_path):
# Read the list of file identifiers from the RECORDS file
with open(records_path, 'r') as records_file:
data_args_list = []
for line in records_file:
file_id = line.strip()
mat_path = os.path.join(base_dir, directory, file_id + '.mat').replace('/', os.sep)
hea_path = os.path.join(base_dir, directory, file_id).replace('/', os.sep)

# If .mat file exists, add to data arguments list
if os.path.exists(mat_path):
data_args_list.append((mat_path, hea_path, result_dir, file_id))

# Process files using multiprocessing
with Pool() as pool:
pool.map(process_file_data, data_args_list)

# Recursively process subdirectories
for sub_dir in os.listdir(records_dir):
sub_dir_path = os.path.join(records_dir, sub_dir)
if os.path.isdir(sub_dir_path):
process_folder(sub_dir, base_dir, result_base_dir)



# Function to process a single file
def process_file_data(data_args):
mat_path, hea_path, result_dir, file_id = data_args

# Read the WFDB file using rdsamp function
signals, meta = wfdb.rdsamp(hea_path)

# Access metadata information
sample_rate = meta['fs']
signal_length = meta['sig_len']
num_signals = meta['n_sig']
signal_names = meta['sig_name']
csv_path = os.path.join(result_dir, f"{file_id}.csv")
with open(csv_path, 'w', newline='') as csv_file:
writer = csv.writer(csv_file)

# Write the header row to the CSV file
header = ['time'] + signal_names
writer.writerow(header)

# Write the data to the CSV file
for ind in range(signal_length):
row = [ind / sample_rate] + [signals[ind][j] for j in range(num_signals)]
writer.writerow(row)

# Main block
if __name__ == '__main__':
freeze_support()
# Define records directory
records_directory = './a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0'

# Read subfolder paths from the RECORDS file
with open(os.path.join(records_directory, 'RECORDS'), 'r') as first_records_file:
subfolder_paths = first_records_file.read().splitlines()

# Iterate through subfolder paths and process each folder
for index, subfolder_path in enumerate(subfolder_paths):
process_folder(subfolder_path, records_directory, 'output')
print(index + 1) # Print index to track progress

Explanation:

  • The process_file_data function takes a tuple of arguments containing file paths and identifiers.
  • wfdb.rdsamp reads the ECG data and metadata from the .hea file.
  • Metadata like sample rate, signal length, and signal names are extracted.
  • A CSV file is created for each patient record, and the header row with time and signal names is written.
  • The script iterates through each data point, calculates the time stamp, and writes a row to the CSV with the time stamp and corresponding signal values.

2. Extracting Patient Metadata:

This script parses the metadata from all the .hea files, including patient ID, age, gender, and diagnoses, and stores it in a separate CSV file.

import os
import scipy.io
import matplotlib.pyplot as plt
import shutil
import wfdb
import csv
from multiprocessing import Pool, freeze_support


# Function to process a single file
def process_file(args):
mat_file_path, hea_file_path, patient_data = args

# Read the WFDB file using rdsamp function
signals, metadata = wfdb.rdsamp(hea_file_path)

# Access metadata information
comments = metadata['comments']

# Extract patient ID from file name
file_name = os.path.basename(hea_file_path)
patient_id = file_name.split('.')[0]

# Extract other relevant information from comments
attributes = {'Patient ID': patient_id}
for comment in comments:
if ':' in comment:
key, value = comment.split(':', 1)
attributes[key.strip()] = value.strip()

# Append data to patient_data dictionary
patient_data.append(attributes)

# Print status
print("Processed:", hea_file_path)


# Function to process a single directory
def process_directory(directory, base_dir, result_base_dir, patient_data):
result_dir = os.path.join(result_base_dir, directory)
os.makedirs(result_dir, exist_ok=True)

# Check if the directory contains a RECORDS file
records_filename = 'RECORDS'
records_dir = os.path.join(base_dir, directory.replace('/', os.sep)) # Normalize path separator
records_file_path = os.path.join(records_dir, records_filename)

if os.path.exists(records_file_path):
# Read the list of file names from the RECORDS file
with open(records_file_path, 'r') as records_file:
args_list = []
for line in records_file:
file_name = line.strip()
mat_file_path = os.path.join(base_dir, directory, file_name + '.mat').replace('/', os.sep)
hea_file_path = os.path.join(base_dir, directory, file_name).replace('/', os.sep)

# Check if both .mat and .hea files exist
if os.path.exists(mat_file_path):
args_list.append((mat_file_path, hea_file_path, patient_data))

# Process files
for args in args_list:
process_file(args)

# Recursively process subdirectories
for subdir in os.listdir(records_dir):
subdir_path = os.path.join(records_dir, subdir)
if os.path.isdir(subdir_path):
process_directory(subdir, base_dir, result_base_dir, patient_data)


if __name__ == '__main__':
# Ensure multiprocessing is supported on Windows
freeze_support()

# Directory containing the first RECORDS file (which contains paths to subfolders)
records_directory = './a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0'
first_records_filename = 'RECORDS'

# Directory to store processed results
result_directory = 'results'

# Initialize patient data list
patient_data = []

# Read the list of subfolder paths from the first RECORDS file
with open(os.path.join(records_directory, first_records_filename), 'r') as first_records_file:
subfolder_paths = first_records_file.read().splitlines()

# Iterate over each subfolder path from the first RECORDS file
for subfolder_path in subfolder_paths:
process_directory(subfolder_path, records_directory, result_directory, patient_data)

# Write patient data to CSV
csv_file_path = os.path.join(result_directory, "patient_records_new.csv")
with open(csv_file_path, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=patient_data[0].keys())
writer.writeheader()
writer.writerows(patient_data)

print("CSV file written successfully.")

Explanation:

  • The process_file function takes file paths and a list to store patient data.
  • wfdb.rdsamp reads the ECG data and metadata from the .hea file.
  • The script extracts patient ID from the file name and parses the comments section of the metadata to extract other relevant information like age, sex, and diagnoses.
  • The extracted information is stored in a dictionary and appended to the patient data list.

Data Analysis and Visualization

With the data in CSV format, the analysis phase begins:

1. Visualizing ECG Signals:

This script reads the CSV file containing the extracted ECG data and plots the signals on a timeline.

import matplotlib.pyplot as plt
import pandas as pd

# Function to plot stacked ECG signals
def plot_stacked_ecg_signals(data, space=2, title='ECG Signals'):
# Extract time and signal columns
time = data['time']
signals = data.drop(columns=['time'])

# Calculate the y-axis offsets for stacking
offsets = np.arange(len(signals.columns)) * space

# Plot each signal separately with vertical stacking
fig, ax = plt.subplots(figsize=(10, 6))
for idx, column in enumerate(signals.columns):
ax.plot(time, signals[column] + offsets[idx], label=column)

ax.set_xlabel('Time (elapsed)')
ax.set_ylabel('Stacked Signal (mV)')
ax.set_title(title)
ax.legend(loc='upper right')
ax.grid(True)
plt.show()

# Read data from CSV file
df = pd.read_csv('path/to/ecg_data.csv')

# Plot stacked ECG signals
plot_stacked_ecg_signals(df)

Explanation:

  • The plot_stacked_ecg_signals function takes the data frame containing ECG data and plots each lead with a vertical offset for better visualization.
  • The script reads the CSV file and calls the plotting function to visualize the ECG signals.

2. Dimensionality Reduction with PCA:

This script performs PCA on the ECG data to reduce its dimensionality while preserving the most important information.

import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import pandas as pd
import numpy as np
import seaborn as sns

# Function to plot stacked ECG signals
def plot_stacked_ecg_signals(data, space=2, title='ECG Signals'):
# Extract time and signal columns
time = data['time']
signals = data.drop(columns=['time'])

# Calculate the y-axis offsets for stacking
offsets = np.arange(len(signals.columns)) * space

# Plot each signal separately with vertical stacking
fig, ax = plt.subplots(figsize=(10, 6))
for idx, column in enumerate(signals.columns):
ax.plot(time, signals[column] + offsets[idx], label=column)

ax.set_xlabel('Time (elapsed)')
ax.set_ylabel('Stacked Signal (mV)')
ax.set_title(title)
ax.legend(loc='upper right')
ax.grid(True)

plt.show()

# Step 1: Read data from CSV file
df = pd.read_csv('Path to CSV file')

# Plot stacked ECG signals
plot_stacked_ecg_signals(df)

time = df['time']

# Step 2: Data Preparation
X = df.drop(columns=['time'])
plt.figure(figsize=(10, 6))
sns.boxplot(data=X)
plt.title('Box Plot of ECG Signals')
plt.xlabel('Leads Components')
plt.ylabel('Scaled Value')
plt.show()
# Step 3: Standardization
# Preprocessing the data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_scaled_df = pd.DataFrame(X_scaled, columns=X.columns)

# Step 4: Correlation Matrix and Heatmap
corr_matrix = X_scaled_df.corr()
plt.figure(figsize=(10, 8))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f", linewidths=.5)
plt.title('Correlogram of ECG Signals')
plt.show()

# Step 5: PCA Implementation
pca = PCA()
X_pca = pca.fit_transform(X_scaled)

# Step 6: Interpretation
explained_variance_ratio = pca.explained_variance_ratio_
print("Explained Variance Ratio:", explained_variance_ratio)

# Calculate the rate of change
threshold = 0.005
cumulative_variance_ratio = explained_variance_ratio.cumsum()

# Calculate the differences between consecutive values
differences = np.diff(cumulative_variance_ratio)

# Look for a point where the rate of change becomes small
plateau_index = np.argmax(differences < threshold)
print("Plateau Index:", plateau_index)

# Step 7: Plot the explained variance ratio
plt.figure(figsize=(8, 6))
plt.plot(range(1, len(explained_variance_ratio) + 1), explained_variance_ratio.cumsum(), marker='o', linestyle='-')
plt.axvline(x=plateau_index, color='red', linestyle='--')
plt.title('Explained Variance Ratio')
plt.xlabel('Number of Principal Components')
plt.ylabel('Cumulative Explained Variance Ratio')
plt.grid(True)
plt.show()

# Step 8: Selecting the number of components for PCA
n_components = plateau_index
pca = PCA(n_components=n_components)

# Fit PCA to your data and transform it
pca_data = pca.fit_transform(X_scaled_df)
print(pca_data.shape)

# Create a DataFrame for the transformed data
pca_df = pd.DataFrame(data=pca_data, columns=[f'PC{i+1}' for i in range(n_components)])
corr_matrix = pca_df.corr()
pca_df['time'] = time


# Step 9: Plot stacked PCA ECG signals
plot_stacked_ecg_signals(pca_df, 4, "PCA ECG Signals")

# Step 10: Heatmap of the correlation matrix for PCA ECG Signals

plt.figure(figsize=(10, 8))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f", linewidths=.5)
plt.title('Correlogram of PCA ECG Signals')
plt.show()

# Step 11: Box Plot
plt.figure(figsize=(10, 6))
sns.boxplot(data=pca_df.drop(columns=['time']))
plt.title('Box Plot of PCA ECG Signals')
plt.xlabel('Principal Components')
plt.ylabel('Scaled Value')
plt.show()

Explanation:

  • The script prepares the data by dropping the time column and standardizing the remaining columns.
  • It calculates the correlation matrix and displays it as a heatmap to visualize the relationships between the leads.
  • PCA is performed on the standardized data, and the explained variance ratio is calculated to determine the optimal number of principal components.
  • The data is transformed using the chosen number of components, and the transformed data is visualized using stacked plots, correlogram, and box plot.

Insights from Patient Data

This script analyzes the patient metadata to gain insights into diagnoses, demographics, and potential correlations.

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib_venn import venn3
from collections import Counter


# Read patient records from CSV file
patient_records_file = "./results/patient_records_new.csv"
df_patient_records = pd.read_csv(patient_records_file)

# Read condition names from CSV file
condition_names_file = "a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0/ConditionNames_SNOMED-CT.csv"
df_condition_names = pd.read_csv(condition_names_file)

# Convert condition names DataFrame to a dictionary
condition_names_dict = df_condition_names.set_index('Snomed_CT')['Acronym Name'].to_dict()

def replace_with_names(dx_values):
dx_names = []
for dx in dx_values.split(','):
dx_name = condition_names_dict.get(int(dx))
if dx_name:
dx_names.append(dx_name)
else:
dx_names.append(dx)
return ', '.join(dx_names)

# Apply replacement function to 'Dx' column
df_patient_records['Dx'] = df_patient_records['Dx'].astype(str).apply(replace_with_names)

# Split multiple diagnoses and create a new DataFrame
diagnosis_df = df_patient_records['Dx'].str.split(',', expand=True).stack().reset_index(level=1, drop=True).to_frame(name='Diagnosis')

# Count occurrences of each diagnosis
diagnosis_counts = Counter(diagnosis_df['Diagnosis'])

# Get top 3 diagnoses
top_diagnoses = diagnosis_counts.most_common(3)

# Extract top 3 diagnosis names
top_diagnosis_names = [diagnosis[0] for diagnosis in top_diagnoses]

# Create sets for patients with each top diagnosis
patients_with_top_diagnoses = [
set(df_patient_records[df_patient_records['Dx'].str.contains(diagnosis)]['Patient ID'])

for diagnosis in top_diagnosis_names
]


# Create the Venn diagram
plt.figure(figsize=(10, 8))
venn3(patients_with_top_diagnoses, set_labels=top_diagnosis_names)
plt.title('Top 3 Diagnoses Comparison')
plt.show()




# Diagnostic insights
diagnoses = df_patient_records['Dx'].str.split(',', expand=True).stack().value_counts()

diagnoses = diagnoses.rename(lambda x: condition_names_dict.get(x, x))
plt.figure(figsize=(10, 6))
diagnoses.head(10).plot(kind='barh', color='lightgreen')
plt.title('Top 10 Most Common Diagnoses')
plt.xlabel('Frequency')
plt.ylabel('Diagnosis')
plt.gca().invert_yaxis()
plt.show()

#correlation matrix
# Extract top 10 diagnoses
top_10_diagnoses = diagnoses.head(10).index

# Create a DataFrame to store binary indicators for presence of each top diagnosis
diagnosis_presence = pd.DataFrame(index=df_patient_records.index, columns=top_10_diagnoses)

# Fill the DataFrame with binary indicators
for diagnosis in top_10_diagnoses:
diagnosis_presence[diagnosis] = df_patient_records['Dx'].str.contains(diagnosis).astype(int)

# Calculate the correlation matrix
correlation_matrix = diagnosis_presence.corr()

# Plot the correlation matrix as a heatmap
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt=".2f")
plt.title('Correlation between Top 10 Diagnoses')
plt.show()


# Count occurrences of each number of diseases
diagnosis_counts = df_patient_records['Dx'].str.split(',').apply(len)
diagnosis_counts_grouped = diagnosis_counts.apply(lambda x: min(x, 4)).value_counts().sort_index()
more_than_four_count = (diagnosis_counts > 4).sum()

# Add a category for individuals with more than four diseases
if more_than_four_count > 0:
diagnosis_counts_grouped[4] += more_than_four_count
diagnosis_counts_grouped = diagnosis_counts_grouped.drop(5, errors='ignore')

# Create a bar plot
plt.figure(figsize=(10, 6))
plt.bar(diagnosis_counts_grouped.index, diagnosis_counts_grouped.values, color='skyblue')
plt.xlabel('Number of Diseases')
plt.ylabel('Number of Persons')
plt.title('Distribution of Persons by Number of Diseases')
plt.xticks(diagnosis_counts_grouped.index, [f"{x} or more diseases" if x == 4 else x for x in diagnosis_counts_grouped.index])
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()



gender_counts = df_patient_records['Sex'].value_counts()

# Create a pie chart
plt.figure(figsize=(8, 6))
plt.pie(gender_counts, labels=gender_counts.index, autopct='%1.1f%%', colors=['lightblue', 'lightcoral'], startangle=90)

# Draw a white circle at the center to make it a donut plot
centre_circle = plt.Circle((0, 0), 0.70, color='white', linewidth=0)
fig = plt.gcf()
fig.gca().add_artist(centre_circle)

# Equal aspect ratio ensures that pie is drawn as a circle
plt.axis('equal')
plt.title('Distribution of Patients by Gender')
plt.show()



# Define age ranges and corresponding categories
age_ranges = [(0, 12), (13, 19), (20, 39), (40, 59), (60, 79), (80, 100)]
categories = ['0-12', '13-19', '20-39', '40-59', '60-79', '80-100']

# Categorize ages into groups
def categorize_age(age):
for i, (start, end) in enumerate(age_ranges):
if start <= age <= end:
return categories[i]

df_patient_records['Age Group'] = df_patient_records['Age'].apply(categorize_age)



# Count the number of patients in each age group
age_group_counts = df_patient_records['Age Group'].value_counts()
age_group_counts = age_group_counts.reindex(categories)

# Plot the bar plot
plt.figure(figsize=(10, 6))
age_group_counts.plot(kind='bar', color='skyblue')
plt.title('Distribution of Patients by Age Group')
plt.xlabel('Age Group')
plt.ylabel('Number of Patients')
plt.xticks(rotation=45)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()


# Count occurrences of each number of diseases for each gender and age group
# Scatter plot
age_bins = [0, 20, 40, 60, 80, 100]
age_labels = ['0-20', '21-40', '41-60', '61-80', '81-100']
df_patient_records['Age_Group'] = pd.cut(df_patient_records['Age'], bins=age_bins, labels=age_labels)

# Group by gender and age group, count the number of patients
grouped = df_patient_records.groupby(['Age_Group', 'Sex']).size().unstack(fill_value=0)

# Plotting
grouped.plot(kind='bar', figsize=(10, 6))
plt.xlabel('Age Group')
plt.ylabel('Number of Patients')
plt.title('Number of Patients by Gender and Age Group')
plt.xticks(rotation=0) # Rotate x-axis labels for better readability
plt.legend(title='Gender')
plt.show()

Explanation:

  • The script reads the patient records CSV file and replaces diagnosis codes with their corresponding names using a dictionary.
  • It creates a new data frame with each row representing a single diagnosis for a patient.
  • The script analyzes the diagnosis data to identify the top diagnoses, visualize their co-occurrence using a Venn diagram, and plot the frequency of the top diagnoses.
  • It calculates and visualizes the correlation matrix of the top diagnoses to explore potential relationships.
  • The script analyzes and visualizes the distribution of patients based on the number of diseases, gender, and age.

This comprehensive analysis, along with the detailed code explanations, provides a thorough understanding of the ECG database and the insights it holds. The extracted information and visualizations can be used for further research and development of algorithms for cardiovascular disease diagnosis and risk assessment.

--

--