Speaker diarization using Whisper ASR and Pyannote

Ritesh
8 min readJul 22, 2023
Speaker Diarization (source: Adobe Firefly)

What is speaker diarization?

Speaker diarization is the process of automatically segmenting and identifying different speakers in an audio recording. The goal of speaker diarization is to partition the audio stream into homogeneous segments, where each segment corresponds to a specific speaker or speaker turn. In other words, it aims to answer the question, “Who spoke when?” throughout the audio recording.

What is speaker embedding?

Speaker embedding is a compact numerical representation of a speaker’s voice or speech characteristics. It is a fixed-size vector that captures essential information about a speaker’s unique vocal traits, speaking style, and other speech-related characteristics. The speaker embedding is learned through a deep learning model that is trained on a large amount of labeled speech data, enabling the model to extract meaningful features that are specific to each speaker.

Speaker diarization finds applications in several areas, including audio transcription, voice recognition, speaker identification, and speech analysis. The process of identifying and segregating individual speakers facilitates the analysis and handling of audio data on a per-speaker level. Personally, I am motivated to incorporate speaker diarization as a component in my capstone project, and I would like to share the knowledge and insights I have gained throughout the development of my capstone project.

The Crucial Role of Speaker Embeddings

If speaker embeddings are not generated properly or if there are issues with the embedding process, it can lead to several potential problems in speaker diarization or any downstream tasks. Here are some common issues that may arise if embeddings are not done correctly:

  1. Poor Clustering: In speaker diarization, the primary objective is to group speech segments belonging to the same speaker. If embeddings are not representative of speaker characteristics, clustering algorithms may fail to accurately group segments, resulting in misattributed speakers and degraded diarization performance.
  2. Speaker Overlaps: When speaker overlaps occur (multiple speakers talking simultaneously), accurate embeddings are crucial for distinguishing and separating overlapping speech segments into correct speaker clusters. Incorrect embeddings can lead to incorrect speaker separations and incorrect timing boundaries.
  3. Misclassification of Speakers: Incorrect embeddings may lead to misclassification of speakers, where segments of one speaker are mistakenly assigned to another speaker, leading to inaccurate speaker labels and potentially affecting downstream analysis.
  4. Insufficient Dimensionality: If the embedding dimensionality is too low, the embeddings may lack the necessary information to represent complex speaker characteristics, resulting in poor separation of speakers.
  5. Computational Overhead: Inefficient embeddings or models may lead to increased computational overhead, making the diarization process slow and impractical for real-time applications.
  6. Sensitivity to Noise and Variability: If the embeddings are not robust to variations in speaking styles, noise, or recording conditions, the diarization system’s performance may degrade significantly.
  7. Lack of Generalization: Embeddings learned from one dataset may not generalize well to a different dataset with different speakers and speaking conditions. A lack of generalization can limit the diarization system’s applicability to various scenarios.

Behind the Scenes: The Working of Speaker Diarization System

The fundamental process of building a speaker diarization system involves the following steps:

  1. Segmentation: The audio stream is divided into smaller segments based on characteristics like silence, pauses, and changes in speaker turns.
  2. Speaker Embedding: For each segment, features or embeddings are extracted to represent the speaker’s speech characteristics. These embeddings serve as unique representations of each speaker’s voice.
  3. Clustering: The extracted speaker embeddings are clustered based on similarity, with segments from the same speaker forming a single cluster. This process groups segments that likely belong to the same speaker.
  4. Labeling: Once the clustering is complete, each segment is assigned a label representing the speaker's identity. Segments in the same cluster are assigned the same label, indicating they belong to the same speaker.

Implementation

Now that we have a clear understanding of speaker diarization and its various applications, let’s proceed to the implementation phase. For this task, we will utilize Whisper ASR, pyannote, and Agglomerative Clustering to achieve our objectives.

For those solely interested in the code, the Jupyter Notebook file can be accessed on my GitHub repository.

Our model requires two inputs:

  1. The number of speakers present in the audio.
  2. Audio file in WAV format.

