Private llama3 langgraph 100% local on windows

Include: retrieval, answer, hallucination grader + websearch

bedy kharisma
Data And Beyond

--

Photo by Jessica Knowlden on Unsplash

I see RAG as the cornerstone of future AI applications. We’re not seeking AI that merely generates nonsensical responses. What we aim for is an AI capable of retrieving answers from specific document sets, understanding the context of the query, directing itself to search its embeddings or resorting to web search if necessary, assessing the validity of its own response to prevent hallucinations, and ultimately delivering human-like answers grounded in the documents we’ve supplied.

Say no more, the wait is over. Lets break it down.

this article is inspired by this video:

Several adjustments have been made to incorporate the source data differently. Instead of relying on a single PDF file, the system now utilizes a PDF directory as one of the sources. Additionally, the focus has shifted towards routing any type of question to the vector store, rather than resorting to a web search.

In this detailed breakdown, we’ll dissect each line of the provided code snippet to unravel the inner workings of Langchain:

# Install modules
!pip install ollama langchain beautifulsoup4 chromadb gradio unstructured langchain-nomic langchain_community tiktoken langchainhub langgraph tavily-python gpt4all -q
!pip install "unstructured[all-docs]" -q
!ollama pull llama3
!ollama pull nomic-embed-textp

These lines initiate the installation of essential modules and libraries required for Langchain and its associated functionalities. The pip install commands ensure that all necessary dependencies are installed, while ollama pull fetches specific models and resources required for text processing.

# Importing libraries
import os
import bs4
import getpass
import ollama
from typing import List
from typing_extensions import TypedDict
from langchain import hub
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
WebBaseLoader,
UnstructuredPDFLoader,
OnlinePDFLoader,
UnstructuredFileLoader,
PyPDFDirectoryLoader,
)
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OllamaEmbeddings, GPT4AllEmbeddings
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_community.tools.tavily_search import TavilySearchResults

Here, various libraries and modules essential for Langchain operations are imported. These include modules for text splitting, document loading, vector embedding, output parsing, and more. Each import statement brings in functionality crucial for different aspects of NLP tasks.

#Options
local_llm = 'llama3'
llm = ChatOllama(model=local_llm, format="json", temperature=0)
#embeddings
#embeddings = OllamaEmbeddings(model="nomic-embed-text")
embeddings = GPT4AllEmbeddings()

These lines set options and configurations for Langchain. local_llm specifies the local model to be used, while llm initializes a ChatOllama instance for interacting with the model. The choice of embeddings, whether Ollama nomic embed text, or GPT-4, is also specified here.

##sources
#url
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
#pdf
loader = PyPDFDirectoryLoader("C://Users//ASUS//Downloads//sources//")
data = loader.load()
docs_list.extend(data)

These lines fetch textual data from different sources, including web URLs and PDF documents. WebBaseLoader is used to load content from URLs, while PyPDFDirectoryLoader is employed for loading PDF files from a local directory.

#splitter
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=1000, chunk_overlap=200
)
doc_splits = text_splitter.split_documents(docs_list)

Here, the text splitter is initialized to segment documents into smaller chunks for efficient processing. This step is essential for tasks like vectorization and retrieval, where handling large documents may pose performance challenges.

# Add to vectorDB
vectorstore = Chroma.from_documents(
documents=doc_splits,
collection_name="rag-chroma",
embedding=embeddings,
)
retriever = vectorstore.as_retriever()

These lines create a vector store using Chroma, a component of Langchain responsible for storing and querying document embeddings. The documents are vectorized using the specified embeddings and added to the vector store, enabling efficient retrieval based on semantic similarity.

### Retrieval Grader 
prompt = PromptTemplate(
template="""system You are a grader assessing relevance
of a retrieved document to a user question. If the document contains keywords related to the user question,
grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
Provide the binary score as a JSON with a single key 'score' and no preamble or explaination.
user
Here is the retrieved document: \n\n {document} \n\n
Here is the user question: {question} \n assistant
""",
input_variables=["question", "document"],
)
retrieval_grader = prompt | llm | JsonOutputParser()

