Mastering LangChain RAG: Implementing Streaming Capabilities (Part 3)

Eric Vaillancourt
23 min read · Jun 9, 2024


All code examples mentioned can be found on our GitHub repository.

Welcome to our in-depth series on LangChain’s RAG (Retrieval-Augmented Generation) technology. Over the course of six articles, we’ll explore how you can leverage RAG to enhance your applications with state-of-the-art natural language processing techniques. Whether you’re a developer, a data scientist, or just an AI enthusiast, this series will equip you with the knowledge to implement and optimize RAG in your projects.

Overview of the Series:

1. Quick Start Guide to LangChain RAG: Jump right in with our first tutorial where we’ll cover the basics of setting up LangChain RAG. This introductory article will help you get your environment ready and run your first RAG-based application.

2. Integrating Chat History: Learn how to incorporate chat history into your RAG model to maintain context and improve interaction quality in chat-like conversations. We will also learn how to save chat history to a SQL database.

3. Implementing Streaming Capabilities: (This article) Discover how to implement streaming with RAG to handle real-time data processing efficiently, perfect for applications requiring immediate responses.

4. Returning Sources with Results: This tutorial will teach you how to configure RAG to provide sources along with responses, adding transparency and credibility to the generated outputs.

5. Adding Citations to Your Results: Enhance your application’s trustworthiness by automatically including citations in your results, making them verifiable and more reliable.

6. Putting It All Together: In our final article, we’ll integrate all the components learned in previous tutorials to build a comprehensive RAG application, demonstrating the power and versatility of this technology.

Streaming

This article is based on a notebook published by LangChain. I have also added a working example of how to implement streaming with FastAPI.

Including the sources in the streaming

In the dynamic landscape of question-and-answer (Q&A) applications, transparency about information sources has become a fundamental pillar for establishing trust and credibility. This series of articles, which explores the implementation of streaming capabilities, addresses a critical feature in its third installment: the return of source documents.

Drawing from the groundwork laid in the “LLM Powered Autonomous Agents” blog post by Lilian Weng, and our previous discussions in the “Returning Sources Guide,” we will delve into practical approaches for integrating source transparency seamlessly into streaming Q&A applications. This installment not only enhances user trust but also enriches the interaction by providing a verifiable path back to the origins of the information provided.

Setup Environment: Ensure your development environment is prepared with the necessary dependencies.

pip install --upgrade --quiet  langchain langchain-community langchainhub langchain-openai langchain-chroma bs4 python-dotenv

We need to set the environment variable OPENAI_API_KEY, which both the chat model and the embeddings model use. It can be set directly or loaded from a .env file like so:

from dotenv import load_dotenv
load_dotenv()

You will have to create a file called “.env”. Here is a sample:

OPENAI_API_KEY = "your-key-here"
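A missing key is easier to diagnose when it is reported up front. Below is a minimal sketch of a fail-fast check that uses only the standard library; the helper name `require_env` is hypothetical and is not part of python-dotenv or LangChain.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; add it to your .env file or shell")
    return value


# Usage, after load_dotenv() has run:
# api_key = require_env("OPENAI_API_KEY")
```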

Chain with sources

Here is the Q&A application with source tracking, built over Lilian Weng’s blog post “LLM Powered Autonomous Agents” and taken from the Returning Sources guide:

Importing Libraries and Modules

The snippet starts by importing necessary libraries and modules:

import bs4
from langchain import hub
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
  1. bs4: This is Beautiful Soup 4, a Python library for pulling data out of HTML and XML files. Here it restricts parsing of the blog page to the elements we care about.
  2. langchain (hub): LangChain is a framework for building applications on top of language models. The hub is a platform for discovering and sharing reusable components such as prompts; we use it to pull a ready-made RAG prompt.
  3. langchain_chroma: The LangChain integration for Chroma, an open-source vector database. It stores the embedded document chunks and supports similarity search over them.
  4. WebBaseLoader: A component from langchain_community.document_loaders that loads documents from web pages. Here it fetches the blog post that serves as our knowledge base.
  5. StrOutputParser: From langchain_core.output_parsers, this parser converts the chat model's message output into a plain string, making it suitable for display or further processing.
  6. RunnableParallel, RunnablePassthrough: These classes from langchain_core.runnables control how data flows through a chain. RunnableParallel runs several runnables concurrently on the same input and returns their results as a dictionary, while RunnablePassthrough forwards its input unchanged (its assign method adds extra keys to a dictionary input).
  7. ChatOpenAI: The chat model wrapper from langchain_openai, used to call OpenAI chat models such as GPT-3.5 Turbo to generate answers.
  8. OpenAIEmbeddings: Also from langchain_openai, used to generate embeddings from text, which allows relevant documents to be retrieved and ranked by similarity.
  9. RecursiveCharacterTextSplitter: Part of langchain_text_splitters, it splits text into chunks of a target size by trying a list of separators in turn, which is useful for processing large documents.
# Load, chunk and index the contents of the blog.
bs_strainer = bs4.SoupStrainer(class_=("post-content", "post-title", "post-header"))
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs={"parse_only": bs_strainer},
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())

# Retrieve and generate using the relevant snippets of the blog.
retriever = vectorstore.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)


def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
    | prompt
    | llm
    | StrOutputParser()
)

rag_chain_with_source = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
).assign(answer=rag_chain_from_docs)

This code snippet outlines the steps for loading, processing, and querying blog content using a range of specialized libraries to implement a Q&A feature with source tracking. The process involves web scraping, text processing, vector embedding for retrieval, and querying with an AI model. Here’s a detailed breakdown:

Initial Setup and Web Scraping:

# Load, chunk, and index the contents of the blog.
bs_strainer = bs4.SoupStrainer(class_=("post-content", "post-title", "post-header"))
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs={"parse_only": bs_strainer},
)
docs = loader.load()
  • bs4.SoupStrainer: This object restricts parsing to the elements with the listed CSS classes (the post content, title, and header) and skips the rest of the page, which also speeds up parsing.
  • WebBaseLoader: This component from the langchain_community.document_loaders module is used to load web documents. Here, it's configured to load specific elements from Lilian Weng’s blog post using Beautiful Soup’s filtering capabilities.

Text Splitting and Embedding:

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())
  • RecursiveCharacterTextSplitter: This splits the loaded documents into chunks of up to 1000 characters, with an overlap of up to 200 characters between consecutive chunks to preserve continuity of context.
  • Chroma.from_documents: This function creates a vector store from the document splits by converting text into embeddings using OpenAIEmbeddings. These embeddings allow for efficient retrieval of relevant text based on semantic similarity.
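To make the roles of chunk_size and chunk_overlap concrete, here is a simplified sketch that cuts a string into fixed-size windows. It is not how RecursiveCharacterTextSplitter works internally (the real splitter prefers to break at separators such as paragraphs and lines, so its chunks vary in length); the function name `split_with_overlap` is hypothetical.

```python
def split_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Cut text into fixed-size windows; each window repeats the tail of the previous one."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks


chunks = split_with_overlap("abcdefghijklmnopqrstuvwxyz", chunk_size=10, chunk_overlap=4)
print(chunks)  # ['abcdefghij', 'ghijklmnop', 'mnopqrstuv', 'stuvwxyz']
```

Each chunk starts with the last four characters of the previous one, so a sentence that straddles a boundary still appears whole in at least one chunk as long as it is shorter than the overlap.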

Retrieval and Query Processing:

retriever = vectorstore.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
  • vectorstore.as_retriever: Converts the vectorstore into a retriever that can fetch relevant document snippets based on query similarity.
  • hub.pull: This function pulls a resource from the LangChain Hub, in this case the rlm/rag-prompt prompt template, which combines the retrieved context and the question into the prompt sent to the LLM.
  • ChatOpenAI: Initializes the GPT-3.5 Turbo chat model to generate responses, with temperature set to 0 to make the outputs as deterministic as possible.
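The idea behind similarity-based retrieval can be sketched without any API calls. The example below is a toy stand-in, not the Chroma or OpenAI implementation: it uses word counts as "embeddings" and ranks chunks by cosine similarity. All names here (`embed`, `cosine`, `retrieve`) are hypothetical.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Toy 'embedding': a bag-of-words count vector (stand-in for a real model)."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[token] * b[token] for token in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(query: str, chunks: list[str], k: int = 2) -> list[str]:
    """Return the k chunks most similar to the query."""
    query_vec = embed(query)
    ranked = sorted(chunks, key=lambda c: cosine(query_vec, embed(c)), reverse=True)
    return ranked[:k]


chunks = [
    "task decomposition breaks a task into smaller steps",
    "memory stores past observations",
    "tools let the agent call external APIs",
]
top = retrieve("what is task decomposition", chunks, k=1)
print(top)  # ['task decomposition breaks a task into smaller steps']
```

A real embedding model captures meaning rather than exact word overlap, but the retrieval step follows the same pattern: embed the query, score every stored chunk, and return the top matches.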

Integration and Response Generation:

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
    | prompt
    | llm
    | StrOutputParser()
)

rag_chain_with_source = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
).assign(answer=rag_chain_from_docs)
  • format_docs: A helper function that joins the page content of the retrieved documents into a single string, with a blank line between documents.
  • RunnablePassthrough.assign: Inside rag_chain_from_docs, it replaces the context key (a list of documents) with the formatted string while passing the other keys, such as question, through unchanged.
  • RunnableParallel and RunnablePassthrough: In rag_chain_with_source, RunnableParallel sends the incoming question to the retriever and to RunnablePassthrough at the same time, producing a dictionary with context and question keys.
  • rag_chain_from_docs and rag_chain_with_source: rag_chain_from_docs turns that dictionary into an answer string. rag_chain_with_source adds the string under the answer key via .assign, so the final output contains the question, the source documents in context, and the answer.
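The data flow described above can be traced with plain Python functions. This is a sketch of the shape of the data only, not LangChain code: `fake_retriever` and `fake_llm` are hypothetical stand-ins, and the prompt text is invented for illustration.

```python
def fake_retriever(question: str) -> list[str]:
    """Stand-in for the vector store retriever."""
    return ["Chunk about planning.", "Chunk about task decomposition."]


def fake_llm(prompt: str) -> str:
    """Stand-in for the chat model; reports how much prompt text it received."""
    return f"(answer based on {len(prompt)} prompt characters)"


def format_docs(docs: list[str]) -> str:
    return "\n\n".join(docs)


def chain_with_source(question: str) -> dict:
    # Step 1 (RunnableParallel in the article): build a dict from the same input.
    state = {"context": fake_retriever(question), "question": question}
    # Step 2 (.assign(answer=...)): compute the answer from a formatted copy of
    # the context, then add it as a new key while keeping the original documents.
    prompt = f"Context:\n{format_docs(state['context'])}\n\nQuestion: {state['question']}"
    return {**state, "answer": fake_llm(prompt)}


result = chain_with_source("What is Task Decomposition")
print(sorted(result))  # ['answer', 'context', 'question']
```

Because the formatted string is only used to build the prompt, the context key in the result still holds the original list of documents, which is what lets the caller show the sources.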

This code essentially builds a robust Q&A system that leverages AI and advanced text handling to deliver precise answers derived from a specific knowledge base, while maintaining transparency about the source of the information.

Streaming final outputs

Streaming final outputs is straightforward with LCEL:

for chunk in rag_chain_with_source.stream("What is Task Decomposition"):
    print(chunk)

Result:

{'question': 'What is Task Decomposition'}
{'context': [Document(page_content='Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated task usually involves many steps. An agent needs to know what they are and plan ahead.\nTask Decomposition#\nChain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to “think step by step” to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model’s thinking process.', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}), Document(page_content='Tree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) with each state evaluated by a classifier (via a prompt) or majority vote.\nTask decomposition can be done (1) by LLM with simple prompting like "Steps for XYZ.\\n1.", "What are the subgoals for achieving XYZ?", (2) by using task-specific instructions; e.g. "Write a story outline." for writing a novel, or (3) with human inputs.', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}), Document(page_content='The AI assistant can parse user input to several tasks: [{"task": task, "id", task_id, "dep": dependency_task_ids, "args": {"text": text, "image": URL, "audio": URL, "video": URL}}]. The "dep" field denotes the id of the previous task which generates a new resource that the current task relies on. A special tag "-task_id" refers to the generated text image, audio and video in the dependency task with id as task_id. The task MUST be selected from the following options: {{ Available Task List }}. 
There is a logical relationship between tasks, please note their order. If the user input can\'t be parsed, you need to reply empty JSON. Here are several cases for your reference: {{ Demonstrations }}. The chat history is recorded as {{ Chat History }}. From this chat history, you can find the path of the user-mentioned resources for your task planning.', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}), Document(page_content='Fig. 11. Illustration of how HuggingGPT works. (Image source: Shen et al. 2023)\nThe system comprises of 4 stages:\n(1) Task planning: LLM works as the brain and parses the user requests into multiple tasks. There are four attributes associated with each task: task type, ID, dependencies, and arguments. They use few-shot examples to guide LLM to do task parsing and planning.\nInstruction:', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'})]}
{'answer': ''}
{'answer': 'Task'}
{'answer': ' decomposition'}
{'answer': ' is'}
{'answer': ' a'}
{'answer': ' technique'}
{'answer': ' used'}
{'answer': ' to'}
{'answer': ' break'}
{'answer': ' down'}
{'answer': ' complex'}
{'answer': ' tasks'}
{'answer': ' into'}
{'answer': ' smaller'}
{'answer': ' and'}
{'answer': ' simpler'}
{'answer': ' steps'}
{'answer': '.'}
{'answer': ' It'}
{'answer': ' can'}
{'answer': ' be'}
{'answer': ' done'}
{'answer': ' through'}
{'answer': ' methods'}
{'answer': ' like'}
{'answer': ' Chain'}
{'answer': ' of'}
{'answer': ' Thought'}
{'answer': ' ('}
{'answer': 'Co'}
{'answer': 'T'}
{'answer': ')'}
{'answer': ' or'}
{'answer': ' Tree'}
{'answer': ' of'}
{'answer': ' Thoughts'}
{'answer': ','}
{'answer': ' which'}
{'answer': ' involve'}
{'answer': ' dividing'}
{'answer': ' the'}
{'answer': ' task'}
{'answer': ' into'}
{'answer': ' manageable'}
{'answer': ' sub'}
{'answer': 'tasks'}
{'answer': ' and'}
{'answer': ' exploring'}
{'answer': ' multiple'}
{'answer': ' reasoning'}
{'answer': ' possibilities'}
{'answer': ' at'}
{'answer': ' each'}
{'answer': ' step'}
{'answer': '.'}
{'answer': ' Task'}
{'answer': ' decomposition'}
{'answer': ' can'}
{'answer': ' be'}
{'answer': ' performed'}
{'answer': ' by'}
{'answer': ' using'}
{'answer': ' simple'}
{'answer': ' prompts'}
{'answer': ','}
{'answer': ' task'}
{'answer': '-specific'}
{'answer': ' instructions'}
{'answer': ','}
{'answer': ' or'}
{'answer': ' human'}
{'answer': ' inputs'}
{'answer': '.'}
{'answer': ''}

We can add some logic to compile our stream as it’s being returned:

output = {}
curr_key = None
for chunk in rag_chain_with_source.stream("What is Task Decomposition"):
    for key in chunk:
        # Accumulate each key: store the first value, append later ones.
        if key not in output:
            output[key] = chunk[key]
        else:
            output[key] += chunk[key]
        # Print a new heading when the key changes, otherwise continue the line.
        if key != curr_key:
            print(f"\n\n{key}: {chunk[key]}", end="", flush=True)
        else:
            print(chunk[key], end="", flush=True)
        curr_key = key
output

Result:

question: What is Task Decomposition

context: [Document(page_content='Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated task usually involves many steps. An agent needs to know what they are and plan ahead.\nTask Decomposition#\nChain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to “think step by step” to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model’s thinking process.', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}), Document(page_content='Tree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) with each state evaluated by a classifier (via a prompt) or majority vote.\nTask decomposition can be done (1) by LLM with simple prompting like "Steps for XYZ.\\n1.", "What are the subgoals for achieving XYZ?", (2) by using task-specific instructions; e.g. "Write a story outline." for writing a novel, or (3) with human inputs.', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}), Document(page_content='The AI assistant can parse user input to several tasks: [{"task": task, "id", task_id, "dep": dependency_task_ids, "args": {"text": text, "image": URL, "audio": URL, "video": URL}}]. The "dep" field denotes the id of the previous task which generates a new resource that the current task relies on. A special tag "-task_id" refers to the generated text image, audio and video in the dependency task with id as task_id. The task MUST be selected from the following options: {{ Available Task List }}. 
There is a logical relationship between tasks, please note their order. If the user input can\'t be parsed, you need to reply empty JSON. Here are several cases for your reference: {{ Demonstrations }}. The chat history is recorded as {{ Chat History }}. From this chat history, you can find the path of the user-mentioned resources for your task planning.', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}), Document(page_content='Fig. 11. Illustration of how HuggingGPT works. (Image source: Shen et al. 2023)\nThe system comprises of 4 stages:\n(1) Task planning: LLM works as the brain and parses the user requests into multiple tasks. There are four attributes associated with each task: task type, ID, dependencies, and arguments. They use few-shot examples to guide LLM to do task parsing and planning.\nInstruction:', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'})]

answer: Task decomposition is a technique used to break down complex tasks into smaller and simpler steps. It involves transforming big tasks into multiple manageable tasks to enhance model performance. This process can be done using prompting techniques, task-specific instructions, or human inputs.
{'question': 'What is Task Decomposition',
'context': [Document(page_content='Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated task usually involves many steps. An agent needs to know what they are and plan ahead.\nTask Decomposition#\nChain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to “think step by step” to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model’s thinking process.', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}),
Document(page_content='Tree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) with each state evaluated by a classifier (via a prompt) or majority vote.\nTask decomposition can be done (1) by LLM with simple prompting like "Steps for XYZ.\\n1.", "What are the subgoals for achieving XYZ?", (2) by using task-specific instructions; e.g. "Write a story outline." for writing a novel, or (3) with human inputs.', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}),
Document(page_content='The AI assistant can parse user input to several tasks: [{"task": task, "id", task_id, "dep": dependency_task_ids, "args": {"text": text, "image": URL, "audio": URL, "video": URL}}]. The "dep" field denotes the id of the previous task which generates a new resource that the current task relies on. A special tag "-task_id" refers to the generated text image, audio and video in the dependency task with id as task_id. The task MUST be selected from the following options: {{ Available Task List }}. There is a logical relationship between tasks, please note their order. If the user input can\'t be parsed, you need to reply empty JSON. Here are several cases for your reference: {{ Demonstrations }}. The chat history is recorded as {{ Chat History }}. From this chat history, you can find the path of the user-mentioned resources for your task planning.', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}),
Document(page_content='Fig. 11. Illustration of how HuggingGPT works. (Image source: Shen et al. 2023)\nThe system comprises of 4 stages:\n(1) Task planning: LLM works as the brain and parses the user requests into multiple tasks. There are four attributes associated with each task: task type, ID, dependencies, and arguments. They use few-shot examples to guide LLM to do task parsing and planning.\nInstruction:', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'})],
'answer': 'Task decomposition is a technique used to break down complex tasks into smaller and simpler steps. It involves transforming big tasks into multiple manageable tasks to enhance model performance. This process can be done using prompting techniques, task-specific instructions, or human inputs.'}
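To experiment with the aggregation logic without calling the API, the same merge can be run against a hand-written stream. This is a sketch: `fake_stream` is a hypothetical stand-in whose chunks only imitate the shape of the output shown above, and `merge_chunks` is not a LangChain function.

```python
from typing import Iterable, Iterator


def fake_stream() -> Iterator[dict]:
    """Hand-written stand-in for rag_chain_with_source.stream(...)."""
    yield {"question": "What is Task Decomposition"}
    yield {"context": ["doc-1", "doc-2"]}
    for token in ["", "Task", " decomposition", " is", " a", " technique", "."]:
        yield {"answer": token}


def merge_chunks(chunks: Iterable[dict]) -> dict:
    """Combine streamed partial dicts: the first value is stored, later ones are appended with +."""
    output: dict = {}
    for chunk in chunks:
        for key, value in chunk.items():
            output[key] = output[key] + value if key in output else value
    return output


merged = merge_chunks(fake_stream())
print(merged["answer"])  # Task decomposition is a technique.
```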

Streaming intermediate steps

Let’s consider a scenario where we aim to stream not just the final results of the process, but also certain intermediary stages. Take, for instance, our Chat history chain. In this case, we first reformulate the user’s question before it is submitted to the retriever. Originally, this reformulated question wasn’t included in the final output. However, we’ve now adjusted our chain to ensure that this new question is also returned.

from operator import itemgetter

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tracers.log_stream import LogEntry, LogStreamCallbackHandler

contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""
contextualize_q_prompt = ChatPromptTemplate.from_messages(
[
("system", contextualize_q_system_prompt),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{question}"),
]
).with_config(tags=["contextualize_q_system_prompt"])

contextualize_q_chain = (contextualize_q_prompt | llm | StrOutputParser()).with_config(
tags=["contextualize_q_chain"]
)

qa_system_prompt = """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \
Use three sentences maximum and keep the answer concise.\

{context}"""
qa_prompt = ChatPromptTemplate.from_messages(
[
("system", qa_system_prompt),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{question}"),
]
)


def contextualized_question(input: dict):
    if input.get("chat_history"):
        return contextualize_q_chain
    else:
        return input["question"]


rag_chain = (
    RunnablePassthrough.assign(context=contextualized_question | retriever | format_docs)
    | qa_prompt
    | llm
)

Code Overview

Imports and Utilities

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
  • ChatPromptTemplate, MessagesPlaceholder: These are from langchain_core.prompts and are used to create structured prompt templates for the language model.

Defining the Chat Contextualization System:

contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""
contextualize_q_prompt = ChatPromptTemplate.from_messages(
[
("system", contextualize_q_system_prompt),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{question}"),
]
).with_config(tags=["contextualize_q_system_prompt"])
  • Prompt Template Creation: This creates a prompt template for reformulating questions. The prompt instructs the model to generate a standalone question that can be understood without prior chat history.
  • .with_config(tags=["contextualize_q_system_prompt"]): This method configures the prompt template by tagging it, which helps in tracking or identifying the configuration within larger systems or logs.

Prompt Chain for Contextualization:

contextualize_q_chain = (contextualize_q_prompt | llm | StrOutputParser()).with_config(
tags=["contextualize_q_chain"]
)
  • Pipeline Creation: Chains together the prompt template (contextualize_q_prompt), a language model (llm), and a string output parser (StrOutputParser), creating a data flow pipeline.
  • Tag Configuration: Similar to the above, .with_config(tags=["contextualize_q_chain"]) tags the entire chain, useful for monitoring or logging purposes.

Define Function for Contextualized Question Handling:

def contextualized_question(input: dict):
    if input.get("chat_history"):
        return contextualize_q_chain
    else:
        return input["question"]

Function Logic: Decides whether the question goes through the reformulation chain. If chat history is present, the function returns contextualize_q_chain, which LCEL then runs on the same input; otherwise it returns the question as is.

Full Q&A System with Contextual Retrieval:

qa_system_prompt = """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \
Use three sentences maximum and keep the answer concise.\

{context}"""
qa_prompt = ChatPromptTemplate.from_messages(
[
("system", qa_system_prompt),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{question}"),
]
)

rag_chain = (
RunnablePassthrough.assign(context=contextualized_question | retriever | format_docs)
| qa_prompt
| llm
)
  • Q&A Prompt Template: Creates a prompt for answering questions, instructing the assistant to use provided context to formulate concise answers.
  • RunnablePassthrough.assign: Adds a context key to the input by running contextualized_question, passing the resulting question to the retriever, and formatting the retrieved documents with format_docs before the QA prompt is filled in.

This chain demonstrates a use of LangChain for integrating chat history into a dynamic Q&A process, ensuring questions are contextualized and answers are informed by relevant data.
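The branching at the heart of this chain can be sketched in plain Python. The functions below are hypothetical stand-ins (a real contextualize_q_chain would call the LLM, and a real retriever would query the vector store); the sketch only shows which query reaches the retriever in each case.

```python
def reformulate(question: str, chat_history: list[str]) -> str:
    """Stand-in for contextualize_q_chain: a real chain would call the LLM here."""
    return f"{question} (in the context of: {chat_history[-1]})"


def fake_retriever(query: str) -> list[str]:
    """Stand-in for the retriever; echoes the query it was given."""
    return [f"document matching '{query}'"]


def build_context(inputs: dict) -> str:
    # Same branch as contextualized_question: only reformulate when history exists.
    if inputs.get("chat_history"):
        query = reformulate(inputs["question"], inputs["chat_history"])
    else:
        query = inputs["question"]
    return "\n\n".join(fake_retriever(query))


first = build_context({"question": "What is Task Decomposition?", "chat_history": []})
second = build_context(
    {
        "question": "What are common ways of doing it?",
        "chat_history": ["What is Task Decomposition?"],
    }
)
print(first)
print(second)
```

With an empty history the original question goes straight to the retriever; with history, the retriever sees the reformulated question, which is the intermediate value the next section streams.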

Streaming the reformulated question (intermediate step)

from langchain_core.messages import HumanMessage

chat_history = []

question = "What is Task Decomposition?"
ai_msg = rag_chain.invoke({"question": question, "chat_history": chat_history})
chat_history.extend([HumanMessage(content=question), ai_msg])

second_question = "What are common ways of doing it?"

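# Note: a top-level `async for` works in a Jupyter notebook. In a plain script,
# wrap this loop in an `async def` function and run it with asyncio.run().
async for chunk in rag_chain.astream_events(
    {"question": second_question, "chat_history": chat_history},
    include_tags=["contextualize_q_system_prompt"],
    include_names=["StrOutputParser"],
    include_types=["on_parser_end"],
    version="v1",
):
    print(chunk)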

This code provides a structured interaction with a Q&A system, utilizing a message chain (rag_chain) to handle queries and incorporate the history of the conversation.

The code also demonstrates event streaming with .astream_events, narrowed by the include_tags, include_names, and include_types filter arguments, which is useful for debugging or detailed analysis. Let’s break down the components and their functionalities.

Code Overview

from langchain_core.messages import HumanMessage

chat_history = []
  • HumanMessage: This class from langchain_core.messages is used to create message objects that represent inputs from a human user.
  • chat_history: An empty list initialized to keep track of the conversation's history.
question = "What is Task Decomposition?"
ai_msg = rag_chain.invoke({"question": question, "chat_history": chat_history})
chat_history.extend([HumanMessage(content=question), ai_msg])
  • question: Stores the user's query as a string.
  • rag_chain.invoke: This method calls the rag_chain, passing the initial question and the current chat_history. It returns the AI's response.
  • chat_history.extend: Adds both the user's question and the AI's response to the chat history. This is important for maintaining context in ongoing interactions.
second_question = "What are common ways of doing it?"
  • second_question: A new question introduced after the initial interaction.
async for chunk in rag_chain.astream_events(
    {"question": second_question, "chat_history": chat_history},
    include_tags=["contextualize_q_system_prompt"],
    include_names=["StrOutputParser"],
    include_types=["on_parser_end"],
    version="v1",
):
    print(chunk)
  • rag_chain.astream_events: This asynchronous generator function streams events generated during the processing of the second question. It's particularly useful for debugging or monitoring the system's behavior in real-time.

Filtering Parameters:

  • include_tags: Filters events by tag. "contextualize_q_system_prompt" is the tag we attached to the question-reformulation prompt with .with_config, so this filter targets events from that part of the chain.
  • include_names: Filters events by component name, here "StrOutputParser", which focuses on the points in the processing where string parsing takes place.
  • include_types: Filters for events triggered at the end of parsing ("on_parser_end"), indicating the finalization of data parsing steps.
  • version: Selects the event schema version, set to "v1". Pinning it matters because the event format may change in future versions of LangChain.
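Once events arrive, they can also be narrowed down on the client side. The sketch below filters a hand-written list of event dictionaries; the keys used ("event", "name", "tags", "data") are an assumption about the v1 event shape, so print a few real chunks to confirm them before relying on this. The helper `select_events` is hypothetical and requires every supplied criterion to match.

```python
# Hand-written sample events (not real output), shaped like the streamed chunks.
sample_events = [
    {"event": "on_prompt_end", "name": "ChatPromptTemplate",
     "tags": ["contextualize_q_system_prompt"], "data": {}},
    {"event": "on_parser_end", "name": "StrOutputParser",
     "tags": ["contextualize_q_chain"],
     "data": {"output": "What are common ways of doing task decomposition?"}},
    {"event": "on_retriever_end", "name": "Retriever", "tags": [],
     "data": {"output": ["doc-1", "doc-2"]}},
]


def select_events(events, event_type=None, name=None, tag=None):
    """Keep events that satisfy every criterion that was supplied."""
    selected = []
    for event in events:
        if event_type is not None and event["event"] != event_type:
            continue
        if name is not None and event["name"] != name:
            continue
        if tag is not None and tag not in event.get("tags", []):
            continue
        selected.append(event)
    return selected


reformulated = select_events(sample_events, event_type="on_parser_end", name="StrOutputParser")
print(reformulated[0]["data"]["output"])
```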

The primary aim of this code is to interact with an AI-based Q&A system in a context-aware manner, utilizing the conversation history for better response accuracy. Additionally, the streaming and filtering of events provide insights into specific operational stages of the system, particularly focusing on how questions are reformulated and handled internally. This can be especially valuable for developers and engineers for debugging or optimizing the system’s performance.

If we wanted to get our retrieved docs, we could filter on name “Retriever”:

async for chunk in rag_chain.astream_events(
    {"question": second_question, "chat_history": chat_history},
    include_names=["Retriever"],
    version="v1",
):
    print(chunk)

Using Streaming with FastAPI: A Working HTML Example

Streaming data to the front end can enhance the user experience by providing real-time updates without the need for constant polling. In this section, we’ll explore how to implement streaming in a FastAPI application and display the streamed data using a simple HTML frontend.
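One common transport for this kind of one-way streaming is Server-Sent Events (SSE), where each message is a block of text lines ending with a blank line. The sketch below shows that wire format using only the standard library; the `done` event name and the `{"answer": ...}` payload are assumptions chosen for illustration, not something the protocol or FastAPI requires.

```python
import json
from typing import Iterable, Iterator, Optional


def sse_frame(payload: dict, event: Optional[str] = None) -> str:
    """Encode one SSE message: optional 'event:' line, a 'data:' line, then a blank line."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # json.dumps escapes newlines, so the payload always fits on one data line.
    lines.append(f"data: {json.dumps(payload)}")
    return "\n".join(lines) + "\n\n"


def token_frames(tokens: Iterable[str]) -> Iterator[str]:
    """Turn answer tokens into SSE frames, ending with a hypothetical 'done' event."""
    for token in tokens:
        yield sse_frame({"answer": token})
    yield sse_frame({}, event="done")


frames = list(token_frames(["Task", " decomposition"]))
print(frames[0], end="")  # data: {"answer": "Task"}
```

A generator like token_frames is the kind of object a streaming HTTP response can iterate over, and the browser's EventSource API parses frames in this format on the other end.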

Setting Up FastAPI for Streaming

First, let’s set up a basic FastAPI application that streams data to the frontend. We’ll create an endpoint that uses Server-Sent Events (SSE) to stream the output of the RAG pipeline built earlier in this article.

import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from langchain_core.messages import AIMessageChunk
#from logging import logging

from dotenv import load_dotenv
import bs4
from langchain import hub
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

load_dotenv()

# Load, chunk and index the contents of the blog.
bs_strainer = bs4.SoupStrainer(class_=("post-content", "post-title", "post-header"))
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs={"parse_only": bs_strainer},
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())

# Retrieve and generate using the relevant snippets of the blog.
retriever = vectorstore.as_retriever().with_config(
    tags=["retriever"]
)
# We need to add streaming=True
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, streaming=True)

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""
contextualize_q_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{question}"),
    ]
)
contextualize_q_chain = (contextualize_q_prompt | llm | StrOutputParser()).with_config(
    tags=["contextualize_q_chain"]
)

qa_system_prompt = """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \
Use three sentences maximum and keep the answer concise.\

{context}"""
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", qa_system_prompt),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{question}"),
    ]
)

def contextualized_question(input: dict):
    if input.get("chat_history"):
        return contextualize_q_chain
    else:
        return input["question"]

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    RunnablePassthrough.assign(context=contextualize_q_chain | retriever | format_docs)
    | qa_prompt
    | llm
).with_config(
    tags=["main_chain"]
)

app = FastAPI()

# Allow CORS for all origins (for testing purposes; restrict in production)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
async def root():
    return FileResponse("static/index.html")

def serialize_aimessagechunk(chunk):
    """
    Custom serializer for AIMessageChunk objects.
    Convert the AIMessageChunk object to a serializable format.
    """
    if isinstance(chunk, AIMessageChunk):
        return chunk.content
    else:
        raise TypeError(
            f"Object of type {type(chunk).__name__} is not correctly formatted for serialization"
        )

async def generate_chat_events(message):
    try:
        async for event in rag_chain.astream_events(message, version="v1"):
            # Only get the answer
            sources_tags = ['seq:step:3', 'main_chain']
            if all(value in event["tags"] for value in sources_tags) and event["event"] == "on_chat_model_stream":
                chunk_content = serialize_aimessagechunk(event["data"]["chunk"])
                if len(chunk_content) != 0:
                    data_dict = {"data": chunk_content}
                    data_json = json.dumps(data_dict)
                    yield f"data: {data_json}\n\n"

            # Get the reformulated question
            sources_tags = ['seq:step:2', 'main_chain', 'contextualize_q_chain']
            if all(value in event["tags"] for value in sources_tags) and event["event"] == "on_chat_model_stream":
                chunk_content = serialize_aimessagechunk(event["data"]["chunk"])
                if len(chunk_content) != 0:
                    data_dict = {"reformulated": chunk_content}
                    data_json = json.dumps(data_dict)
                    yield f"data: {data_json}\n\n"

            # Get the context
            sources_tags = ['main_chain', 'retriever']
            if all(value in event["tags"] for value in sources_tags) and event["event"] == "on_retriever_end":
                documents = event['data']['output']['documents']
                # Create a new list to contain the formatted documents
                formatted_documents = []
                # Iterate over each document in the original list
                for doc in documents:
                    # Create a new dictionary for each document with the required format
                    formatted_doc = {
                        'page_content': doc.page_content,
                        'metadata': {
                            'source': doc.metadata['source'],
                        },
                        'type': 'Document'
                    }
                    # Add the formatted document to the final list
                    formatted_documents.append(formatted_doc)

                # Create the final dictionary with the key "context"
                final_output = {'context': formatted_documents}

                # Convert the dictionary to a JSON string
                data_json = json.dumps(final_output)
                yield f"data: {data_json}\n\n"
            if event["event"] == "on_chat_model_end":
                print("Chat model has completed one response.")

    except Exception as e:
        print('error'+ str(e))

@app.get("/chat_stream/{message}")
async def chat_stream_events(message: str):
    return StreamingResponse(generate_chat_events({"question": message, "chat_history": []}), media_type="text/event-stream")

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

This code sets up a FastAPI application that streams data to the client using Server-Sent Events (SSE). The generate_chat_events function handles the streaming logic, sending chunks of data as they become available.
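
Every chunk that generate_chat_events yields follows the same SSE framing: a line starting with "data: " followed by a blank line. The sketch below isolates that framing in a small helper so the wire format is easy to see. The name sse_message is hypothetical and not part of the original code.

```python
import json


def sse_message(payload: dict) -> str:
    """Frame a JSON payload as one Server-Sent Events message."""
    # json.dumps escapes newlines inside strings, so the payload stays on a
    # single "data:" line; the trailing blank line ("\n\n") ends the message.
    return f"data: {json.dumps(payload)}\n\n"


frame = sse_message({"data": "Hello\nworld"})
print(repr(frame))
```

Serializing to JSON first is what keeps this safe: a raw newline in the model output would otherwise split the message into several SSE fields.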

Creating the HTML Frontend

Next, we’ll create an HTML frontend that connects to the FastAPI streaming endpoint and displays the streamed data in real time.

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Chat Stream</title>
<!-- Bootstrap CSS -->
<link href="https://maxcdn.bootstrapcdn.com/bootstrap/4.5.2/css/bootstrap.min.css" rel="stylesheet">
<style>
#output {
border: 1px solid #ddd;
padding: 10px;
margin-top: 10px;
height: 200px;
overflow-y: scroll;
white-space: pre-wrap; /* To ensure line breaks are respected */
}
#context {
border: 1px solid #ddd;
padding: 10px;
margin-top: 10px;
height: 200px;
overflow-y: scroll;
white-space: pre-wrap; /* To ensure line breaks are respected */
}
#reformulated {
border: 1px solid #ddd;
padding: 10px;
margin-top: 10px;
height: 100px;
overflow-y: scroll;
white-space: pre-wrap; /* To ensure line breaks are respected */
}
</style>
</head>
<body>
<div class="container mt-5">
<h1 class="text-center">Ask a Question</h1>
<form id="questionForm" class="mb-4">
<div class="form-group">
<label for="question">Question:</label>
<input type="text" class="form-control" id="question" name="question" required>
</div>
<button type="submit" class="btn btn-primary">Submit</button>
</form>

Reformulated Question:
<div id="reformulated" class="border rounded p-3"></div>

Answer:
<div id="output" class="border rounded p-3"></div>

Context:
<div id="context" class="border rounded p-3"></div>
</div>

<!-- Bootstrap JS and dependencies -->
<script src="https://code.jquery.com/jquery-3.5.1.slim.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/@popperjs/core@2.5.4/dist/umd/popper.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/4.5.2/js/bootstrap.min.js"></script>

<script>
    document.getElementById('questionForm').addEventListener('submit', function(e) {
        e.preventDefault();
        const question = document.getElementById('question').value;
        const outputDiv = document.getElementById('output');
        const contextDiv = document.getElementById('context');
        const reformulatedDiv = document.getElementById('reformulated');
        outputDiv.innerHTML = ''; // Clear previous output
        contextDiv.innerHTML = ''; // Clear previous context
        reformulatedDiv.innerHTML = ''; // Clear previous reformulated question

        const url = `/chat_stream/${encodeURIComponent(question)}`;

        const eventSource = new EventSource(url);

        eventSource.onmessage = function(event) {
            try {
                if (event.data.trim()) { // Check if the data is not empty
                    const data = JSON.parse(event.data);
                    if (data.context) {
                        // Display context data directly
                        const contextData = data.context.map(item => item.page_content).join('<br><br>');
                        contextDiv.innerHTML = contextData;
                        contextDiv.scrollTop = contextDiv.scrollHeight; // Scroll to bottom
                    }
                    if (data.reformulated) {
                        // Append reformulated question to reformulatedDiv
                        reformulatedDiv.innerHTML += data.reformulated;
                        reformulatedDiv.scrollTop = reformulatedDiv.scrollHeight; // Scroll to bottom
                    }
                    if (data.data) {
                        // Append regular data to outputDiv
                        outputDiv.innerHTML += data.data;
                        outputDiv.scrollTop = outputDiv.scrollHeight; // Scroll to bottom
                    }
                }
            } catch (error) {
                outputDiv.innerHTML += "\nAn error occurred: " + error.message;
            }
        };

        eventSource.onerror = function() {
            if (eventSource.readyState === EventSource.CLOSED) {
                // Do nothing
            } else if (eventSource.readyState === EventSource.CONNECTING) {
                // Do nothing
            } else {
                outputDiv.innerHTML += "\nAn error occurred while streaming.";
            }
            eventSource.close();
        };
    });
</script>
</body>
</html>

This HTML page includes a form where users can submit a question. Submitting the form opens a connection to the FastAPI SSE endpoint, and the streamed data (reformulated question, answer, and context) is displayed in real time.
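
To exercise the endpoint without a browser, the same handling can be sketched in Python. The block below mirrors the page's onmessage logic: it parses "data:" lines and accumulates the three payload kinds. It assumes one "data:" line per message, which is what the server above emits. The function names and the sample stream are hypothetical.

```python
import json


def parse_sse(lines):
    """Yield decoded JSON payloads from an iterable of SSE text lines."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # blank separators, comments, other SSE fields
        yield json.loads(line[len("data:"):].strip())


def collect(lines):
    """Accumulate the three kinds of payload the page displays."""
    state = {"reformulated": "", "answer": "", "context": []}
    for payload in parse_sse(lines):
        if "context" in payload:
            state["context"] = [d["page_content"] for d in payload["context"]]
        if "reformulated" in payload:
            state["reformulated"] += payload["reformulated"]
        if "data" in payload:
            state["answer"] += payload["data"]
    return state


# Hypothetical stream, shaped like the output of generate_chat_events.
stream = [
    'data: {"context": [{"page_content": "doc text"}]}',
    "",
    'data: {"data": "Task "}',
    "",
    'data: {"data": "decomposition"}',
    "",
]
result = collect(stream)
print(result["answer"])
```

Against a running server, the lines could come from any HTTP client that iterates over the response body line by line.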

Detailed Explanation of the Streaming Logic

The following section of code processes different event types and streams the appropriate data to the client in real-time. Let’s break down each part:

Only Get the Answer

# Only get the answer
sources_tags = ['seq:step:3', 'main_chain']
if all(value in event["tags"] for value in sources_tags) and event["event"] == "on_chat_model_stream":
    chunk_content = serialize_aimessagechunk(event["data"]["chunk"])
    if len(chunk_content) != 0:
        data_dict = {"data": chunk_content}
        data_json = json.dumps(data_dict)
        yield f"data: {data_json}\n\n"

Identify the Relevant Event:

  • sources_tags = ['seq:step:3', 'main_chain']: Tags that identify events related to getting the final answer.
  • The condition checks if all the tags in sources_tags are present and if the event type is on_chat_model_stream.

Process and Stream the Event:

  • chunk_content = serialize_aimessagechunk(event["data"]["chunk"]): Serializes the chunk of data.
  • If the chunk content is not empty, it is added to a dictionary, converted to a JSON string, and yielded as an SSE data chunk.

Get the Reformulated Question

# Get the reformulated question
sources_tags = ['seq:step:2', 'main_chain', 'contextualize_q_chain']
if all(value in event["tags"] for value in sources_tags) and event["event"] == "on_chat_model_stream":
    chunk_content = serialize_aimessagechunk(event["data"]["chunk"])
    if len(chunk_content) != 0:
        data_dict = {"reformulated": chunk_content}
        data_json = json.dumps(data_dict)
        yield f"data: {data_json}\n\n"

Identify the Relevant Event:

  • sources_tags = ['seq:step:2', 'main_chain', 'contextualize_q_chain']: Tags that identify events related to the reformulated question.
  • The condition checks if all the tags in sources_tags are present and if the event type is on_chat_model_stream.

Process and Stream the Event:

  • chunk_content = serialize_aimessagechunk(event["data"]["chunk"]): Serializes the chunk of data.
  • If the chunk content is not empty, it is added to a dictionary, converted to a JSON string, and yielded as an SSE data chunk.

Get the Context

# Get the context
sources_tags = ['main_chain', 'retriever']
if all(value in event["tags"] for value in sources_tags) and event["event"] == "on_retriever_end":
    documents = event['data']['output']['documents']
    # Create a new list to contain the formatted documents
    formatted_documents = []
    # Iterate over each document in the original list
    for doc in documents:
        # Create a new dictionary for each document with the required format
        formatted_doc = {
            'page_content': doc.page_content,
            'metadata': {
                'source': doc.metadata['source'],
            },
            'type': 'Document'
        }
        # Add the formatted document to the final list
        formatted_documents.append(formatted_doc)

    # Create the final dictionary with the key "context"
    final_output = {'context': formatted_documents}

    # Convert the dictionary to a JSON string
    data_json = json.dumps(final_output)
    yield f"data: {data_json}\n\n"

Identify the Relevant Event:

  • sources_tags = ['main_chain', 'retriever']: Tags that identify events related to retrieving context.
  • The condition checks if all the tags in sources_tags are present and if the event type is on_retriever_end.

Process and Stream the Event:

  • Extracts and formats the documents into a list of dictionaries.
  • Creates the final dictionary with the key "context", converts it to a JSON string, and yields it as an SSE data chunk.
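
The formatting step can also be tried in isolation. The sketch below uses a small dataclass as a stand-in for LangChain's Document, so FakeDocument and format_documents are hypothetical names. It differs from the original in one respect: it reads the source with .get, which avoids a KeyError when a document has no "source" metadata.

```python
import json
from dataclasses import dataclass, field


@dataclass
class FakeDocument:
    """Stand-in for a LangChain Document (hypothetical, for illustration)."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def format_documents(documents):
    """Convert document objects into JSON-serializable dictionaries."""
    return [
        {
            "page_content": doc.page_content,
            # .get avoids a KeyError if a document has no "source" entry.
            "metadata": {"source": doc.metadata.get("source")},
            "type": "Document",
        }
        for doc in documents
    ]


docs = [
    FakeDocument("Agents plan tasks.", {"source": "https://example.com/post"}),
    FakeDocument("No metadata here."),
]
payload = json.dumps({"context": format_documents(docs)})
print(payload)
```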

This breakdown explains how different event types are processed and streamed to the client using Server-Sent Events (SSE), ensuring each section handles specific data types (answer, reformulated question, context) correctly. The tags used to identify events are added to the respective chains using .with_config(tags=[...]).
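
Since the three branches share the same shape (required tags, an event type, and a payload key), they could also be expressed as a lookup table. This is only a sketch of that idea, not the article's implementation: RULES and payload_key are hypothetical names, and the function returns the first matching rule, whereas the original checks every branch independently.

```python
# Each rule: (required tags, event type, key used in the JSON payload).
RULES = [
    (("seq:step:3", "main_chain"), "on_chat_model_stream", "data"),
    (("seq:step:2", "main_chain", "contextualize_q_chain"), "on_chat_model_stream", "reformulated"),
    (("main_chain", "retriever"), "on_retriever_end", "context"),
]


def payload_key(event):
    """Return the payload key for an event, or None if no rule matches."""
    tags = event.get("tags", [])
    for required, event_type, key in RULES:
        if event.get("event") == event_type and all(t in tags for t in required):
            return key
    return None


# Hypothetical events carrying the tags described above.
answer_event = {"event": "on_chat_model_stream", "tags": ["seq:step:3", "main_chain"]}
reformulated_event = {
    "event": "on_chat_model_stream",
    "tags": ["seq:step:2", "main_chain", "contextualize_q_chain"],
}
context_event = {"event": "on_retriever_end", "tags": ["main_chain", "retriever"]}
other_event = {"event": "on_chat_model_end", "tags": ["main_chain"]}

for ev in (answer_event, reformulated_event, context_event, other_event):
    print(ev["event"], "->", payload_key(ev))
```

With a table like this, supporting another stream only requires adding a rule, provided the corresponding chain carries a distinguishing tag.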

Conclusion

This article showed two ways to stream from a context-aware question-answering chain. With astream_events, filters such as include_names, include_tags, and include_types narrow the event stream to the steps you care about, for example the reformulated question or the retrieved documents, which is useful when debugging or tuning the chain.

The FastAPI example then exposed the same events to a browser over Server-Sent Events. The HTML page connects to the streaming endpoint and updates the reformulated question, the retrieved context, and the final answer as each arrives, so users see results in real time instead of waiting for the complete response.

For those interested in exploring the code further or integrating similar functionality into their own projects, the complete code is available on GitHub.

Thank you for reading!



Eric Vaillancourt

Eric Vaillancourt, an AI enthusiast, began his career in 1989, founded a tech consultancy in 1996, and has led over 1,500 trainings in IT and AI.