Implementing A Flavor of Corrective RAG using Langchain, Chromadb , Zephyr-7B-Beta and OpenAI

Plaban Nayak
The AI Forum
Published in
26 min readMar 3, 2024
Corrective RAG workflow

What happens in Native RAG ?

Preproduction Setup:

  1. The Knowledge Source is loaded and formatted using Langchain Orchestration Framework.
  2. The Formatted Documents is then split into semantically relevant chunks.
  3. Convert the chunks into vector embeddings using an Embedding Model.
  4. Load the Vector Embeddings into the VectorStore.

Production Setup:

  1. Set up the LLM for Response Synthesis.
  2. User asks a query.
  3. The query is converted into vector embeddings uisng and Embedding Model.
  4. This embedding is then matched against the Knowledge Source embeddings stored in the VectorStore.
  5. Find top-k similar documents matching the user query.
  6. Postprocess and aggregate matched documents retrieved into a context .
  7. Generate a prompt based on the user query and context and pass it to the LLM.
  8. The LLM then synthesizes the response based on the prompt /instruction provided.

What are the benefits of RAG?

There are several important advantages to the RAG approach:

  • RAG makes sure that an LLM’s response isn’t based just on old, stagnant training data. Instead, the model gets its answers from current external data sources.
  • RAG aims to reduce the possibility of reacting with false or misleading information (sometimes referred to as hallucinations) by basing the LLM model’s output on pertinent, outside knowledge. Citations to the original sources can be included in the outputs, enabling human verification.
  • By utilizing RAG, the LLM can deliver contextually appropriate answers that are customized to an organization’s proprietary or domain-specific data.

In contrast to alternative methods of integrating domain-specific data into LLM customization, RAG is simple and cost-effective. Organizations can deploy RAG without needing to customize the model. This is especially beneficial when models need to be updated frequently with new data.

What is Corrective RAG ?

Corrective RAG is a comprehensive framework that combines retrieval evaluation, corrective actions, web searches, and generative model integration to enhance the accuracy, reliability, and robustness of text generation models by ensuring the utilization of accurate and relevant knowledge,

In simple terms, Corrective RAG is the method used to grade documents based on their relevance to the data source. If the data source is related to the question, the process proceeds to generation. Otherwise, the framework seeks additional data sources and utilizes web search to supplement retrieval. But in this example instead of seeking additional data sources we will simply discarded the non-relevant data sources.

Technology Stack

Code Implementation

Install required dependencies

pip install -q transformers 
pip install -q accelerate
pip install -q bitsandbytes
pip install -q langchain
pip install -q sentence-transformers
pip install -q chromadb openpyxl
pip install -q ragatouille
pip install -q langchain_openai
pip install -q datasets

Import Required Dependencies

from tqdm.notebook import tqdm
import pandas as pd
from typing import Optional, List, Tuple
from datasets import Dataset
import matplotlib.pyplot as plt

pd.set_option("display.max_colwidth", None) # this will be helpful when visualizing retriever outputs

Load the Knowledge Source

import datasets

ds = datasets.load_dataset("m-ric/huggingface_doc", split="train")

Format the datasets into Langchain Document Schema

from langchain.docstore.document import Document as LangchainDocument

RAW_KNOWLEDGE_BASE = [
LangchainDocument(page_content=doc["text"], metadata={"source": doc["source"]}) for doc in tqdm(ds)
]
print(len(RAW_KNOWLEDGE_BASE))
print(RAW_KNOWLEDGE_BASE[1].page_content)
print(RAW_KNOWLEDGE_BASE[1].metadata)

Split the documents into chunks

from langchain.text_splitter import RecursiveCharacterTextSplitter

# We use a hierarchical list of separators specifically tailored for splitting Markdown documents
# This list is taken from LangChain's MarkdownTextSplitter class.
MARKDOWN_SEPARATORS = [
"\n#{1,6} ",
"```\n",
"\n\\*\\*\\*+\n",
"\n---+\n",
"\n___+\n",
"\n\n",
"\n",
" ",
"",
]

text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # the maximum number of characters in a chunk: we selected this value arbitrarily
chunk_overlap=100, # the number of characters to overlap between chunks
add_start_index=True, # If `True`, includes chunk's start index in metadata
strip_whitespace=True, # If `True`, strips whitespace from the start and end of every document
separators=MARKDOWN_SEPARATORS,
)

docs_processed = []
for doc in RAW_KNOWLEDGE_BASE:
docs_processed += text_splitter.split_documents([doc])

Setup Embedding Model

We also have to keep in mind that when embedding documents, we will use an embedding model that has accepts a certain maximum sequence length max_seq_length. So we should make sure that our chunk sizes are below this limit, because any longer chunk will be truncated before processing, thus losing relevancy.

from sentence_transformers import SentenceTransformer

# To get the value of the max sequence_length, we will query the underlying `SentenceTransformer` object used in the RecursiveCharacterTextSplitter.
print(f"Model's maximum sequence length: {SentenceTransformer('thenlper/gte-small').max_seq_length}")

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("thenlper/gte-small")
lengths = [len(tokenizer.encode(doc.page_content)) for doc in tqdm(docs_processed)]