This section defines a prompt template for grading the relevance of retrieved documents to user questions. The template outlines the grading criteria and prompts the user to provide a binary score indicating document relevance. The score is processed using Langchain’s ChatOllama instance and parsed into JSON format for further evaluation.

### Generate
prompt = PromptTemplate(
template="""system You are an assistant for question-answering tasks.
Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
Use three sentences maximum and keep the answer concise user
Question: {question}
Context: {context}
Answer: assistant""",
input_variables=["question", "document"],
)

This prompt template is designed for generating answers to user questions based on retrieved context. It instructs the assistant to provide a concise answer using up to three sentences, utilizing the retrieved documents as context. This template facilitates question-answering tasks within the Langchain framework.

# Post-processing
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)

This function performs post-processing on retrieved documents, formatting them into a readable text format. It concatenates the page content of each document with double line breaks for improved readability.

# Chain
rag_chain = prompt | llm | StrOutputParser()

Here, a processing chain (rag_chain) is constructed using Langchain components, including the prompt template, ChatOllama instance (llm), and string output parser. This chain facilitates the generation of responses to user queries based on the provided context.

### Hallucination Grader 
prompt = PromptTemplate(
template=""" system You are a grader assessing whether
an answer is grounded in / supported by a set of facts. Give a binary score 'yes' or 'no' score to indicate
whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a
single key 'score' and no preamble or explanation. user
Here are the facts:
\n ------- \n
{documents}
\n ------- \n
Here is the answer: {generation} assistant""",
input_variables=["generation", "documents"],
)

hallucination_grader = prompt | llm | JsonOutputParser()

This section defines a prompt template for grading the factual grounding of generated answers. The template prompts the user to evaluate whether the answer is supported by a set of facts provided as context. The generated answer (generation) and relevant documents (documents) are passed through Langchain components for evaluation, and the resulting score is parsed into JSON format.

### Answer Grader 
prompt = PromptTemplate(
template="""system You are a grader assessing whether an
answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is
useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
user Here is the answer:
\n ------- \n
{generation}
\n ------- \n
Here is the question: {question} assistant""",
input_variables=["generation", "question"],
)

answer_grader = prompt | llm | JsonOutputParser()

This section defines a prompt template for grading the usefulness of generated answers in resolving user questions. The template prompts the user to provide a binary score indicating whether the answer effectively addresses the given question. The answer (generation) and question (question) are processed using Langchain components, and the resulting score is parsed into JSON format for evaluation.

### Router
prompt = PromptTemplate(
template="""system
You excel in directing user inquiries either to a vector store or a web search.
For queries related to documents within the vector store, prioritize utilizing the vector store.
There's no need to strictly match keywords in the question to topics within the vector store.
If the question isn't covered by the vector store's content, resort to a web search.
Provide a binary decision, 'web_search' or 'vectorstore', depending on the nature of the question.
Return the a JSON with a single key 'datasource' and
no premable or explaination. Question to route: {question} assistant""",
input_variables=["question"],
)

question_router = prompt | llm | JsonOutputParser()

This section defines a prompt template for routing user inquiries to either a vector store or a web search engine based on the nature of the question. The template instructs the user to provide a binary decision indicating the preferred data source (web_search or vectorstore) for answering the question. The question is processed using Langchain components, and the routing decision is parsed into JSON format.

### Search
os.environ["TAVILY_API_KEY"] = "tvly-XXXX"
web_search_tool = TavilySearchResults(k=3)

This section initializes a web search tool (web_search_tool) powered by the Tavily API. The Tavily API key is set as an environment variable to enable access to the web search functionality.

