Building an Enterprise Grade RAG System for Agentic AI-Part 2

Dream AI
Dec 17, 2024


Author: Ahmad Wali Bin Saeed at DreamAI Software (Pvt) Ltd

Introduction

This is part 2 of a series of blog posts in which we develop a sophisticated, enterprise-grade RAG system step by step, adding features progressively. In part 1, we developed a basic yet feature-rich RAG system using LanceDB and Instructor for structured LLM outputs to serve as the foundation for this series. In part 2 (this article), we will build on that foundation using some of the improvements we suggested at the end of the last part. In particular, in this part, we will:

  • Expand the database to a wider range of data sources beyond PDFs.
  • Use a smarter chunking strategy that uses both the structure and semantics of text, leading to more accurate and relevant information retrieval.
  • Use Chain-of-Thought (CoT) prompting to guide the LLM to reason through problems step-by-step, leading to more accurate and explainable responses.
  • Define validators to ensure the accuracy, reliability, and ethical operation of our RAG system, building user trust and confidence.

Expanding The Database

In part 1, we showed how to parse PDFs and extract markdown using PyMuPDF4LLM. In this part, we will show how to do the same for Word documents (.docx), image files (.png, .jpg) and websites.

Reading From The Web

In this section, we’ll see how to extract content from websites and convert it into markdown format. This allows us to ingest information from various online sources, including news articles, blog posts, and documentation.

The code snippet shown below uses the httpx library to fetch the HTML content of a given URL. It then employs lxml to parse the HTML structure and extract the body text, and the HTML2Text class from the html2text library to transform the HTML into Markdown, ensuring that formatting such as headings, lists, and code blocks is preserved.

We also clean the extracted markdown by removing unnecessary links and ensuring that each line of text meets a minimum length requirement. This helps to maintain a clean and consistent structure for the ingested content ready to be used for further analysis and retrieval.

import re
from textwrap import dedent

import httpx
import lxml.html
from html2text import HTML2Text
from lxml.html.clean import Cleaner

from IPython.display import display, Markdown # For displaying the markdown

def remove_links_from_md(md: str) -> str:
    md = re.sub(r"!?\[([^\]]+)\]\([^\)]+\)", r"\1", md)
    md = re.sub(r"!?\[([^\]]+)\]\[[^\]]+\]", r"\1", md)
    md = re.sub(r"^\[[^\]]+\]:\s*http[s]?://\S+\s*$", "", md, flags=re.MULTILINE)
    md = re.sub(r"!?\[]\([^\)]+\)", "", md)
    return md

def clean_web_content(content: str, min_length: int = 3) -> str:
    cleaned_content = remove_links_from_md(md=content)
    return "\n".join([line for line in cleaned_content.split("\n") if len(line.strip()) > min_length])

def get_url_body(url: str, headers: dict | None = None) -> str:
    body = lxml.html.fromstring(httpx.get(url, headers=headers).text).xpath("//body")[0]  # type: ignore
    body = Cleaner(javascript=True, style=True).clean_html(body)
    return "".join(lxml.html.tostring(c, encoding="unicode") for c in body)  # type: ignore


def url_body_to_md(body: str) -> str:
    h2t = HTML2Text(bodywidth=5000)
    h2t.ignore_links = True
    h2t.mark_code = True
    h2t.ignore_images = True
    res = h2t.handle(body)

    def _f(m):
        return f"```\n{dedent(m.group(1))}\n```"

    return re.sub(r"\[code]\s*\n(.*?)\n\[/code]", _f, res or "", flags=re.DOTALL).strip()

def urls_to_md(
    urls: list[str] | str,
    headers: dict | None = {"User-Agent": "Mozilla/5.0 (Company info@company.com)"},
    clean_content: bool = True,
) -> dict[str, str]:
    if isinstance(urls, str):
        urls = [urls]
    url_mds = {}
    for url in urls:
        md = url_body_to_md(body=get_url_body(url, headers=headers))
        url_mds[url] = clean_web_content(content=md) if clean_content else md
    return url_mds

# Read text from the first part of this series and display the introduction
url = "https://medium.com/@dreamai/building-an-enterprise-grade-rag-system-for-agentic-ai-part-1-e7af4296b1ab"
content = urls_to_md(url)[url]

display(Markdown(content[34:1630]))

Reading Word Documents

There is a mountain of information locked away in Word documents. The docx_to_md function shown below uses the mammoth library to unlock that information by converting your .docx files into HTML. It then uses the url_body_to_md function defined earlier to transform that HTML into clean, readable markdown, which can be uploaded to the vector store and made available for querying in our RAG system.

import mammoth
from pathlib import Path

def docx_to_md(file: str | Path) -> str:
    html = ""
    try:
        with Path(file).open("rb") as docx_file:
            html = mammoth.convert_to_html(docx_file).value
    except Exception:
        # `logger` is assumed from the setup in part 1
        logger.exception(f"Could not convert {file} to html.")
    md = html
    if html:
        try:
            md = url_body_to_md(body=html)
        except Exception:
            logger.exception(f"Could not convert {file} to markdown.")
    return md
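For a quick check, assuming a Word file at a hypothetical path such as reports/quarterly_report.docx, the conversion is a one-liner:

# Hypothetical path, for illustration only
report_md = docx_to_md("reports/quarterly_report.docx")
display(Markdown(report_md[:500]))  # Preview the first 500 characters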

Reading From Images

To extract text from image files, we use pymupdf's Pixmap class to read the image, and its pdfocr_tobytes method, which uses Tesseract OCR as its underlying engine, to perform OCR and convert the output to a PDF. The bytes returned by this method are then loaded with pymupdf.open, and we read all the text from the resulting PDF.

import pymupdf
from pymupdf import Pixmap


def extract_text_from_image(file: str | Path, min_len: int = 2) -> str:
    pmap = Pixmap(str(file))
    doc = pymupdf.open("pdf", pmap.pdfocr_tobytes())
    res = "".join([page.get_text() for page in doc.pages()])
    return res if len(res) > min_len else ""
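Similarly, assuming a scanned page saved at a hypothetical path like scans/invoice.png, and with Tesseract OCR installed so that PyMuPDF can find it, we can pull out the text like this:

# Hypothetical image path, for illustration only
invoice_text = extract_text_from_image("scans/invoice.png")
print(invoice_text[:300])  # Inspect the first few hundred characters of OCR output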

Improving The Chunking Strategy

Recall that in the last part, we used a very simple chunking strategy: splitting the text into (nearly) equal-length chunks while ensuring that no words were broken across chunks. We also highlighted that the best chunking strategy is one that takes both the structure and the semantics of the text into account, which is exactly what we do here.

import os
import re
from typing import List

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics.pairwise import cosine_similarity

class MarkdownSemanticChunker:
    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        max_chunk_size: int = 4000,
        min_chunk_size: int = 100,
        similarity_threshold: float = 0.5,
    ):
        """
        A chunker that splits input text using major markdown separators, i.e. headings, code blocks & page breaks.
        It then merges these chunks based on semantic similarity while ensuring that each chunk > min_chunk_size and < max_chunk_size.

        Args:
            model_name: Embedding model to use
            max_chunk_size: Maximum characters per merged chunk
            min_chunk_size: Minimum characters per chunk before merging
            similarity_threshold: Threshold for clustering similarity
        """
        self.embedding_model: SentenceTransformer = SentenceTransformer(
            model_name,
            cache_folder=os.environ.get("MODELS_CACHE_FOLDER", "../../../models_cache"),
        )
        self.max_chunk_size = max_chunk_size
        self.min_chunk_size = min_chunk_size
        self.similarity_threshold = similarity_threshold

    def compute_embeddings(self, text_list: List[str]) -> np.ndarray:
        """
        Compute embeddings for a list of texts

        Args:
            text_list: List of text segments

        Returns:
            Numpy array of embeddings
        """
        return np.array(
            [
                self.embedding_model.encode(text.strip(), show_progress_bar=False)
                for text in text_list
            ]
        )

    def split_markdown(self, markdown_text: str) -> List[str]:
        """
        Split markdown text based on various markdown separators

        Args:
            markdown_text: Input markdown text

        Returns:
            List of markdown chunks
        """
        # Split based on various markdown elements
        # Priority of splitting (from most to least significant)
        separators = [
            # Code blocks
            r"(```[\s\S]*?```)",
            # Headings (h1-h6)
            r"(^#{1,6} .*$)",
            # Horizontal rules
            r"(^---+$)",
            # Bold single-line headings
            r"(^\*\*.*?\*\*$)",
            r"(^__.*?__$)",
            # Paragraphs (split by double newline)
            # r'(\n\n)'
        ]

        # Combine separators into a single regex
        split_pattern = "|".join(separators)

        # Split the text
        chunks = re.split(f"({split_pattern})", markdown_text, flags=re.MULTILINE)

        # Clean and filter chunks
        chunks = [chunk.strip() for chunk in chunks if chunk]
        chunks = [chunk.strip() for chunk in chunks if len(chunk.strip()) > 5]
        seen = set()
        # Keep only the unique chunks
        chunks = [chunk for chunk in chunks if not (chunk in seen or seen.add(chunk))]

        # Merge small chunks
        merged_chunks = []
        current_chunk = ""
        for chunk in chunks:
            if len(current_chunk) < self.min_chunk_size:
                if len(current_chunk) == 0:
                    current_chunk = chunk
                else:
                    current_chunk += "\n\n" + chunk
            else:
                if current_chunk:
                    merged_chunks.append(current_chunk.strip())
                current_chunk = chunk

        if current_chunk:
            merged_chunks.append(current_chunk.strip())

        return merged_chunks

    def merge_chunks(self, chunks: List[str]) -> List[str]:
        """
        Merge chunks using an efficient clustering approach

        Args:
            chunks: List of text chunks

        Returns:
            List of merged chunks
        """
        if len(chunks) <= 1:
            return chunks

        # Compute embeddings
        embeddings = self.compute_embeddings(chunks)

        # Compute pairwise cosine similarity
        similarity_matrix = cosine_similarity(embeddings)

        # Perform hierarchical clustering
        clustering = AgglomerativeClustering(
            n_clusters=None,
            distance_threshold=1 - self.similarity_threshold,
            metric="precomputed",
            linkage="complete",
        )

        # Convert similarity to distance (1 - similarity)
        distance_matrix = 1 - similarity_matrix

        # Fit clustering
        cluster_labels = clustering.fit_predict(distance_matrix)

        # Group chunks by cluster
        clustered_chunks = {}
        for idx, label in enumerate(cluster_labels):
            if label not in clustered_chunks:
                clustered_chunks[label] = []
            clustered_chunks[label].append(idx)

        # Merge chunks within clusters
        merged_chunks = []
        for cluster in clustered_chunks.values():
            # Sort cluster indices to maintain original order
            cluster.sort()

            # Merge chunks in the cluster
            merged_chunk = ""
            current_length = 0

            for idx in cluster:
                # If adding this chunk would exceed max size, start a new merged chunk
                if current_length + len(chunks[idx]) > self.max_chunk_size:
                    if merged_chunk:
                        merged_chunks.append(merged_chunk)
                    merged_chunk = chunks[idx]
                    current_length = len(chunks[idx])
                else:
                    # Add chunk to current merged chunk
                    merged_chunk += " " + chunks[idx] if merged_chunk else chunks[idx]
                    current_length += len(chunks[idx])

            # Add the last merged chunk
            if merged_chunk:
                merged_chunks.append(merged_chunk)

        return merged_chunks

    def chunk_markdown(self, markdown_text: str) -> List[str]:
        """
        Main method to chunk and merge markdown text

        Args:
            markdown_text: Input markdown text

        Returns:
            List of merged markdown chunks
        """
        # First, split the markdown into initial chunks based on its structural separators
        initial_chunks = self.split_markdown(markdown_text)

        # Then merge chunks based on semantic similarity
        final_chunks = self.merge_chunks(initial_chunks)

        return final_chunks

The above code shows the MarkdownSemanticChunker class — a clever little class that transforms raw markdown into intelligently segmented chunks. This chunker does two key things: first, it breaks down the text using markdown’s structural elements like code blocks, headings, and horizontal rules. Then, it uses an embedding model to cluster and merge these chunks based on their semantic similarity.

The magic happens in three main methods:

  1. split_markdown(): This method is like a markdown detective, using regex patterns to carefully slice the text using important structural elements — code blocks, headings, horizontal rules, and other markdown features. It also ensures that no chunk is smaller than min_chunk_size by merging small fragments.
  2. compute_embeddings(): Here’s where we bring in the semantic smarts. Using a pre-trained sentence transformer (by default, the lightweight all-MiniLM-L6-v2 model), we convert each text chunk into a dense vector representation. This lets us understand the “meaning” behind the text, not just its words.
  3. merge_chunks(): This is where the real semantic magic happens. Using hierarchical clustering, the method groups chunks that are semantically similar while maintaining the original order of chunks. The most similar chunks (those with a similarity score above similarity_threshold) are then merged together, as long as merging them does not exceed max_chunk_size.

The result? A chunking strategy that understands both the structure of your markdown and the underlying semantics of the text. It’s perfect for creating high-quality, semantically coherent chunks that can supercharge our RAG (Retrieval-Augmented Generation) system.
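To see the chunker in action, we can run it on the markdown we scraped from part 1 earlier; the sizes and threshold below are illustrative values, not tuned ones:

chunker = MarkdownSemanticChunker(
    min_chunk_size=500,
    max_chunk_size=2000,
    similarity_threshold=0.6,
)
chunks = chunker.chunk_markdown(content)  # `content` was fetched with urls_to_md above
print(f"Produced {len(chunks)} chunks")
print(chunks[0][:300])  # Peek at the first chunk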

With the changes we’ve made to expand our database and the improved chunking strategy, we can now update our add_data function that we defined in the first part to incorporate these changes.

def add_data(
    db: LanceDBConnection,
    table_name: str,
    filepath: Path | str | None = None,
    url: str | None = None,
    min_chunk_size: int = 500,
    max_chunk_size: int = 2000,
    chunker_similarity_threshold: float = 0.6,
    ems_model: Any = EMS_MODEL,
    schema: Type[LanceModel] | None = None,
    ems_model_device: str = DEVICE,
):
    """
    Accepts an input file path or a URL, gets a list of chunks, and adds
    them to a LanceDB table. A simplistic implementation so far.

    Args:
        db (lancedb.db.DBConnection): A DBConnection object to use to interact with LanceDB
        table_name (str): The name of the table where the file's information will be updated
        filepath (pathlib.Path | str, optional): The path to the file. Defaults to None.
        url (str, optional): The URL to get the information from. Used only when filepath is None. Defaults to None.
        min_chunk_size (int, optional): The minimum size of the chunks to split the text into. Defaults to 500.
        max_chunk_size (int, optional): The maximum size of the chunks to split the text into. Defaults to 2000.
        chunker_similarity_threshold (float, optional): The similarity threshold used by the semantic chunker to merge similar chunks.
            Must be 0 <= x <= 1. Defaults to 0.6.
        ems_model (str | lancedb.embeddings, optional): The name of the embedding model or the embedding model itself to use.
            Only required when a new table has to be created.
            Defaults to "Gemini".
        schema (Type[LanceModel], optional): The schema to use when a new table is created.
            If not provided when a new table has to be created, a default schema is used.
            Defaults to None.
        ems_model_device (str): The device on which to run the embedding model. Defaults to "cuda".
    """
    if filepath is not None:
        if isinstance(filepath, str):
            filepath = Path(filepath)
        if not filepath.is_file():
            logger.error("filepath must be a valid path to an existing file")
            return
        # Accept data from a wider range of file types
        if filepath.suffix.lower() in [".doc", ".docx"]:
            text = docx_to_md(filepath)
        elif filepath.suffix.lower() in [".pdf"]:
            text = pdf_to_md(filepath)
        elif filepath.suffix.lower() in [".jpg", ".jpeg", ".png"]:
            text = extract_text_from_image(filepath)
        elif filepath.suffix.lower() in [".txt", ".md"]:
            with filepath.open("r") as tfile:
                text = tfile.read()
        else:
            logger.error(
                "Input file must be one of the following types: [.doc, .docx, .pdf, .jpg, .jpeg, .png, .txt, .md]. No other file type is supported!"
            )
            return
    # ...Or extract text from a URL
    elif url is not None and is_url(url):
        text = urls_to_md(url)[url]
    else:
        logger.error(
            "No valid filepath or URL provided. Provide at least one of the two to proceed."
        )
        return

    # We now use the MarkdownSemanticChunker to chunk the text
    chunker = MarkdownSemanticChunker(
        min_chunk_size=min_chunk_size,
        max_chunk_size=max_chunk_size,
        similarity_threshold=chunker_similarity_threshold,
    )

    try:
        chunks: list[str] = chunker.chunk_markdown(text)
    except Exception as e:
        logger.exception(f"An error occurred while chunking the text: {e}")
        return

    # Transform the text chunks into a list of dictionaries with the following keys:
    # index: The i'th chunk of text from this source
    # text: The text from the chunk
    # name: The name of the file (or the URL) this chunk was extracted from
    # metadata: Any chunk-specific or file-specific metadata like filepath, type, page, chunk start, title etc.
    source_name = filepath.stem if filepath is not None else url
    source_path = str(filepath) if filepath is not None else url
    data = [
        {
            "index": i,
            TEXT_FIELD_NAME: chunk,
            "name": source_name,
            "metadata": {"filepath": source_path},
        }
        for i, chunk in enumerate(chunks)
    ]

    add_to_lance_table(
        db,
        table_name,
        data,
        schema=schema,
        ems_model=ems_model,
        ems_model_device=ems_model_device,
    )
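With the updated add_data in place, ingesting a Word document or a web page looks much like before. In the sketch below, db is the LanceDB connection from part 1, while the table name and file path are hypothetical placeholders:

# `db` comes from the part 1 setup; the table name and .docx path are placeholders
add_data(db, "documents", filepath="reports/quarterly_report.docx")
add_data(db, "documents", url="https://medium.com/@dreamai/building-an-enterprise-grade-rag-system-for-agentic-ai-part-1-e7af4296b1ab")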

Use Validators And Chain-of-Thought Prompting

In our previous iteration, our RAG system used the user's input query directly to perform hybrid (vector + full-text) search and retrieve the relevant text chunks, which were then promptly passed to the LLM client to answer the user's original query. There were several problems with that approach, which we pointed out at the time. Here, we're going to fix some of those problems, mostly by utilizing the 'structured output' feature of most modern LLMs. The Python library Instructor, along with pydantic, makes this feature particularly easy to use.

Let's start with vector search. While using the original query works in many cases, it can fall short when:

  • Multiple sources are needed: A single query might not be sufficient to retrieve information from different sources needed to answer the question.
  • Metadata-based filtering: Users might specify a particular source (e.g., a specific file) for their answer, requiring targeted retrieval.

To handle these situations, we need a mechanism to extract metadata filters from the user’s query. For example, if a user asks for data from a specific file, we need to search only within that file’s relevant chunks.

import re
import json
import sqlparse

from pydantic import BaseModel, Field, field_validator
from lancedb.rerankers import Reranker
from lancedb.db import DBConnection as LanceDBConnection


def validate_sql_filter(filter_expr: str, allowed_columns: list[str] | None = None) -> bool:
    """
    Validate a SQL WHERE clause filter expression.

    Args:
        filter_expr (str): The SQL filter expression to validate
        allowed_columns (list[str], optional): List of columns that are allowed in the filter

    Returns:
        bool: True if the filter expression is valid, False otherwise
    """
    # Check for empty or None input
    if not filter_expr or not isinstance(filter_expr, str):
        return False

    # Remove leading/trailing whitespace
    filter_expr = filter_expr.strip()

    # Check for SQL injection attempts
    dangerous_patterns = [
        r";",          # Semicolon (potential statement separator)
        r"--",         # SQL comment
        r"/\*",        # Start of multi-line comment
        r"\bDROP\b",   # Drop statement
        r"\bDELETE\b", # Delete statement
        r"\bUPDATE\b", # Update statement
        r"\bINSERT\b", # Insert statement
    ]

    for pattern in dangerous_patterns:
        if re.search(pattern, filter_expr, re.IGNORECASE):
            return False

    try:
        # Use sqlparse to parse and validate the expression
        parsed = sqlparse.parse(filter_expr)

        # Ensure we have a parseable expression
        if not parsed:
            return False

        # Convert to a standard format
        normalized = sqlparse.format(filter_expr, reindent=True, keyword_case="upper")

        # Check if it looks like a valid WHERE clause
        if not re.match(r'^[\w\s=<>!\'"%&|()]+$', normalized):
            return False

        # If allowed columns are specified, validate column names
        if allowed_columns:
            # Drop quoted string literals so their contents aren't mistaken for column names
            expr_no_literals = re.sub(r"'[^']*'|\"[^\"]*\"", "", filter_expr)
            # Extract identifiers from what remains, ignoring common SQL keywords
            sql_keywords = {"AND", "OR", "NOT", "IN", "IS", "NULL", "LIKE", "BETWEEN", "TRUE", "FALSE"}
            column_pattern = r"\b([a-zA-Z_][a-zA-Z0-9_]*)\b"
            found_columns = {
                col for col in re.findall(column_pattern, expr_no_literals)
                if col.upper() not in sql_keywords
            }

            # Check if all found columns are in the allowed list
            if not all(col in allowed_columns for col in found_columns):
                return False

        return True

    except Exception:
        return False
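A couple of quick checks illustrate the intended behaviour: a simple equality filter on an allowed column passes, while anything that looks like an injection attempt or references an unknown column is rejected:

allowed = ["text", "index", "name", "filepath"]
print(validate_sql_filter("name = 'quarterly_report'", allowed))         # True
print(validate_sql_filter("name = 'x'; DROP TABLE documents", allowed))  # False: contains ';' and DROP
print(validate_sql_filter("secret_column = 42", allowed))                # False: column not in the allowed list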

class VectorSearch(BaseModel):
    """
    Perform vector search to get relevant content to answer the user query
    """

    sub_queries: list[str] = Field(..., description="The decomposed list of sub-questions derived from the original query, optimized for hybrid search")
    search_query: str = Field(description="The original search query.")
    metadata_filters: str = Field(default="", description="The SQL expressions used as predicates for filtering operations in hybrid search")

    @field_validator("metadata_filters")
    @classmethod
    def is_valid_SQL_expression(cls, exp: str) -> str:
        if exp is not None and len(exp) > 0:
            allowed_columns = ["text", "index", "name", "filepath"]  # Define your allowed columns
            if not validate_sql_filter(exp, allowed_columns):
                raise ValueError(f"Invalid metadata filter expression. Only {allowed_columns} can be used for filtering.")
        return exp

    def process(
        self,
        db: LanceDBConnection,
        table_name: str,
        reranker: Reranker,
        filters: str | None = None,
        max_search_results: int = MAX_SEARCH_RESULTS,
    ) -> str:
        try:
            if filters is None or len(filters) == 0:
                filters = self.metadata_filters
            elif self.metadata_filters and isinstance(self.metadata_filters, str):
                # Validate the input filters before combining them with the extracted ones
                if validate_sql_filter(filters):
                    filters = " AND ".join([filters, self.metadata_filters])
                else:
                    filters = self.metadata_filters
            if len(filters) == 0:
                filters = None
            search_results = search_lancedb(
                db,
                table_name,
                [self.search_query] + self.sub_queries,
                filters,
                reranker,
                max_search_results,
            )
            context = f"<retrieved vector search results>\n{json.dumps(search_results, indent=2)}\n</retrieved vector search results>"
            return context
        except Exception as e:
            logger.exception(f"Vector Search failed with the following error: {e}")
            return "<retrieved vector search results>\n(No Results)\n</retrieved vector search results>"

We define the VectorSearch model with three fields: sub_queries, search_query, and metadata_filters. We also define a field_validator for the metadata_filters field to make sure that it is a valid SQL expression. The process method performs the vector search, given the list of sub-queries and the metadata filters, using the search_lancedb function that we defined in the last part, and returns the relevant context that will be passed to the LLM client to answer the query.
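To make this concrete, a query like "Summarize the chunking strategy described in part 1" might be decomposed by the LLM into something along the lines of the instance below. This is purely illustrative: the actual sub-queries and filter are generated by the model, and the document name in the filter is a hypothetical stored name:

# Illustrative only: in practice these fields are produced by the LLM via Instructor
vsearch = VectorSearch(
    search_query="Summarize the chunking strategy described in part 1",
    sub_queries=[
        "What chunking strategy was used in part 1?",
        "How was text split into chunks in the basic RAG system?",
    ],
    metadata_filters="name = 'rag_part_1'",  # Hypothetical document name
)

In the full pipeline, this object is produced by client.create(response_model=VectorSearch, ...), as shown in the query loop at the end of this article.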

Now, let's tackle the process of answering the user's query. In our previous iteration, we relied on a simple str response model, leaving the LLM's output unchecked. This time, we'll introduce the Output model, which will serve as a validator for the LLM's responses.

By incorporating the Output model, we’ll implement the Chain-of-Thought (CoT) prompting strategy, encouraging the LLM to break down its reasoning into a logical and explainable sequence. We will also ask the model to provide its reasoning for its answer along with the answer. This will enhance the transparency and reliability of the RAG system.

from typing import Any, Optional

from pydantic import model_validator

class Validation(BaseModel):
    is_valid: bool = Field(..., description="Whether the value is valid based on the rules")
    error_message: Optional[str] = Field(..., description="The error message if the value is not valid, to be used for re-asking the model")

def validate_chain_of_thought(values: dict) -> dict:
    chain_of_thought = values["chain_of_thought"]
    answer = values["answer"]
    reasoning = values["reasoning"]
    # `gclient` is an Instructor-patched LLM client, set up like the Gemini client shown further below
    resp = gclient.chat.completions.create(
        messages=[
            {
                "role": "system",
                "content": "You are a validator. Determine if the value follows from the statement. If it is not, explain why.",
            },
            {
                "role": "user",
                "content": f"Verify that `{answer}` and its provided `{reasoning}` follow the chain of thought: {chain_of_thought}",
            },
        ],
        response_model=Validation,
    )
    if not resp.is_valid:
        raise ValueError(resp.error_message)
    return values

class Output(BaseModel):
    """
    The answer to the user's input query along with the reasoning and source from where the answer was extracted
    """

    answer: str = Field(description="The final answer to the user query. It should be respectful, polite, unbiased and should not promote or encourage harm or violence.")
    reasoning: str = Field(description="The reasoning for the answer")
    chain_of_thought: str = Field(description="Think step-by-step to come up with the correct answer")
    sources: str = Field(default="", description="The comma-separated list of sources where the answer was found")

    @model_validator(mode="before")
    @classmethod
    def chain_of_thought_makes_sense(cls, data: dict) -> Any:
        return validate_chain_of_thought(data)

    def parse(self):
        return f"**Final Answer:** {self.answer}\nReasoning:\n{self.reasoning}\nSources: {self.sources}"

The Output model is designed to provide a comprehensive and transparent view of the LLM’s reasoning process. It includes four key fields: answer, reasoning, chain_of_thought, and sources.

Not only do we provide the final answer, but we also expose the reasoning behind it, along with the source(s) from which the LLM derived its information. The chain_of_thought field serves a dual purpose:

  • It encourages the LLM to think through the problem step-by-step, breaking down complex reasoning into a more understandable sequence.
  • It provides a mechanism for verifying the correctness of the answer by validating the LLM’s chain of thought.

The validate_chain_of_thought function leverages another LLM as a validator, using the Validation pydantic model. To learn more about using an LLM as a validator, along with other forms of validation, you can check out this blog by Instructor.

With these additions in place, the following code snippet demonstrates how to answer a single query using our enhanced RAG system.

import json

import lancedb
import instructor
import google.generativeai as genai
from lancedb.rerankers import AnswerdotaiRerankers

system_message = """You are a system that answers questions based on the provided context.
Answer exactly what the question asks using the context.
Your answer must always be grounded in truth, reasonable, polite and respectful.
""".strip()

# `uri` points to the same LanceDB location used in part 1
db = lancedb.connect(uri)

client: instructor.Instructor = instructor.from_gemini(
    client=genai.GenerativeModel(model_name="gemini-1.5-flash-latest")
)

reranker = AnswerdotaiRerankers("answerdotai/answerai-colbert-small-v1")

messages: list[dict] = []

while True:
    query = input("('q' or 'exit' or 'quit' to quit) > ")
    if query in ("q", "exit", "quit"):
        break

    vsearch: VectorSearch = client.create(
        response_model=VectorSearch,
        messages=[
            {"role": "system", "content": "Given the input query, extract sub-queries optimized for vector search"},
            {"role": "user", "content": query},
        ],
        max_retries=2,
    )
    # `table_name`, `filters`, and `max_search_results` are assumed from our earlier setup
    context = vsearch.process(db, table_name, reranker, filters, max_search_results)
    messages.append(
        {
            "role": "user",
            "content": f"Given the following context: {context}\n\nAnswer the following question: {query}",
        }
    )
    response: Output = client.create(
        response_model=Output,
        messages=[{"role": "system", "content": system_message}] + messages,
        max_retries=2,
    )
    print(response.parse())
    messages.append({"role": "assistant", "content": response.parse()})

You’ll notice a few subtle but important changes from our initial iteration. Here, we’ve streamlined the process by using the LLM client to break down the original query into smaller sub-queries before performing the search within the process method. And for answering the query, we’ve switched to using Output as the response model instead of simply a string.

While these changes might seem minor at first glance, they have a significant impact on the overall quality of our RAG system’s responses. These improvements lay the groundwork for further enhancements, including additional validation measures that ensure the LLM’s responses are as consistent and accurate as possible. This is just the beginning of our journey to build a truly robust and reliable RAG system.

Conclusion

We’ve taken a significant step forward in building a more sophisticated and robust RAG system in this installment. By expanding our database to encompass a wider range of data sources and implementing a smarter chunking strategy, we improve the retrieval results, which in turn leads to better responses to our queries. By using Chain-of-Thought (CoT) prompting, we guide the LLM towards more accurate responses. We also introduced validators to ensure the reliability and correctness of our RAG system. Thus, we’ve laid the groundwork for a truly powerful and trustworthy tool and moved closer to our goal of an enterprise-grade RAG system.

That said, this journey is far from over. In the next part of this series, we’ll explore even more advanced techniques, including multi-hop retrieval and graph RAG, to further enhance the accuracy and depth of our RAG system. Stay tuned for more exciting developments as we continue to build a cutting-edge RAG system capable of handling complex tasks and providing insightful responses.