# Plot the distrubution of document lengths, counted as the number of tokens
fig = pd.Series(lengths).hist()
plt.title("Distribution of document lengths in the knowledge base (in count of tokens)")
plt.show()

As you can see, the chunk lengths are not aligned with our limit of 512 tokens, and some documents are above the limit, thus some part of them will be lost in truncation!

So we should change the RecursiveCharacterTextSplitter class to count length in number of tokens instead of number of characters. Then we can choose a specific chunk size, here we would choose a lower threshold than 512: smaller documents could allow the split to focus more on specific ideas. But too small chunks would split sentences in half, thus losing meaning again: the proper tuning is a matter of balance.

from langchain.text_splitter import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer

EMBEDDING_MODEL_NAME = "thenlper/gte-small"
chunk_size = 512
#
text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
AutoTokenizer.from_pretrained(EMBEDDING_MODEL_NAME),
chunk_size=chunk_size,
chunk_overlap=int(chunk_size / 10),
add_start_index=True,
strip_whitespace=True,
separators=MARKDOWN_SEPARATORS,
)
#
docs_processed = []
for doc in RAW_KNOWLEDGE_BASE:
docs_processed += text_splitter.split_documents([doc])

print(len(docs_processed)#19983

visualize the chunk sizes we would have in tokens from a common model

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained(EMBEDDING_MODEL_NAME)
lengths = [len(tokenizer.encode(doc.page_content)) for doc in tqdm(docs_processed)]
fig = pd.Series(lengths).hist()
plt.title("Distribution of document lengths in the knowledge base (in count of tokens)")
plt.show()

Setup VectorStore

from google.colab import drive
drive.mount('/content/drive')
#
from langchain.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores.utils import DistanceStrategy

embedding_model = HuggingFaceEmbeddings(
model_name=EMBEDDING_MODEL_NAME,
multi_process=True,
model_kwargs={"device": "cuda"},
encode_kwargs={"normalize_embeddings": True}, # set True for cosine similarity
)

KNOWLEDGE_VECTOR_DATABASE = Chroma.from_documents(docs_processed,
embedding_model,
persist_directory="/content/drive/MyDrive/CRAG",
collection_name="crag")
#
KNOWLEDGE_VECTOR_DATABASE.persist()
print(len(KNOWLEDGE_VECTOR_DATABASE.get()['documents']))

According to the documentation https://docs.trychroma.com/usage-guide embeddings are excluded by default for performance:

When using get or query you can use the include parameter to specify which data you want returned — any of embeddings, documents, metadatas, and for query, distances. By default, Chroma will return the documents, metadatas and in the case of query, the distances of the results. embeddings are excluded by default for performance and the ids are always returned.

You can include the embeddings when using get as followed:

print(collection.get(include=['embeddings', 'documents', 'metadatas']))

Check if the embeddings are retrieved based on user query

user_query = "How to create a pipeline object?"
print(f"\nStarting retrieval for {user_query=}...")
retrieved_docs = KNOWLEDGE_VECTOR_DATABASE.similarity_search(query=user_query, k=5)
print("\n==================================Top document==================================")
print(retrieved_docs[1].page_content)
print("==================================Metadata==================================")
print(retrieved_docs[1].metadata)

Setup the LLM

from transformers import pipeline
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from langchain.prompts import PromptTemplate
from langchain.llms import HuggingFacePipeline

READER_MODEL_NAME = "HuggingFaceH4/zephyr-7b-beta"

bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16,
)
model = AutoModelForCausalLM.from_pretrained(READER_MODEL_NAME, quantization_config=bnb_config)
tokenizer = AutoTokenizer.from_pretrained(READER_MODEL_NAME)

READER_LLM = pipeline(
model=model,
tokenizer=tokenizer,
task="text-generation",
do_sample=True,
temperature=0.2,
repetition_penalty=1.1,
return_full_text=False,
max_new_tokens=500,
)
#

llm = HuggingFacePipeline(pipeline=READER_LLM)

Create a Prompt

prompt_in_chat_format = [
{
"role": "system",
"content": """Using the information contained in the context,
give a comprehensive answer to the question.
Respond only to the question asked, response should be concise and relevant to the question.
Provide the number of the source document when relevant.
If the answer cannot be deduced from the context, do not give an answer.""",
},
{
"role": "user",
"content": """Context:
{context}
---
Now here is the question you need to answer.

Question: {question}""",
},
]
RAG_PROMPT_TEMPLATE = tokenizer.apply_chat_template(
prompt_in_chat_format, tokenize=False, add_generation_prompt=True
)
print(RAG_PROMPT_TEMPLATE)
<|system|>
Using the information contained in the context,
give a comprehensive answer to the question.
Respond only to the question asked, response should be concise and relevant to the question.
Provide the number of the source document when relevant.
If the answer cannot be deduced from the context, do not give an answer.</s>
<|user|>
Context:
{context}
---
Now here is the question you need to answer.

