Voice Driven RAG

Plaban Nayak
The AI Forum · Aug 27, 2024

Introduction

In today’s fast-paced world, where information is at our fingertips, we often find ourselves searching for answers to complex questions. However, what if you could simply ask your device a question and receive a concise and accurate response, all through the power of your voice?

Enter Voice-Driven (Audio) RAG, an innovative system that combines the convenience of voice commands with the intelligence of question answering.

Here, users interact with a knowledge base using voice commands. By leveraging the latest advancements in speech recognition and natural language processing, Audio RAG seamlessly converts your spoken question into text, which is then used to retrieve relevant information from a vast knowledge repository.

How Audio RAG works

  1. Record Audio: The system starts by recording your voice using a microphone. Simply ask your question, and Audio RAG will capture your audio input.
  2. Convert Speech to Text: Once the audio is recorded, it is processed through a speech recognition engine. This powerful tool analyzes the acoustic properties of your voice and converts it into text format, preserving the meaning and context of your question.
  3. Perform Question Answering: With the transcribed text in hand, Audio RAG utilizes advanced language models to understand the meaning and intent behind your question. It then searches through a vast knowledge base, retrieving the most relevant information to provide you with a concise and accurate answer.
  4. Convert Text to Speech: The response synthesized by the LLM is converted back into audio and played back to you. (The end-to-end flow is sketched in code right after this list.)
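
Conceptually, the whole loop fits in a few lines. The sketch below is only illustrative: record_audio is a hypothetical placeholder (the actual app uses a Streamlit microphone widget), while transcribe_audio, answer_question, and text_to_audio are the helper functions built later in this article.

audio_path = record_audio()               # 1. capture the spoken question (hypothetical placeholder)
question = transcribe_audio(audio_path)   # 2. speech-to-text via Groq's Whisper endpoint
response = answer_question(question)      # 3. retrieval augmented answer from the LLM
response_audio = text_to_audio(response)  # 4. text-to-speech via gTTS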

Technology Stack Used

  1. Groq : a fast AI inference platform powered by LPU™ inference technology, delivering fast, affordable, and energy-efficient AI.
  2. gTTS : gTTS (Google Text-to-Speech) is a Python library and CLI tool that interfaces with Google Translate’s text-to-speech API. It lets you convert text into spoken audio and save it as an MP3 file.
  3. pydub : pydub is a Python library that provides a simple, high-level interface for manipulating audio files, such as converting between formats (see the short sketch after this list).
  4. LangChain : a framework for developing applications powered by large language models.
  5. HuggingFace embedding model : Hugging Face is a prominent platform for natural language processing (NLP) and AI, known for its extensive collection of pre-trained models and tools. One of its key capabilities is generating embeddings, dense vector representations of text that can be used for semantic search, text classification, and clustering.
  6. Streamlit : Streamlit is an open-source Python framework for building and sharing interactive data applications quickly and easily.
  7. ChromaDB : ChromaDB is an open-source vector database designed for storing and retrieving vector embeddings, making it particularly useful for applications involving large language models (LLMs) and semantic search.
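
pydub is only used optionally in the final script (the MP3-to-WAV conversion there is commented out). As a minimal sketch of what that step would look like, assuming a gTTS-generated temp_audio.mp3 exists and ffmpeg is available:

from pydub import AudioSegment

# Convert the gTTS MP3 output into WAV (optional step)
audio = AudioSegment.from_mp3("temp_audio.mp3")
audio.export("temp_audio.wav", format="wav")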
Audio Enabled RAG

Code Implementation

Install required dependencies

!pip install langchain langchain_community langchain_groq chromadb sentence_transformers
!pip install -U langchain-huggingface
!pip install -U langchain-chroma
!pip install python-dotenv
!pip install groq gtts pydub pypdf
!pip install streamlit streamlit-mic-recorder streamlit-chat

Create a .env file and set up your Groq API key

GROQ_API_KEY=<your api key>

Setup Groq API Key

import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
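
As an optional sanity check, you can confirm the key is actually visible to the process before creating any Groq clients:

# Optional: fail fast if the key was not loaded from .env
assert os.getenv("GROQ_API_KEY"), "GROQ_API_KEY not found - check your .env file"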

Setup the LLM

from langchain_groq import ChatGroq

llm = ChatGroq(model_name="llama3-70b-8192",
               temperature=0.1,
               max_tokens=1000,
               )
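
An optional one-line smoke test confirms the Groq-hosted model responds before it is wired into the retrieval chain; ChatGroq returns an AIMessage, whose content attribute holds the text:

# Optional smoke test of the Groq LLM
print(llm.invoke("Reply with one word: ready").content)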

Setup the Embedding Model

from langchain_huggingface import HuggingFaceEmbeddings

model_name = "BAAI/bge-small-en-v1.5"
model_kwargs = {"device": "cpu"}
encode_kwargs = {"normalize_embeddings": False}
embeddings = HuggingFaceEmbeddings(model_name=model_name,
                                   model_kwargs=model_kwargs,
                                   encode_kwargs=encode_kwargs)
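
To verify the embedding model downloads and loads correctly, you can embed a sample query and inspect the vector size (bge-small-en-v1.5 should produce 384-dimensional vectors):

# Quick check: embed a sample query and inspect the dimensionality
sample_vector = embeddings.embed_query("What is retrieval augmented generation?")
print(len(sample_vector))  # 384 for BAAI/bge-small-en-v1.5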

Setup the Text Splitter Helper Function

from langchain.text_splitter import RecursiveCharacterTextSplitter

# Text splitter helper: 512-character chunks with a 20-character overlap
def text_splitter():
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=20,
        length_function=len,
    )
    return text_splitter
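
For a quick feel of the chunking behaviour, you can split a sample string; each chunk is at most 512 characters, with a 20-character overlap between neighbours:

# Example: split a long string into overlapping chunks
splitter = text_splitter()
sample_text = "Retrieval Augmented Generation grounds LLM answers in your own documents. " * 20
chunks = splitter.split_text(sample_text)
print(len(chunks), len(chunks[0]))  # number of chunks and size of the first chunk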

Setup the RetrievalQA helper function

from langchain.chains import RetrievalQA

# RetrievalQA: retrieve the top-5 chunks and "stuff" them into the prompt
def answer_question(question):
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
    qa = RetrievalQA.from_chain_type(llm=llm,
                                     chain_type="stuff",
                                     retriever=retriever,
                                     return_source_documents=True)
    result = qa.invoke({"query": question})
    return result['result']
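
This notebook version of answer_question expects a vectorstore object to already exist in scope. As an illustration only (the PDF path is hypothetical), you could build one from a document and then query it:

from langchain_community.document_loaders import PyPDFLoader
from langchain_chroma import Chroma

# Example: index a sample PDF into a Chroma collection and ask a question
pages = PyPDFLoader("/content/sample.pdf").load_and_split(text_splitter())
vectorstore = Chroma.from_documents(pages, embedding=embeddings)
print(answer_question("What is this document about?"))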

Helper Function to transcribe audio using Groq distil-whisper-large-v3-en

distil-whisper-large-v3-en : Distil-Whisper English is a distilled, or compressed, version of OpenAI’s Whisper model, designed to provide faster, lower-cost English speech recognition while maintaining comparable accuracy.

  • Supported Language : English only
import os
from groq import Groq

# Initialize the Groq client (reads GROQ_API_KEY from the environment)
client = Groq()

# Specify the path to the audio file
filename = "/content/recorded_audio.wav"

def transcribe_audio(filename):
    # Open the audio file
    with open(filename, "rb") as file:
        # Create a transcription of the audio file
        transcription = client.audio.transcriptions.create(
            file=(filename, file.read()),  # Required audio file
            model="distil-whisper-large-v3-en",  # Required model to use for transcription
            prompt="Specify context or spelling",  # Optional
            response_format="json",  # Optional
            language="en",  # Optional
            temperature=0.0  # Optional
        )
        # Print the transcription text
        print(transcription.text)
        return transcription.text
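
With a recording saved locally, the speech-to-text and question-answering pieces compose directly (the .wav path below is just an example of where a recording might live):

# Transcribe a recorded question and answer it against the indexed document
question = transcribe_audio("/content/recorded_audio.wav")
print(answer_question(question))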

Helper Function to convert Text to speech

from gtts import gTTS

# Convert the LLM response text to speech and save it as an MP3 file
def text_to_audio(text):
    # Convert text to speech
    tts = gTTS(text=text, lang='en', slow=False)

    # Save the audio as an MP3 file
    mp3_file = "temp_audio.mp3"
    tts.save(mp3_file)
    return mp3_file
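
In a Colab or Jupyter notebook you can listen to the generated MP3 inline using IPython's built-in audio widget:

from IPython.display import Audio

# Play the gTTS output inline in the notebook
Audio(filename=text_to_audio("Hello, this is the Audio RAG assistant speaking."))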

Complete Code Implementation for the Streamlit Application.

Prepare a Python script named audio_rag.py:

import streamlit as st
from time import sleep
#from st_audiorec import st_audiorec
from streamlit_mic_recorder import mic_recorder
from streamlit_chat import message
import os
from groq import Groq
from langchain_groq import ChatGroq
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import PyPDFLoader
from langchain_chroma import Chroma
from langchain.chains import RetrievalQA
from chromadb.config import Settings
import chromadb
from gtts import gTTS
from pydub import AudioSegment
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
#
# Disable Chroma telemetry
chroma_setting = Settings(anonymized_telemetry=False)
#
# Return the most recently created file in a directory
def newest(path):
    files = os.listdir(path)
    paths = [os.path.join(path, basename) for basename in files]
    newest_file_path = max(paths, key=os.path.getctime)
    return os.path.basename(newest_file_path)
#
# Convert the response text to speech and save it as an MP3 file
def text_to_audio(text):
    # Convert text to speech
    tts = gTTS(text=text, lang='en', slow=False)

    # Save the audio as an MP3 file
    mp3_file = "temp_audio.mp3"
    tts.save(mp3_file)

    # # Convert MP3 to WAV (optional, via pydub)
    # audio = AudioSegment.from_mp3(mp3_file)
    # audio.export(output_wav_file, format="wav")
    return mp3_file

# Persist an uploaded PDF to the uploads directory
def save_uploaded_file(uploaded_file, directory):
    try:
        with open(os.path.join(directory, uploaded_file.name), "wb") as f:
            f.write(uploaded_file.getbuffer())
        return st.success(f"Saved file: {uploaded_file.name} to {directory}")
    except Exception as e:
        return st.error(f"Error saving file: {e}")

# Create a directory to save the uploaded files
upload_dir = "uploaded_files"
os.makedirs(upload_dir, exist_ok=True)
#
# Setup the LLM
#
llm = ChatGroq(model_name="llama3-70b-8192",
               temperature=0.1,
               max_tokens=1000,
               )
#
# Setup the embedding model
#
model_name = "BAAI/bge-small-en-v1.5"
model_kwargs = {"device": "cpu"}
encode_kwargs = {"normalize_embeddings": False}
embeddings = HuggingFaceBgeEmbeddings(model_name=model_name,
                                      model_kwargs=model_kwargs,
                                      encode_kwargs=encode_kwargs)
#
# Setup the text splitter
#
def text_splitter():
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=20,
        length_function=len,
    )
    return text_splitter
#
# RetrievalQA
#
def answer_question(question, vectorstore):
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
    qa = RetrievalQA.from_chain_type(llm=llm,
                                     chain_type="stuff",
                                     retriever=retriever,
                                     return_source_documents=True)
    result = qa.invoke({"query": question})
    return result['result']
#
# Initialize the Groq client
groq_client = Groq()

# Specify the path to the audio file
filename = "recorded_audio.wav"

# Helper function to transcribe the audio recording
def transcribe_audio(filename):
    # Open the audio file
    with open(filename, "rb") as file:
        # Create a transcription of the audio file
        transcription = groq_client.audio.transcriptions.create(
            file=(filename, file.read()),  # Required audio file
            model="distil-whisper-large-v3-en",  # Required model to use for transcription
            prompt="Specify context or spelling",  # Optional
            response_format="json",  # Optional
            language="en",  # Optional
            temperature=0.0  # Optional
        )
        # Print the transcription text
        print(transcription.text)
        return transcription.text

# Initialize a session state variable to track if the app should stop
if 'stop' not in st.session_state:
    st.session_state.stop = False
#
# Set page configuration
st.set_page_config(
    page_title="Audio and Book App",
    page_icon="📚",  # You can use an emoji or a URL to an image
    layout="wide"
)
# Create two columns
col1, col2 = st.columns([1, 2])  # Adjust the ratios to control the width of the columns
#
with col1:
    # Create a fancy title with icons
    st.markdown(
        """
        <h1 style='text-align: center;'>
        🎧 Audio Enabled 📚 Knowledge App
        </h1>
        <h5 style='text-align: center;'>
        Your one-stop solution for an audio enabled Question Answering System!
        </h5>
        """,
        unsafe_allow_html=True
    )
    # Add additional content here
    st.write("Welcome to the Audio Enabled RAG App!")

    st.image("audio.jpeg", caption="Audio Powered RAG", output_format="auto")

    # Stop button to stop the process
    if st.button("Stop Process"):
        st.session_state.stop = True  # Set the stop flag to True

    # Display a message when the app is stopped
    if st.session_state.stop:
        st.write("The process has been stopped. You can refresh the page to restart.")

with col2:

    st.title("PDF Upload and Reader")
    # Upload the PDF file
    uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")
    #
    # Setup the vectorstore and add documents
    #
    persist_directory_path = "chromanew"
    if uploaded_file is not None:
        save_uploaded_file(uploaded_file, upload_dir)
        file_name = uploaded_file.name
        loader = PyPDFLoader(f"uploaded_files/{file_name}")
        pages = loader.load_and_split(text_splitter())
        persist_directory = persist_directory_path + "_" + file_name.split(".")[0]
        if os.path.exists(persist_directory):
            # Reuse the existing collection for this document
            client = chromadb.PersistentClient(path=persist_directory, settings=chroma_setting)
            vectorstore = Chroma(embedding_function=embeddings,
                                 client=client,
                                 persist_directory=persist_directory,
                                 collection_name=file_name.split(".")[0],
                                 client_settings=chroma_setting,
                                 )
            # Check if the vectorstore is loaded
            print(f"The number of documents loaded in the vectorstore: {len(vectorstore.get()['documents'])}")
            # st.disable_feature("pdf_uploader")  # Disable the file uploader
        else:
            # Create a new collection and index the uploaded document
            client = chromadb.PersistentClient(path=persist_directory, settings=chroma_setting)
            vectorstore = Chroma(embedding_function=embeddings,
                                 client=client,
                                 persist_directory=persist_directory,
                                 collection_name=file_name.split(".")[0],
                                 client_settings=chroma_setting
                                 )
            # Load documents into the vectorstore in batches
            MAX_BATCH_SIZE = 100
            for i in range(0, len(pages), MAX_BATCH_SIZE):
                # print(f"start of processing: {i}")
                i_end = min(len(pages), i + MAX_BATCH_SIZE)
                # print(f"end of processing: {i_end}")
                batch = pages[i:i_end]
                vectorstore.add_documents(batch)
            # Check if the vectorstore is loaded
            print(f"The number of documents loaded in the vectorstore: {len(vectorstore.get()['documents'])}")
#
    # Initialize session state variable
    if 'start_process' not in st.session_state:
        st.session_state.start_process = False

    # Create a button to start the process
    if st.button("Start Process"):
        st.session_state.start_process = True

    # Main logic
    if st.session_state.start_process:
        options = os.listdir("uploaded_files")
        none_list = ["none"]
        options += none_list
        # Create a selectbox for the user to choose an option
        selected_option = st.selectbox("Select an option:", options)

        if selected_option == "none":
            file_name = newest("uploaded_files")
        else:
            file_name = selected_option
        # Display the selected option
        st.write(f"You selected: {selected_option}")
        st.title("Audio Recorder - ask a question based on the selected option")

        # Step 1: record the spoken question
        with st.spinner("Audio Recording in progress..."):
            # # Record audio with st_audiorec (alternative recorder)
            # wav_audio_data = st_audiorec()
            # sleep(2)
            # if wav_audio_data is not None:
            #     st.audio(wav_audio_data, format='audio/wav')

            # Record audio
            audio = mic_recorder(
                start_prompt="Start recording",
                stop_prompt="Stop recording",
                just_once=False,
                key='recorder'
            )

            if audio:
                st.audio(audio['bytes'], format='audio/wav')
                with open("recorded_audio.wav", "wb") as f:
                    f.write(audio['bytes'])
                st.success("Audio Recording is completed!")

                # Step 2: transcribe the recording
                with st.spinner("Transcribing Audio in progress ..."):
                    text = transcribe_audio(filename)
                    transcription = text
                    st.markdown(text)

                # Initialize chat history in session state
                if "chat_history" not in st.session_state:
                    st.session_state.chat_history = []

                # Display chat messages from history
                for i, chat in enumerate(st.session_state.chat_history):
                    message(chat["question"], is_user=True, key=f"question_{i}")
                    message(chat["response"], is_user=False, key=f"response_{i}")

                if transcription:
                    with st.spinner("Synthesizing Response ....."):
                        # Step 3: perform RAG over the selected document
                        print(f"File_name: {file_name}")

                        persist_directory_path = "chromanew"
                        persist_directory = persist_directory_path + "_" + file_name.split(".")[0]
                        client = chromadb.PersistentClient(path=persist_directory, settings=chroma_setting)
                        vectorstore = Chroma(embedding_function=embeddings,
                                             client=client,
                                             persist_directory=persist_directory,
                                             collection_name=file_name.split(".")[0],
                                             client_settings=chroma_setting
                                             )
                        response = answer_question(transcription, vectorstore)
                        st.success("Response Generated")
                        # st.title('Response :')
                        # st.write(response)

                        # Step 4: convert the response text to speech
                        aud_file = text_to_audio(response)

                        # Add the question and response to chat history
                        st.session_state.chat_history.append({"question": transcription, "response": response})
                        # Display the question and response in the chat interface
                        message(transcription, is_user=True)
                        message(response, is_user=False)

                        # Play the audio after the response is generated
                        st.title("Audio Playback")
                        st.audio(aud_file, format='audio/wav', start_time=0)

Run the Streamlit application

streamlit run audio_rag.py

The Application Processing Snapshot

1. Browse and upload a PDF.

2. Once the vector store has been loaded, press the Start Process button.

3. Start recording your question. Once the question is recorded, the RAG process is triggered, and the response along with the corresponding audio is displayed on the application screen.

4. To ask questions about an already uploaded document, choose it from the knowledge source selection box.

5. Upload another document. To ask questions about the document just uploaded, select the option "none".

6. Press the Stop button to stop the process.

7. Refresh the screen to start afresh.

Conclusion

In this article, we explored the development of an Audio-Enabled Question Answering System (Audio RAG) using Python. We covered the key steps involved, including recording audio, converting speech to text, performing question answering with RAG, and converting the response back to speech. By combining these techniques, we can create a powerful system that allows users to interact with a knowledge base using voice commands.

Feel free to experiment with different libraries and models to further enhance the capabilities of the Audio RAG system. Happy coding!