### State
class GraphState(TypedDict):
"""
Represents the state of our graph.

Attributes:
question: question
generation: LLM generation
web_search: whether to add search
documents: list of documents
"""
question : str
generation : str
web_search : str
documents : List[str]

Here, a GraphState class is defined to represent the state of the Langchain graph. The state includes attributes such as the question, generated answer, indication of whether a web search is required, and a list of relevant documents.

### Nodes

def retrieve(state):
"""
Retrieve documents from vectorstore

Args:
state (dict): The current graph state

Returns:
state (dict): New key added to state, documents, that contains retrieved documents
"""
print("---RETRIEVE---")
question = state["question"]

# Retrieval
documents = retriever.invoke(question)
return {"documents": documents, "question": question}

def generate(state):
"""
Generate answer using RAG on retrieved documents

Args:
state (dict): The current graph state

Returns:
state (dict): New key added to state, generation, that contains LLM generation
"""
print("---GENERATE---")
question = state["question"]
documents = state["documents"]

# RAG generation
generation = rag_chain.invoke({"context": documents, "question": question})
return {"documents": documents, "question": question, "generation": generation}

def grade_documents(state):
"""
Determines whether the retrieved documents are relevant to the question
If any document is not relevant, we will set a flag to run web search

Args:
state (dict): The current graph state

Returns:
state (dict): Filtered out irrelevant documents and updated web_search state
"""

print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
question = state["question"]
documents = state["documents"]

# Score each doc
filtered_docs = []
web_search = "No"
for d in documents:
score = retrieval_grader.invoke({"question": question, "document": d.page_content})
grade = score['score']
# Document relevant
if grade.lower() == "yes":
print("---GRADE: DOCUMENT RELEVANT---")
filtered_docs.append(d)
# Document not relevant
else:
print("---GRADE: DOCUMENT NOT RELEVANT---")
# We do not include the document in filtered_docs
# We set a flag to indicate that we want to run web search
web_search = "Yes"
continue
return {"documents": filtered_docs, "question": question, "web_search": web_search}

These functions represent different nodes in the Langchain graph, each responsible for a specific task. retrieve retrieves relevant documents from the vector store, generate generates an answer using the RAG model, and grade_documents evaluates the relevance of retrieved documents to the user question and determines whether a web search is required.

def web_search(state):
"""
Web search based based on the question

Args:
state (dict): The current graph state

Returns:
state (dict): Appended web results to documents
"""

print("---WEB SEARCH---")
question = state["question"]
documents = state["documents"]

# Web search
docs = web_search_tool.invoke({"query": question})
web_results = "\n".join([d["content"] for d in docs])
web_results = Document(page_content=web_results)
if documents is not None:
documents.append(web_results)
else:
documents = [web_results]
return {"documents": documents, "question": question}

This function performs a web search based on the user question and appends the retrieved web results to the existing list of documents. The Tavily API is utilized to execute the web search, and the retrieved content is formatted and added to the document list.

def route_question(state):
"""
Route question to web search or RAG.

Args:
state (dict): The current graph state

Returns:
str: Next node to call
"""

print("---ROUTE QUESTION---")
question = state["question"]
print(question)
source = question_router.invoke({"question": question})
print(source)
print(source['datasource'])
if source['datasource'] == 'web_search':
print("---ROUTE QUESTION TO WEB SEARCH---")
return "websearch"
elif source['datasource'] == 'vectorstore':
print("---ROUTE QUESTION TO RAG---")
return "vectorstore"

This function determines the routing of user questions based on the nature of the query. It utilizes the question_router to assess whether the question should be routed to a web search or processed using the RAG model. The decision is based on the output of the question_router component.

def decide_to_generate(state):
"""
Determines whether to generate an answer, or add web search

Args:
state (dict): The current graph state

Returns:
str: Binary decision for next node to call
"""

print("---ASSESS GRADED DOCUMENTS---")
question = state["question"]
web_search = state["web_search"]
filtered_documents = state["documents"]