Question: {question}</s>
<|assistant|>
retrieved_docs_text = [doc.page_content for doc in retrieved_docs]  # we only need the text of the documents
context = "\nExtracted documents:\n"
context += "".join([f"Document {str(i)}:::\n" + doc for i, doc in enumerate(retrieved_docs_text)])

final_prompt = RAG_PROMPT_TEMPLATE.format(question="How to create a pipeline object?", context=context)

print(final_prompt)
<|system|>
Using the information contained in the context,
give a comprehensive answer to the question.
Respond only to the question asked, response should be concise and relevant to the question.
Provide the number of the source document when relevant.
If the answer cannot be deduced from the context, do not give an answer.</s>
<|user|>
Context:

Extracted documents:
Document 0:::
```

## Available Pipelines:Document 1:::
```
</tf>
</frameworkcontent>

## Pipeline

<Youtube id="tiZFewofSLM"/>

The [`pipeline`] is the easiest and fastest way to use a pretrained model for inference. You can use the [`pipeline`] out-of-the-box for many tasks across different modalities, some of which are shown in the table below:

<Tip>

For a complete list of available tasks, check out the [pipeline API reference](./main_classes/pipelines).

</Tip>Document 2:::
!--Copyright 2020 The HuggingFace Team. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the

⚠️ Note that this file is in Markdown but contain specific syntax for our doc-builder (similar to MDX) that may not be
rendered properly in your Markdown viewer.

-->

# How to create a custom pipeline?

In this guide, we will see how to create a custom pipeline and share it on the [Hub](hf.co/models) or add it to the
🤗 Transformers library.

First and foremost, you need to decide the raw entries the pipeline will be able to take. It can be strings, raw bytes,
dictionaries or whatever seems to be the most likely desired input. Try to keep these inputs as pure Python as possible
as it makes compatibility easier (even through other languages via JSON). Those will be the `inputs` of the
pipeline (`preprocess`).

Then define the `outputs`. Same policy as the `inputs`. The simpler, the better. Those will be the outputs of
`postprocess` method.

Start by inheriting the base class `Pipeline` with the 4 methods needed to implement `preprocess`,
`_forward`, `postprocess`, and `_sanitize_parameters`.


```python
from transformers import PipelineDocument 3:::
- **Self-contained**: A pipeline shall be as self-contained as possible. More specifically, this means that all functionality should be either directly defined in the pipeline file itself, should be inherited from (and only from) the [`DiffusionPipeline` class](https://github.com/huggingface/diffusers/blob/5cbed8e0d157f65d3ddc2420dfd09f2df630e978/src/diffusers/pipeline_utils.py#L56) or be directly attached to the model and scheduler components of the pipeline.
- **Easy-to-use**: Pipelines should be extremely easy to use - one should be able to load the pipeline and
use it for its designated task, *e.g.* text-to-image generation, in just a couple of lines of code. Most
logic including pre-processing, an unrolled diffusion loop, and post-processing should all happen inside the `__call__` method.
- **Easy-to-tweak**: Certain pipelines will not be able to handle all use cases and tasks that you might like them to. If you want to use a certain pipeline for a specific use case that is not yet supported, you might have to copy the pipeline file and tweak the code to your needs. We try to make the pipeline code as readable as possible so that each part –from pre-processing to diffusing to post-processing– can easily be adapted. If you would like the community to benefit from your customized pipeline, we would love to see a contribution to our [community-examples](https://github.com/huggingface/diffusers/tree/main/examples/community). If you feel that an important pipeline should be part of the official pipelines but isn't, a contribution to the [official pipelines](https://github.com/huggingface/diffusers/blob/main/src/diffusers/pipelines) would be even better.Document 4:::
```

## Pipeline

You can also push an entire pipeline with all it's components to the Hub. For example, initialize the components of a [`StableDiffusionPipeline`] with the parameters you want:

```py
from diffusers import (
UNet2DConditionModel,
AutoencoderKL,
DDIMScheduler,
StableDiffusionPipeline,
)
from transformers import CLIPTextModel, CLIPTextConfig, CLIPTokenizer

unet = UNet2DConditionModel(
block_out_channels=(32, 64),
layers_per_block=2,
sample_size=32,
in_channels=4,
out_channels=4,
down_block_types=("DownBlock2D", "CrossAttnDownBlock2D"),
up_block_types=("CrossAttnUpBlock2D", "UpBlock2D"),
cross_attention_dim=32,
)

scheduler = DDIMScheduler(
beta_start=0.00085,
beta_end=0.012,
beta_schedule="scaled_linear",
clip_sample=False,
set_alpha_to_one=False,
)

vae = AutoencoderKL(
block_out_channels=[32, 64],
in_channels=3,
out_channels=3,
down_block_types=["DownEncoderBlock2D", "DownEncoderBlock2D"],
up_block_types=["UpDecoderBlock2D", "UpDecoderBlock2D"],
latent_channels=4,
)

text_encoder_config = CLIPTextConfig(
bos_token_id=0,
eos_token_id=2,
hidden_size=32,
intermediate_size=37,
layer_norm_eps=1e-05,
num_attention_heads=4,
num_hidden_layers=5,
pad_token_id=1,
vocab_size=1000,
)
text_encoder = CLIPTextModel(text_encoder_config)
tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip")
---
Now here is the question you need to answer.

Question: How to create a pipeline object?</s>
<|assistant|>

Synthesize Response

from langchain_core.output_parsers import JsonOutputParser
answer = llm(final_prompt)
print(answer)
/usr/local/lib/python3.10/dist-packages/langchain_core/_api/deprecation.py:117: LangChainDeprecationWarning: The function `__call__` was deprecated in LangChain 0.1.7 and will be removed in 0.2.0. Use invoke instead.
warn_deprecated(
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
To create a pipeline object, follow these steps:

1. Inherit the `Pipeline` base class from the `transformers` module.

2. Define the `inputs` and `outputs` of the pipeline. These can be strings, dictionaries, or any other Python data type that can be easily processed.

3. Implement the four required methods: `preprocess`, `_forward`, `postprocess`, and `_sanitize_parameters`.

- `preprocess`: This method takes the raw inputs and returns a dictionary of preprocessed inputs that can be passed to the model.

- `_forward`: This method takes the preprocessed inputs and the model and returns the output of the model.

- `postprocess`: This method takes the output of the model and returns the final output of the pipeline.

- `_sanitize_parameters`: This method ensures that the input parameters are valid and converts them into a format that can be used by the model.

4. Optionally, you can push the entire pipeline with all its components to the Hugging Face Hub or add it to the `transformers` library.

Here's an example implementation:

```python
from transformers import Pipeline

class MyPipeline(Pipeline):
def __init__(self, model, tokenizer):
super().__init__(model, tokenizer)

def preprocess(self, inputs):
# Preprocess inputs here
return {"input_ids": inputs}

def _forward(self, inputs):
# Pass preprocessed inputs to the model and return the output
return self.model(**inputs)[0]

def postprocess(self, outputs):
# Postprocess the output here
return outputs[0]

def _sanitize_parameters(self, hparams):
# Sanitize input parameters here
return hparams
```

Note that this implementation assumes a simple text classification pipeline using the `transformers` library. Adjust the implementation according to your specific use case.

Reranking

A good option for RAG is to retrieve more documents than you want in the end, then rerank the results with a more powerful retrieval model before keeping only the top_k.

from ragatouille import RAGPretrainedModel

RERANKER = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
#
print("=> Reranking documents...")
question = "How to create a pipeline object?"
relevant_docs = KNOWLEDGE_VECTOR_DATABASE.similarity_search(query=question, k=5)
relevant_docs = [doc.page_content for doc in relevant_docs]
reranked_relevant_docs = RERANKER.rerank(question, relevant_docs, k=3)
#
reranked_docs = [doc["content"] for doc in reranked_relevant_docs]
#Compare the documents retrived for normal vector search and rereanker
for i,doc in enumerate(relevant_docs[:3]):
print(f"Document {i}: {doc}")
print("="*80)
Document 0: ```

## Available Pipelines:
================================================================================
Document 1: ```
</tf>
</frameworkcontent>

## Pipeline

<Youtube id="tiZFewofSLM"/>

The [`pipeline`] is the easiest and fastest way to use a pretrained model for inference. You can use the [`pipeline`] out-of-the-box for many tasks across different modalities, some of which are shown in the table below:

<Tip>

For a complete list of available tasks, check out the [pipeline API reference](./main_classes/pipelines).

</Tip>
================================================================================
Document 2: !--Copyright 2020 The HuggingFace Team. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the

⚠️ Note that this file is in Markdown but contain specific syntax for our doc-builder (similar to MDX) that may not be
rendered properly in your Markdown viewer.

-->

# How to create a custom pipeline?

In this guide, we will see how to create a custom pipeline and share it on the [Hub](hf.co/models) or add it to the
🤗 Transformers library.

First and foremost, you need to decide the raw entries the pipeline will be able to take. It can be strings, raw bytes,
dictionaries or whatever seems to be the most likely desired input. Try to keep these inputs as pure Python as possible
as it makes compatibility easier (even through other languages via JSON). Those will be the `inputs` of the
pipeline (`preprocess`).

Then define the `outputs`. Same policy as the `inputs`. The simpler, the better. Those will be the outputs of
`postprocess` method.

Start by inheriting the base class `Pipeline` with the 4 methods needed to implement `preprocess`,
`_forward`, `postprocess`, and `_sanitize_parameters`.


```python
from transformers import Pipeline
================================================================================
for i,doc in enumerate(reranked_docs):
print(f"Document {i}: {doc}")
print("="*80)
Document 0: !--Copyright 2020 The HuggingFace Team. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the

⚠️ Note that this file is in Markdown but contain specific syntax for our doc-builder (similar to MDX) that may not be
rendered properly in your Markdown viewer.

