Diarising Audio Transcriptions with Python and Whisper: A Step-by-Step Guide
In this tutorial we will transcribe audio and produce an output file in which each transcribed line is annotated with its speaker label. Here is an example:
SPEAKER_06 --> Yep, that's still a lot of work to do here.
SPEAKER_02 --> That's correct.
SPEAKER_07 --> We are at time, should we schedule another session?
SPEAKER_00 --> Yes please.
SPEAKER_05 --> Thanks for your time.
SPEAKER_02 --> Thanks for your time.
You can then simply replace each SPEAKER_NN label with a real name, so you'll get something like:
ROHIT --> Yep, that's still a lot of work to do here.
AMY --> That's correct.
MICHELLE --> We are at time, should we schedule another session?
ANDREW --> Yes please.
CAROLINE --> Thanks for your time.
AMY --> Thanks for your time.
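The renaming itself is a simple search-and-replace. A minimal sketch (the speaker_names mapping and the file names are placeholders; the combined transcript file is produced at the end of this guide):
# Hypothetical mapping from diarization labels to real names --
# fill it in after listening to a snippet of each speaker
speaker_names = {
    "SPEAKER_00": "ANDREW",
    "SPEAKER_02": "AMY",
    "SPEAKER_05": "CAROLINE",
    "SPEAKER_06": "ROHIT",
    "SPEAKER_07": "MICHELLE",
}
with open("transcript.txt") as f:
    text = f.read()
for label, name in speaker_names.items():
    text = text.replace(label, name)
with open("transcript_named.txt", "w") as f:
    f.write(text)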
Create a WAV file from the video file
To do this we'll run the following code, which passes the file paths to ffmpeg via environment variables to create the WAV file.
Note that you may need to install ffmpeg separately.
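Before converting, it's worth confirming ffmpeg is actually reachable. A quick sanity check (a small sketch, not part of the original workflow):
# shutil.which returns None if ffmpeg is not on the PATH
import shutil
if shutil.which("ffmpeg") is None:
    raise RuntimeError("ffmpeg not found; install it and add it to your PATH")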
# Define the file path to the Video
import os
import random
import string
file_path = "C:/Users/14242/Videos/my_ video.mp4"
# remove the .mp4 extension from the file path and add a random string of 8 characters to the end of the file path
file_path_audio = file_path[:-4] + "_" + ''.join(random.choices(string.ascii_uppercase + string.digits, k=8)) + ".wav"
# Export both to os environment
os.environ["file_path"] = file_path
os.environ["file_path_audio"] = file_path_audio
# Echo the paths to verify the export (Windows-style %var% expansion; use $var on Unix)
!echo "file_path: %file_path%"
!echo "file_path_audio: %file_path_audio%"
# Convert the video to a mono, 16-bit PCM WAV file at 22050 Hz
!ffmpeg -i "%file_path%" -f wav -bitexact -acodec pcm_s16le -ar 22050 -ac 1 "%file_path_audio%"
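The ! lines above are Jupyter shell magics using Windows-style %var% expansion. If you're running plain Python, or on macOS/Linux, here is a portable sketch of the same conversion (assuming ffmpeg is on the PATH):
import subprocess
# Portable equivalent of the shell magic above; passing a list of
# arguments avoids shell quoting issues with spaces in paths
subprocess.run(
    ["ffmpeg", "-i", file_path, "-f", "wav", "-bitexact",
     "-acodec", "pcm_s16le", "-ar", "22050", "-ac", "1", file_path_audio],
    check=True,
)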
Create a model for speaker diarization
Next we'll create a speaker-diarization pipeline via pyannote. Note that recent versions of pyannote.audio gate this model on Hugging Face, so you may need to accept its terms and pass an access token to from_pretrained.
# Load the pretrained speaker-diarization pipeline
from pyannote.audio import Pipeline
pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization')
# Build the diarization file path from the audio file path
diarization_file = file_path_audio + "_diarization.txt"
# If the diarization file already exists, read it back in
if os.path.exists(diarization_file):
    with open(diarization_file, "r") as text_file:
        dz = str(text_file.read())
else:
    # Otherwise run the pipeline and cache the result to a text file
    AUDIO_FILE = {'uri': 'blabla', 'audio': file_path_audio}
    dz = pipeline(AUDIO_FILE)
    # Save the result as {file_path_audio}_diarization.txt
    with open(diarization_file, "w") as text_file:
        text_file.write(str(dz))
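The code that follows parses the string dump saved above. If you'd rather inspect the result directly, pyannote's Annotation object supports structured iteration; a small sketch (only valid when dz came from the pipeline rather than the cached file):
# Structured access to the diarization result (a sketch; skip if dz was
# read back from the cached text file as a plain string)
if not isinstance(dz, str):
    for turn, _, speaker in dz.itertracks(yield_label=True):
        print(f"{turn.start:.1f}s - {turn.end:.1f}s: {speaker}")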
Next we'll write a function that takes a time string and converts it into an integer number of milliseconds.
def millisec(timeStr):
    # "H:MM:SS.fff" -> integer milliseconds
    spl = timeStr.split(":")
    s = int((int(spl[0]) * 3600 + int(spl[1]) * 60 + float(spl[2])) * 1000)
    return s
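For example, "0:01:23.400" is 1 minute and 23.4 seconds:
print(millisec("0:01:23.400"))  # 83400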
Now we'll split the diarization file into groups of consecutive segments from the same speaker.
# Split into Groups
import re
# Read the diarization file (one segment per line)
dzs = open(diarization_file).read().splitlines()
groups = []
g = []
lastend = 0
for d in dzs:
    if g and (g[0].split()[-1] != d.split()[-1]):  # speaker changed: close the current group
        groups.append(g)
        g = []
    g.append(d)
    end = re.findall(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=d)[1]
    end = millisec(end)
    if lastend > end:  # segment engulfed by a previous segment
        groups.append(g)
        g = []
    else:
        lastend = end
if g:
    groups.append(g)
print(*groups, sep='\n')
Split Audio File into Groups
Next we'll split the audio file according to those groups, writing one clip per group into a tmp directory.
# Split the audio file into the groups
from pydub import AudioSegment
import shutil
# Delete the tmp folder if it exists, then recreate it
if os.path.exists('tmp'):
    shutil.rmtree('tmp')
os.makedirs('tmp')
audio = AudioSegment.from_wav(file_path_audio)
gidx = -1
for g in groups:
    # Start of the first segment and end of the last segment in the group
    start = re.findall(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=g[0])[0]
    end = re.findall(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=g[-1])[1]
    start = millisec(start)
    end = millisec(end)
    print(start, end)
    gidx += 1
    # Export this group's slice to the tmp folder
    audio[start:end].export('tmp/' + str(gidx) + '.wav', format='wav')
Transcribe the audio files from the groups
Now let’s go ahead and transcribe the audio files from those groups.
# Transcribe the audio files from the tmp folder
import os
from tqdm import tqdm
# Check the tmp folder exists and has wav files
if os.path.exists('tmp') and len(os.listdir('tmp')) > 0:
    for i in tqdm(range(gidx + 1)):
        file_name = 'tmp/' + str(i) + '.wav'
        !whisper {file_name} --language en --model medium --device cuda --output_dir tmp
else:
    print("No files in tmp folder")
Output the Transcription with Diarisation
Next we’ll create a transcription file with the speakers.
# Create a text file with the combined transcriptions, one speaker-tagged line per caption
import webvtt
transcribed_file = file_path_audio + "_transcribed.txt"
print(transcribed_file)
with open(transcribed_file, 'w') as f:
    gidx = -1
    for g in groups:
        # The speaker label is the last token of the group's first line
        speaker = g[0].split()[-1]
        gidx += 1
        captions = [[millisec(caption.start), millisec(caption.end), caption.text]
                    for caption in webvtt.read('tmp/' + str(gidx) + '.wav.vtt')]
        for c in captions:
            text = c[2]
            # Write a line with the speaker and text, e.g. "SPEAKER_02 --> That's correct."
            f.write(speaker + ' --> ' + text + ' \n')
Combine Sentences for Easy Reading
Let's now go ahead and create a combined file in which each speaker's consecutive lines are merged into a single line.
# Combine consecutive lines from the same speaker into a single line
transcribed_file = file_path_audio + "_transcribed.txt"
transcribed_file_combined = file_path_audio + "_transcribed_combined.txt"
with open(transcribed_file_combined, "w") as text_file:
    with open(transcribed_file, 'r') as f:
        lines = f.readlines()
        last_speaker = ""
        last_text = ""
        for line in lines:
            # Split the line into speaker and text
            speaker, text = line.split(" --> ", 1)
            # Same speaker as the previous line: append the text
            if speaker == last_speaker:
                last_text += text.replace(" \n", " ")
            # New speaker: flush the previous speaker's accumulated text
            else:
                if last_speaker != "":
                    text_file.write(last_speaker + " --> " + last_text + " \n")
                last_speaker = speaker
                last_text = text.replace(" \n", " ")
        # Flush the final speaker's line
        text_file.write(last_speaker + " --> " + last_text + " \n")
Calculating % of time each speaker spoke
You can also calculate the % of time each speaker spoke as follows:
# Calculate the % of time each speaker spoke using the groups
import webvtt
gidx = -1
speaker_time = {}
for g in groups:
    speaker = g[0].split()[-1]
    gidx += 1
    captions = [[millisec(caption.start), millisec(caption.end), caption.text]
                for caption in webvtt.read('tmp/' + str(gidx) + '.wav.vtt')]
    # Accumulate each speaker's total caption duration in milliseconds
    for c in captions:
        start, end = c[0], c[1]
        speaker_time[speaker] = speaker_time.get(speaker, 0) + (end - start)
total_time = sum(speaker_time.values())
for speaker in speaker_time:
    # Round the percentage to an integer
    print(speaker, round(speaker_time[speaker] / total_time * 100), "%")
Hopefully this guide gives you some pointers on how to transcribe audio with speaker diarisation.