if web_search == "Yes":
# All documents have been filtered check_relevance
# We will re-generate a new query
print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---")
return "websearch"
else:
# We have relevant documents, so generate answer
print("---DECISION: GENERATE---")
return "generate"

This function decides whether to generate an answer using the RAG model or proceed with a web search based on the relevance of retrieved documents. If all documents are deemed irrelevant, the function chooses to initiate a web search. Otherwise, it proceeds with answer generation.

def grade_generation_v_documents_and_question(state):
"""
Determines whether the generation is grounded in the document and answers question.

Args:
state (dict): The current graph state

Returns:
str: Decision for next node to call
"""

print("---CHECK HALLUCINATIONS---")
question = state["question"]
documents = state["documents"]
generation = state["generation"]

score = hallucination_grader.invoke({"documents": documents, "generation": generation})
grade = score['score']

# Check hallucination
if grade == "yes":
print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
# Check question-answering
print("---GRADE GENERATION vs QUESTION---")
score = answer_grader.invoke({"question": question,"generation": generation})
grade = score['score']
if grade == "yes":
print("---DECISION: GENERATION ADDRESSES QUESTION---")
return "useful"
else:
print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
return "not useful"
else:
pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
return "not supported"

This function evaluates the factual grounding and relevance of the generated answer to the user question. It utilizes the hallucination_grader to assess whether the answer is grounded in the provided documents and the answer_grader to determine if it effectively addresses the question. Based on the evaluation, the function decides whether the answer is useful or not.

from langgraph.graph import END, StateGraph
workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("websearch", web_search) # web search
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade documents
workflow.add_node("generate", generate) # generatae

# Build graph
workflow.set_conditional_entry_point(
route_question,
{
"websearch": "websearch",
"vectorstore": "retrieve",
},
)

workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
"grade_documents",
decide_to_generate,
{
"websearch": "websearch",
"generate": "generate",
},
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
"generate",
grade_generation_v_documents_and_question,
{
"not supported": "generate",
"useful": END,
"not useful": "websearch",
},
)

In this section, a Langchain graph (workflow) is constructed to orchestrate the sequence of operations. Nodes representing different tasks, such as document retrieval, grading, answer generation, and web search, are added to the graph. Conditional edges are defined to handle routing decisions and guide the flow of execution based on the current state.

try:
# Compile
app = workflow.compile()

# Test
from pprint import pprint
inputs = {"question": "Who is bedy kharisma?"}
for output in app.stream(inputs):
for key, value in output.items():
pprint(f"Finished running: {key}:")
pprint(value["generation"])

except Exception as e:
# Handle the error
print("An error occurred:", e)

Finally, the Langchain graph is compiled into a functional application (app), which is then tested using sample inputs. The graph processes the inputs through its defined nodes and edges, executing the specified tasks and producing output. Any errors encountered during execution are handled gracefully, ensuring robustness and reliability.

This comprehensive breakdown provides a detailed insight into the intricate workings of the Langchain framework, showcasing its versatility and power in tackling complex natural language processing tasks. By leveraging the capabilities of Langchain, developers can unlock new possibilities and revolutionize the way we interact with textual data.

Full python notebook file can be downloaded here:

in simpler future use, just update the PDF directory here:


##sources
#url
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
#pdf
loader = PyPDFDirectoryLoader("C://Users//ASUS//Downloads//sources//")
data = loader.load()
docs_list.extend(data)

and modify the question here:

try:
# Compile
app = workflow.compile()

# Test
from pprint import pprint
inputs = {"question": "Who is bedy kharisma?"}
for output in app.stream(inputs):
for key, value in output.items():
pprint(f"Finished running: {key}:")
pprint(value["generation"])

except Exception as e:
# Handle the error
print("An error occurred:", e)

--

--

bedy kharisma
Data And Beyond

Indonesian Strategist,Analyst, Researcher, Achievement Addict who helps company grow their business by applying data-driven Management. Follow to follow