Let’s start by importing the audio file for which we will conduct the diarization process.

# upload audio file
from google.colab import files
uploaded = files.upload()
path = next(iter(uploaded))

The below code setup configures parameters for a speaker diarization task:

  1. num_speakers = 2: This line defines the number of speakers expected to be present in the audio. It is set as a default value of 2 but can be modified as needed.
  2. language = ‘English’: This line defines the language of the audio. The available options are ‘any’ or ‘English’. It is used to choose the appropriate model for the diarization task.
  3. model_size = ‘large’: This line defines the size of the model to be used. The available options are ‘tiny’, ‘base’, ‘small’, ‘medium’, and ‘large’.
num_speakers = 2 #@param {type:"integer"}

language = 'English' #@param ['any', 'English']

model_size = 'large' #@param ['tiny', 'base', 'small', 'medium', 'large']

model_name = model_size
if language == 'English' and model_size != 'large':
model_name += '.en'py

Now let’s import all the necessary libraries

!pip install -q git+https://github.com/openai/whisper.git > /dev/null
!pip install -q git+https://github.com/pyannote/pyannote-audio > /dev/null

import whisper
import datetime

import subprocess

import torch
import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
embedding_model = PretrainedSpeakerEmbedding(
"speechbrain/spkrec-ecapa-voxceleb",
device=torch.device("cuda"))

from pyannote.audio import Audio
from pyannote.core import Segment

import wave
import contextlib

from sklearn.cluster import AgglomerativeClustering
import numpy as np

Since our model requires the input audio file in the .wav format, we will first check the audio file type, and if necessary, we will convert the audio file to the .wav format. This ensures that the audio is in the correct format for further processing with our model.

if path[-3:] != 'wav':
subprocess.call(['ffmpeg', '-i', path, 'audio.wav', '-y'])
path = 'audio.wav'

Now we will proceed with automatic speech recognition (ASR) using the Whisper library. We will load a pre-trained model of a specific size, allowing us to extract time-stamped segments from the audio and obtain their corresponding transcribed text. This ASR process enables us to convert speech data into text representations, which will be crucial for further analysis and processing.


model = whisper.load_model(model_size)
result = model.transcribe(path)
segments = result["segments"]

The provided code snippet opens an audio file in WAV format and extracts essential information, including the total number of frames and the frame rate. Using this information, the code calculates the duration of the audio file in seconds. These details serve as crucial parameters for various audio processing tasks, such as segmenting the audio or determining the audio’s length for further analysis.

with contextlib.closing(wave.open(path,'r')) as f:
frames = f.getnframes()
rate = f.getframerate()
duration = frames / float(rate)

Next, we will introduce a function called ‘segment_embedding,’ which will handle the extraction of speaker embeddings for a given segment of the audio file. This function will crop the audio data within the segment, pass it through the pre-trained speaker embedding model, and finally, provide us with the relevant speaker embeddings. This function plays a crucial role in obtaining embeddings for each segment during the speaker diarization process, allowing us to effectively represent and distinguish different speakers in the audio.

audio = Audio()

def segment_embedding(segment):
start = segment["start"]
# Whisper overshoots the end timestamp in the last segment
end = min(duration, segment["end"])
clip = Segment(start, end)
waveform, sample_rate = audio.crop(path, clip)
return embedding_model(waveform[None])py

Now, we come to the crucial step of generating speaker embeddings for each segment obtained during the speaker diarization process. These embeddings will be stored in the ‘embeddings’ array. Afterward, we process the array to address any potential NaN values, ensuring that it contains valid numerical data for the subsequent steps in the speaker diarization pipeline. This process is essential for accurately representing speaker characteristics and facilitating the clustering and labeling of speakers.

embeddings = np.zeros(shape=(len(segments), 192))
for i, segment in enumerate(segments):
embeddings[i] = segment_embedding(segment)

embeddings = np.nan_to_num(embeddings)