-->

# How to create a custom pipeline?

In this guide, we will see how to create a custom pipeline and share it on the [Hub](hf.co/models) or add it to the
🤗 Transformers library.

First and foremost, you need to decide the raw entries the pipeline will be able to take. It can be strings, raw bytes,
dictionaries or whatever seems to be the most likely desired input. Try to keep these inputs as pure Python as possible
as it makes compatibility easier (even through other languages via JSON). Those will be the `inputs` of the
pipeline (`preprocess`).

Then define the `outputs`. Same policy as the `inputs`. The simpler, the better. Those will be the outputs of
`postprocess` method.

Start by inheriting the base class `Pipeline` with the 4 methods needed to implement `preprocess`,
`_forward`, `postprocess`, and `_sanitize_parameters`.


```python
from transformers import Pipeline
================================================================================
Document 1: ```
</tf>
</frameworkcontent>

## Pipeline

<Youtube id="tiZFewofSLM"/>

The [`pipeline`] is the easiest and fastest way to use a pretrained model for inference. You can use the [`pipeline`] out-of-the-box for many tasks across different modalities, some of which are shown in the table below:

<Tip>

For a complete list of available tasks, check out the [pipeline API reference](./main_classes/pipelines).

</Tip>
================================================================================
Document 2: - **Self-contained**: A pipeline shall be as self-contained as possible. More specifically, this means that all functionality should be either directly defined in the pipeline file itself, should be inherited from (and only from) the [`DiffusionPipeline` class](https://github.com/huggingface/diffusers/blob/5cbed8e0d157f65d3ddc2420dfd09f2df630e978/src/diffusers/pipeline_utils.py#L56) or be directly attached to the model and scheduler components of the pipeline.
- **Easy-to-use**: Pipelines should be extremely easy to use - one should be able to load the pipeline and
use it for its designated task, *e.g.* text-to-image generation, in just a couple of lines of code. Most
logic including pre-processing, an unrolled diffusion loop, and post-processing should all happen inside the `__call__` method.
- **Easy-to-tweak**: Certain pipelines will not be able to handle all use cases and tasks that you might like them to. If you want to use a certain pipeline for a specific use case that is not yet supported, you might have to copy the pipeline file and tweak the code to your needs. We try to make the pipeline code as readable as possible so that each part –from pre-processing to diffusing to post-processing– can easily be adapted. If you would like the community to benefit from your customized pipeline, we would love to see a contribution to our [community-examples](https://github.com/huggingface/diffusers/tree/main/examples/community). If you feel that an important pipeline should be part of the official pipelines but isn't, a contribution to the [official pipelines](https://github.com/huggingface/diffusers/blob/main/src/diffusers/pipelines) would be even better.
================================================================================

Create a prompt with reranked docs

retrieved_docs_text = [doc for doc in reranked_docs]  # we only need the text of the documents
context = "\nExtracted documents:\n"
context += "".join([f"Document {str(i)}:::\n" + doc for i, doc in enumerate(retrieved_docs_text)])

final_prompt = RAG_PROMPT_TEMPLATE.format(question="How to create a pipeline object?", context=context)

print(final_prompt)

Response Synthesis using Reranked Context

answer = llm(final_prompt)
print(answer)
Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
To create a pipeline object, follow these steps:

1. Define the inputs and outputs of your pipeline, as described in the context.

2. Inherit the `Pipeline` class from the `transformers` module, as shown in the context:

```python
from transformers import PipelineDocument 1:::
```

3. Implement the four required methods: `preprocess`, `_forward`, `postprocess`, and `_sanitize_parameters`. Here's an example implementation:

```python
class MyCustomPipeline(Pipeline):
def __init__(self,...):
super().__init__(... )

def preprocess(self, inputs):
# Preprocess the inputs here
return preprocessed_inputs

def _forward(self, inputs):
# Run the forward pass of your model here
return output

def postprocess(self, outputs):
# Postprocess the outputs here
return postprocessed_outputs

def _sanitize_parameters(self, parameters):
# Sanitize the parameters here
return sanitized_parameters
```

4. Load your pipeline using the `from_pretrained()` function provided by the `transformers` module:

```python
my_custom_pipeline = MyCustomPipeline.from_pretrained('my_custom_pipeline')
```

5. Use your pipeline object to perform inference on new data:

```python
results = my_custom_pipeline(input_data)
```

That's it! Your custom pipeline is now ready to use. Remember to follow the guidelines mentioned in the context to ensure that your pipeline is self-contained, easy-to-use, and easy-to-tweak. Additionally, consider contributing your pipeline to the Hugging Face Hub or the official pipelines if it could be useful to others.
  • The re-ranked context yields a much better response

Applying Corrective RAG

from langchain_openai import OpenAI
from google.colab import userdata
import os
os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')

llm_openai = OpenAI(temperature=0)
#
c_prompt = PromptTemplate(
template="""You are a grader assessing relevance of a retrieved document to a user question. \n
Here is the retrieved document: \n\n {context} \n\n
Here is the user question: {question} \n
If the document contains keywords related to the user question, grade it as relevant. \n
It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
Provide only the binary score as a text varible with a single key 'score' and no premable or explaination.""",
input_variables=["question", "context"],
)
#

score_prompt = """You are a grader assessing relevance of a retrieved document to a user question. \n
Here is the retrieved document: \n\n {context} \n\n
Here is the user question: {question} \n
If the document contains keywords related to the user question, grade it as relevant. \n
It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n"""
#
from langchain.output_parsers import ResponseSchema, StructuredOutputParser
from langchain.prompts import PromptTemplate

response_schemas = [
ResponseSchema(name="Score", description="score for the context query relevancy"),
]
output_parser = StructuredOutputParser.from_response_schemas(response_schemas)
#
format_instructions = output_parser.get_format_instructions()
#
print(output_parser)
#
print(format_instructions)
response_schemas=[ResponseSchema(name='Score', description='score for the context query relevancy', type='string')]
The output should be a markdown code snippet formatted in the following schema, including the leading and trailing "```json" and "```":

```json
{
"Score": string // score for the context query relevancy
}
```
template=score_prompt+"\n{format_instructions}"
print(template)
#
scoreprompt = PromptTemplate.from_template(template=template)
print(f"scoreprompt : {scoreprompt}")
You are a grader assessing relevance of a retrieved document to a user question. \n \n        Here is the retrieved document: \n\n {context} \n\n\n        Here is the user question: {question} \n\n        If the document contains keywords related to the user question, grade it as relevant. \n\n        It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n\n        Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n\n{format_instructions}

scoreprompt: PromptTemplate(input_variables=['context', 'format_instructions', 'question'], template="You are a grader assessing relevance of a retrieved document to a user question. \n \n Here is the retrieved document: \n\n {context} \n\n\n Here is the user question: {question} \n\n If the document contains keywords related to the user question, grade it as relevant. \n\n It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n\n Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n\n{format_instructions}")

Prepared the final prompt to apply Corrective RAG

question = "How to create a pipeline object?"
context = reranked_docs[0]
final_prompt = scoreprompt.format_prompt(format_instructions=format_instructions,
question=question,
context=context,
).text
print(final_prompt)
You are a grader assessing relevance of a retrieved document to a user question. 

Here is the retrieved document:

!--Copyright 2020 The HuggingFace Team. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the

⚠️ Note that this file is in Markdown but contain specific syntax for our doc-builder (similar to MDX) that may not be
rendered properly in your Markdown viewer.

-->

# How to create a custom pipeline?

In this guide, we will see how to create a custom pipeline and share it on the [Hub](hf.co/models) or add it to the
🤗 Transformers library.

First and foremost, you need to decide the raw entries the pipeline will be able to take. It can be strings, raw bytes,
dictionaries or whatever seems to be the most likely desired input. Try to keep these inputs as pure Python as possible
as it makes compatibility easier (even through other languages via JSON). Those will be the `inputs` of the
pipeline (`preprocess`).

Then define the `outputs`. Same policy as the `inputs`. The simpler, the better. Those will be the outputs of
`postprocess` method.

Start by inheriting the base class `Pipeline` with the 4 methods needed to implement `preprocess`,
`_forward`, `postprocess`, and `_sanitize_parameters`.


```python
from transformers import Pipeline


Here is the user question: How to create a pipeline object?

If the document contains keywords related to the user question, grade it as relevant.

It does not need to be a stringent test. The goal is to filter out erroneous retrievals.

Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.

The output should be a markdown code snippet formatted in the following schema, including the leading and trailing "```json" and "```":

```json
{
"Score": string // score for the context query relevancy
}
```
score = llm_openai(final_prompt)

#####Output


```json
{
"Score": "yes"
}
```

Assembling relevant context by running the Corrective RAG logic.

Corrective RAG

Let’s implement self-reflective RAG with some ideas from the CRAG (Corrective RAG) paper:

  1. Grade documents for relevance relative to the question.
  2. If any are irrelevant, then we will not use the context for generation
  3. We will then pass final retrieved documents to an LLM for final answer generation.
 # Score
filtered_docs = []
grade_ = []
matched_relevant_docs = []
question = "How to create a pipeline object?"

search = "No" # Default do not opt for web search to supplement retrieval
for d in reranked_docs:
final_prompt = scoreprompt.format_prompt(format_instructions=format_instructions,
question=question,
context=d,
).text
print(final_prompt)
score = llm_openai(final_prompt)
print(score)
score_dict = eval(score.split("```json\n")[-1].replace("\n```","").replace("\t","").replace("\n",""))
print(score_dict)
if score_dict['Score'] == "yes":
matched_relevant_docs.append(d)

You are a grader assessing relevance of a retrieved document to a user question. 

Here is the retrieved document:

!--Copyright 2020 The HuggingFace Team. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the

⚠️ Note that this file is in Markdown but contain specific syntax for our doc-builder (similar to MDX) that may not be
rendered properly in your Markdown viewer.

-->

# How to create a custom pipeline?

In this guide, we will see how to create a custom pipeline and share it on the [Hub](hf.co/models) or add it to the
🤗 Transformers library.

First and foremost, you need to decide the raw entries the pipeline will be able to take. It can be strings, raw bytes,
dictionaries or whatever seems to be the most likely desired input. Try to keep these inputs as pure Python as possible
as it makes compatibility easier (even through other languages via JSON). Those will be the `inputs` of the
pipeline (`preprocess`).

Then define the `outputs`. Same policy as the `inputs`. The simpler, the better. Those will be the outputs of
`postprocess` method.

Start by inheriting the base class `Pipeline` with the 4 methods needed to implement `preprocess`,
`_forward`, `postprocess`, and `_sanitize_parameters`.


```python
from transformers import Pipeline


Here is the user question: How to create a pipeline object?

If the document contains keywords related to the user question, grade it as relevant.

It does not need to be a stringent test. The goal is to filter out erroneous retrievals.

Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.

The output should be a markdown code snippet formatted in the following schema, including the leading and trailing "```json" and "```":

```json
{
"Score": string // score for the context query relevancy
}
```


```json
{
"Score": "yes"
}
```
{'Score': 'yes'}
You are a grader assessing relevance of a retrieved document to a user question.

Here is the retrieved document:

```
</tf>
</frameworkcontent>

## Pipeline

<Youtube id="tiZFewofSLM"/>

The [`pipeline`] is the easiest and fastest way to use a pretrained model for inference. You can use the [`pipeline`] out-of-the-box for many tasks across different modalities, some of which are shown in the table below:

<Tip>

For a complete list of available tasks, check out the [pipeline API reference](./main_classes/pipelines).

</Tip>


Here is the user question: How to create a pipeline object?

If the document contains keywords related to the user question, grade it as relevant.

It does not need to be a stringent test. The goal is to filter out erroneous retrievals.

Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.

The output should be a markdown code snippet formatted in the following schema, including the leading and trailing "```json" and "```":

```json
{
"Score": string // score for the context query relevancy
}
```


```json
{
"Score": "yes"
}
```
{'Score': 'yes'}
You are a grader assessing relevance of a retrieved document to a user question.

Here is the retrieved document:

- **Self-contained**: A pipeline shall be as self-contained as possible. More specifically, this means that all functionality should be either directly defined in the pipeline file itself, should be inherited from (and only from) the [`DiffusionPipeline` class](https://github.com/huggingface/diffusers/blob/5cbed8e0d157f65d3ddc2420dfd09f2df630e978/src/diffusers/pipeline_utils.py#L56) or be directly attached to the model and scheduler components of the pipeline.
- **Easy-to-use**: Pipelines should be extremely easy to use - one should be able to load the pipeline and
use it for its designated task, *e.g.* text-to-image generation, in just a couple of lines of code. Most
logic including pre-processing, an unrolled diffusion loop, and post-processing should all happen inside the `__call__` method.
- **Easy-to-tweak**: Certain pipelines will not be able to handle all use cases and tasks that you might like them to. If you want to use a certain pipeline for a specific use case that is not yet supported, you might have to copy the pipeline file and tweak the code to your needs. We try to make the pipeline code as readable as possible so that each part –from pre-processing to diffusing to post-processing– can easily be adapted. If you would like the community to benefit from your customized pipeline, we would love to see a contribution to our [community-examples](https://github.com/huggingface/diffusers/tree/main/examples/community). If you feel that an important pipeline should be part of the official pipelines but isn't, a contribution to the [official pipelines](https://github.com/huggingface/diffusers/blob/main/src/diffusers/pipelines) would be even better.


Here is the user question: How to create a pipeline object?

If the document contains keywords related to the user question, grade it as relevant.

It does not need to be a stringent test. The goal is to filter out erroneous retrievals.

Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.

The output should be a markdown code snippet formatted in the following schema, including the leading and trailing "```json" and "```":

```json
{
"Score": string // score for the context query relevancy
}
```


```json
{
"Score": "yes"
}
```
{'Score': 'yes'}

Redact an answer based on the documents scored as yes by the llm- basically implementing Corrective RAG.

retrieved_docs_text = [doc for doc in matched_relevant_docs]  # we only need the text of the documents
context = "\nExtracted documents:\n"
context += "".join([f"Document {str(i)}:::\n" + doc for i, doc in enumerate(retrieved_docs_text)])

final_prompt = RAG_PROMPT_TEMPLATE.format(question="How to create a pipeline object?", context=context)

print(final_prompt)

Response Log

<|system|>
Using the information contained in the context,
give a comprehensive answer to the question.
Respond only to the question asked, response should be concise and relevant to the question.
Provide the number of the source document when relevant.
If the answer cannot be deduced from the context, do not give an answer.</s>
<|user|>
Context:

Extracted documents:
Document 0:::
!--Copyright 2020 The HuggingFace Team. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the

⚠️ Note that this file is in Markdown but contain specific syntax for our doc-builder (similar to MDX) that may not be
rendered properly in your Markdown viewer.

-->

# How to create a custom pipeline?

In this guide, we will see how to create a custom pipeline and share it on the [Hub](hf.co/models) or add it to the
🤗 Transformers library.

First and foremost, you need to decide the raw entries the pipeline will be able to take. It can be strings, raw bytes,
dictionaries or whatever seems to be the most likely desired input. Try to keep these inputs as pure Python as possible
as it makes compatibility easier (even through other languages via JSON). Those will be the `inputs` of the
pipeline (`preprocess`).

Then define the `outputs`. Same policy as the `inputs`. The simpler, the better. Those will be the outputs of
`postprocess` method.

Start by inheriting the base class `Pipeline` with the 4 methods needed to implement `preprocess`,
`_forward`, `postprocess`, and `_sanitize_parameters`.


```python
from transformers import PipelineDocument 1:::
```
</tf>
</frameworkcontent>

## Pipeline

<Youtube id="tiZFewofSLM"/>

The [`pipeline`] is the easiest and fastest way to use a pretrained model for inference. You can use the [`pipeline`] out-of-the-box for many tasks across different modalities, some of which are shown in the table below:

<Tip>

For a complete list of available tasks, check out the [pipeline API reference](./main_classes/pipelines).

</Tip>Document 2:::
- **Self-contained**: A pipeline shall be as self-contained as possible. More specifically, this means that all functionality should be either directly defined in the pipeline file itself, should be inherited from (and only from) the [`DiffusionPipeline` class](https://github.com/huggingface/diffusers/blob/5cbed8e0d157f65d3ddc2420dfd09f2df630e978/src/diffusers/pipeline_utils.py#L56) or be directly attached to the model and scheduler components of the pipeline.
- **Easy-to-use**: Pipelines should be extremely easy to use - one should be able to load the pipeline and
use it for its designated task, *e.g.* text-to-image generation, in just a couple of lines of code. Most
logic including pre-processing, an unrolled diffusion loop, and post-processing should all happen inside the `__call__` method.
- **Easy-to-tweak**: Certain pipelines will not be able to handle all use cases and tasks that you might like them to. If you want to use a certain pipeline for a specific use case that is not yet supported, you might have to copy the pipeline file and tweak the code to your needs. We try to make the pipeline code as readable as possible so that each part –from pre-processing to diffusing to post-processing– can easily be adapted. If you would like the community to benefit from your customized pipeline, we would love to see a contribution to our [community-examples](https://github.com/huggingface/diffusers/tree/main/examples/community). If you feel that an important pipeline should be part of the official pipelines but isn't, a contribution to the [official pipelines](https://github.com/huggingface/diffusers/blob/main/src/diffusers/pipelines) would be even better.
---
Now here is the question you need to answer.

Question: How to create a pipeline object?</s>
<|assistant|>

Response Synthesizer

answer = llm(final_prompt)
print(f"Response Synthesized by LLM :\n\n{answer}")

Response Log

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.
Response Synthesized by LLM :

To create a pipeline object following the guidelines provided, you can follow these steps:

1. Define the inputs and outputs of your pipeline. These could be strings, dictionaries, or any other Python data type that best represents the input and output formats for your specific task.

2. Inherit the `Pipeline` class from the `transformers` module. This class provides the necessary methods for implementing the pipeline.

3. Implement the four required methods: `preprocess`, `_forward`, `postprocess`, and `_sanitize_parameters`. Here's a brief explanation of what each method does:

- `preprocess(self, inputs: Dict[str, Any], return_dict: bool = True) -> Dict[str, Any]`: This method takes the input dictionary and returns a dictionary with the preprocessed data. It can also optionally return a dictionary instead of a list of outputs.

- `_forward(self, inputs: Dict[str, Any]) -> Dict[str, Any]`: This method performs the actual computation using the underlying model and scheduler. It takes the preprocessed input dictionary and returns a dictionary with the computed results.

- `postprocess(self, outputs: Dict[str, Any], **kwargs) -> Dict[str, Any]`: This method takes the computed outputs and returns the final output dictionary after applying any necessary postprocessing steps.

- `_sanitize_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]`: This method sanitizes the input parameters before passing them to the underlying model and scheduler. It ensures that the parameters are in the correct format and range.

4. Optionally, you can attach additional functionality to the model and scheduler components of the pipeline. This can include custom loss functions, optimizers, and learning rate schedules.

5. Once you have implemented your pipeline, you can load it using the `Pipeline` constructor and use it for inference. Here's an example usage:

```python
from my_pipeline import MyPipeline

# Load the pipeline
pipe = MyPipeline()

# Use the pipeline for inference
result = pipe("This is a sample input

Conclusion:

Here we have implemented a Corrective RAG approach aby only considering the context to be those documents which are relevant to the query asked. We could have also used external resources to enhance the correctness of the non relevant contexts and pass it on to the LLM to enhance the response.

Referrences:

connect with me

--

--