Next, we will apply clustering to our embedded data. After executing the following code, each segment in the ‘segments’ list will be enriched with an extra key-value pair representing the speaker identity for that segment. The ‘labels’ array will contain the speaker identifiers assigned by the Agglomerative Clustering algorithm, which is based on the similarity of embeddings. Using this information, we will label each segment with the corresponding speaker ID, effectively completing the speaker diarization process.

clustering = AgglomerativeClustering(num_speakers).fit(embeddings)
labels = clustering.labels_
for i in range(len(segments)):
segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1)

We will record and store all speaker labels along with their respective timestamps in the ‘transcript.txt’ file. Additionally, we will add the transcribed text for each segment, ensuring that the transcript file contains all relevant information about the speaker identities and speech content.


def time(secs):
return datetime.timedelta(seconds=round(secs))

f = open("transcript.txt", "w")
for (i, segment) in enumerate(segments):
if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
f.write("\n" + segment["speaker"] + ' ' + str(time(segment["start"])) + '\n')
f.write(segment["text"][1:] + ' ')
f.close()

To visualize the clusters resulting from the diarization process, we can use matplotlib to create a plot. To achieve this, we perform PCA (Principal Component Analysis) with n_components=2 to reduce the embedding's dimensionality for better visualization.

import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

# Your existing code for clustering and labeling segments...

# Perform PCA to reduce the dimensionality of embeddings to 2D
pca = PCA(n_components=2, random_state=42)
embeddings_2d = pca.fit_transform(embeddings)

# Plot the clusters
plt.figure(figsize=(10, 8))
for i, segment in enumerate(segments):
speaker_id = labels[i] + 1
x, y = embeddings_2d[i]
plt.scatter(x, y, label=f'SPEAKER {speaker_id}')

plt.title("Speaker Diarization Clusters (PCA Visualization)")
plt.xlabel("Principal Component 1")
plt.ylabel("Principal Component 2")
plt.legend()
plt.show()

Using Plotly, we have the option to create 3D visualizations for plots, which can be beneficial when dealing with a large number of clusters. With this approach, it becomes easier to visualize and understand the data distribution in a 3D space when there are a significant number of clusters involved.

import numpy as np
import plotly.graph_objects as go
from sklearn.decomposition import PCA
import matplotlib.cm as cm

# Your existing code for clustering and labeling segments...

# Perform PCA to reduce the dimensionality of embeddings to 3D
pca = PCA(n_components=3, random_state=42)
embeddings_3d = pca.fit_transform(embeddings)

# Get the number of unique speakers from the labels
num_unique_speakers = len(np.unique(labels))

# Create a colormap for speakers, ensuring each speaker gets a unique color
colors = cm.tab20b(np.linspace(0, 1, num_unique_speakers))

# Prepare the data for the 3D scatter plot
data = []
for i, segment in enumerate(segments):
speaker_id = labels[i] + 1
x, y, z = embeddings_3d[i]
color = colors[labels[i] % num_unique_speakers] # Get the corresponding color for the speaker
trace = go.Scatter3d(x=[x], y=[y], z=[z], mode='markers',
marker=dict(size=5, color=color),
name=f'SPEAKER {speaker_id}')
data.append(trace)

# Layout for the 3D scatter plot
layout = go.Layout(
title="Speaker Diarization Clusters (3D Visualization)",
scene=dict(
xaxis_title="Principal Component 1",
yaxis_title="Principal Component 2",
zaxis_title="Principal Component 3"
)
)

# Create the figure and plot the 3D scatter plot
fig = go.Figure(data=data, layout=layout)
fig.show()

The accuracy of the speaker diarization and PCA visualization relies on several factors, such as the audio’s quality and duration, the efficacy of the pre-trained embedding model, and the selection of the number of speakers for clustering. The outcome of the diarization and visualization can fluctuate based on the unique characteristics of the audio data and the performance of the GAN and clustering algorithms.

The code implementation in this blog article was completed by Dwarkesh Patel

--

--