Riding On Chains with A Pinecone

Directory-based bulk data loading with Pinecone and LangChain

James Ahn-King
8 min read · May 9, 2024
DALL-E generated image of a man riding a parrot (LangChain) with a pinecone (Pinecone DB) in his backpack

Large language models (LLMs) come packed with more general intelligence than anyone could wish for, yet without help they are only marginally useful for bespoke use cases. Precision is king for any tool, and for stochastic parrots, achieving it presents quite a few challenges. Retrieval Augmented Generation (RAG) lets us increase an application's accuracy by layering in source documents: the more relevant context we provide, the more accurate our applications can be.

In a real-world RAG implementation, our aim is to streamline the workflow for ourselves and our clients. The journey begins with their source documents, which are critical to the process. A client may provide a variety of file types, so the question becomes: how can we make loading and processing these documents more accessible and manageable?

This article focuses on bulk loading files from a directory into a Pinecone vector database: ingesting source documents, processing them into chunks, estimating embedding costs, initializing the vector database, chatting with the data, and tracing the results.

Github Repo:

Pinecone w/ LangChain Bulk Data Loaders (.pdf, .csv, and .docx):
https://github.com/jamesahnking/pinecone-langchain-dataloaders

Application and Workflow Steps

  1. Add a set of documents of the same file type to a corresponding directory.
  2. Verify the directory and file types, combine the documents, and character-split (chunk) them, then return the chunks for further use.
  3. Check the cost of embedding the generated chunks.
  4. Create a Pinecone index, or delete an existing one.
  5. Label and load the chunks as embeddings into your index.
  6. Create a conversational chatbot to interact with the new knowledge base.
  7. Trace interactions to evaluate the results.
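
Before working through these steps, the notebook assumes the relevant packages are installed and that your API keys live in a .env file at the project root. The package list below is an assumption pieced together from the imports used throughout this article, not a requirements file from the repo.

# Assumed dependencies (names may vary slightly with your versions)
# pip install langchain langchain-community langchain-openai langchain-pinecone \
#             langchain-text-splitters pinecone-client tiktoken docx2txt langsmith python-dotenv

# .env (placeholder values - substitute your own keys)
# OPENAI_API_KEY=sk-...
# PINECONE_API_KEY=...
# LANGCHAIN_API_KEY=...
# LANGCHAIN_PROJECT=docxs-dataloader-2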

Directory Structure

For this example, we will cover .docx ingestion, bulk-processing the documents from inside a directory named for the file type. The directory structure is as follows: the application notebook sits at the root of the project, and the source files go into the matching folder.

├── pinecone-langchain-docx-dataloader.ipynb
├── docxs
│ ├── client_file_01.docx
│ ├── client_file_02.docx
│ ├── client_file_03.docx

Data Ingest and Chunking

In our opening code block, the check_and_load_docx_from_dir() function handles everything for a single file extension: directory validation, file-type checking, document list creation, and text splitting (chunking).

The function grabs all the files in your .docx directory and confirms they have the correct extension. It then loads them into one document list and splits the text so it can be converted into vectors for your database later. It uses LangChain's Docx2txtLoader to handle the .docx processing and the always reliable RecursiveCharacterTextSplitter to chop up the text strings.


import os
from langchain_community.document_loaders.word_document import Docx2txtLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter


def check_and_load_docx_from_dir(directory):
    # Ensure the directory path exists
    if not os.path.exists(directory):
        print("Directory does not exist.")
        return False

    # Check that the path is actually a directory
    if not os.path.isdir(directory):
        print("The specified path is not a directory.")
        return False

    # List all files in the directory
    all_files = os.listdir(directory)
    print(f"Here's the list of all files within the {directory} directory: {all_files}")

    # Check that each file ends with .docx
    for file in all_files:
        if not file.endswith(".docx"):
            print(f"Non-DOCX file found: {file}")
            return False

    # Keep the directory path for os.path.join usage
    directory_path = directory
    # List comprehension to collect the .docx file names
    docx_files = [f for f in all_files if f.endswith(".docx")]

    # Create an empty list container
    documents = []

    # Loop through the directory, bundle and load the .docx files
    for docx_file in docx_files:
        file_path = os.path.join(directory_path, docx_file)
        loader = Docx2txtLoader(file_path)
        documents.extend(loader.load())

    # Split the text
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,    # Maximum size of each chunk
        chunk_overlap=100,  # Number of overlapping characters between chunks
    )

    # Create an empty list to hold the chunks
    chunks = []

    # Split each document and add the pieces to the list
    for document in documents:
        chunks.extend(text_splitter.split_documents([document]))

    return chunks


chunks = check_and_load_docx_from_dir('docxs')

chunks


# Usage example
# directory_path = 'docxs'
# check_and_load_docx_from_dir(directory_path)
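
The same directory-driven pattern extends to the other file types covered by the repo. As a rough sketch, assuming LangChain's PyPDFLoader and CSVLoader as stand-ins for Docx2txtLoader (the helper and its extension-to-loader mapping below are illustrative, not code from the repo), the loading step could look like this:

import os
from langchain_community.document_loaders import PyPDFLoader, CSVLoader

# Hypothetical extension-to-loader mapping for the other supported file types
LOADERS = {
    ".pdf": PyPDFLoader,
    ".csv": CSVLoader,
}

def load_documents_from_dir(directory, extension):
    """Load every file with the given extension from a directory into one document list."""
    loader_cls = LOADERS[extension]
    documents = []
    for file_name in os.listdir(directory):
        if file_name.endswith(extension):
            # Each loader takes a file path and returns a list of LangChain Documents
            documents.extend(loader_cls(os.path.join(directory, file_name)).load())
    return documents

# Example: pdf_documents = load_documents_from_dir('pdfs', '.pdf')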

Price Check

API calls cost money and are billed by the token. When text from our source documents is embedded, it is first converted into LLM-readable tokens. We always want to stay within budget, so a price check is recommended. tiktoken is the tokenizer we leverage here: it counts the tokens in our chunked text, and from that count we can estimate the embedding cost. If price isn't a concern for you, skipping this step won't interfere with the app's function.

For the following steps, we pull this function and a few additional fragments from Atef Ataya’s repo, “Building Question Answering Application using OpenAI, Pinecone, and LangChain.”

# How much it costs to embed the chunks
def calculate_and_display_embedding_cost(texts):
    import tiktoken
    encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
    total_tokens = sum([len(encoding.encode(page.page_content)) for page in texts])
    print(f'Token Amount: {total_tokens}')
    print(f'Embedding Cost in USD: {total_tokens / 1000 * 0.0004:.6f}')


calculate_and_display_embedding_cost(chunks)

Pinecone Index Creation and Deletion

Pinecone refers to its databases as indexes. This code block defines two functions: delete_index_with_same_name(), which deletes an existing Pinecone index, and load_or_create_embeddings_index(), which either creates a new index and adds embeddings to it or loads embeddings into an index that already exists. Both lean on LangChain's PineconeVectorStore, which works as an interface to the index.

from pinecone import Pinecone, ServerlessSpec
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
import time

pc = Pinecone(
    api_key=os.environ.get("PINECONE_API_KEY") or 'PINECONE_API_KEY'
)


def delete_index_with_same_name(index_name):
    # Delete index if any indexes of the same name are present
    if index_name in pc.list_indexes().names():
        print(f'Deleting the {index_name} vector database')
        pc.delete_index(index_name)


def load_or_create_embeddings_index(index_name, chunks, namespace):
    if index_name in pc.list_indexes().names():
        print(f'Index {index_name} already exists. Loading embeddings...', end='')
        vector_store = PineconeVectorStore.from_documents(
            documents=chunks, embedding=OpenAIEmbeddings(), index_name=index_name, namespace=namespace)

        while not pc.describe_index(index_name).status['ready']:
            time.sleep(1)

        print('Done')
    else:
        print(f'Creating index {index_name} and embeddings ...', end='')
        pc.create_index(name=index_name, dimension=1536, metric='cosine', spec=ServerlessSpec(
            cloud='aws',
            region='us-west-2'
        ))

        while not pc.describe_index(index_name).status['ready']:
            time.sleep(1)

        # Add to vectorDB using LangChain
        vector_store = PineconeVectorStore.from_documents(
            documents=chunks, embedding=OpenAIEmbeddings(), index_name=index_name, namespace=namespace)
        print('Done')

    return vector_store
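
If you need to rebuild an index from scratch (for example, after changing the chunking settings), call the delete helper first. This usage line is just an illustration, using the index name from later in this walkthrough:

# Optional: drop the existing index before recreating it
delete_index_with_same_name('docxs-dataloader-2')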

Create your Pinecone Index with Embeddings

We create our Pinecone index with embeddings by calling load_or_create_embeddings_index() with our arguments. This creates the index, loads our data into it, and attaches a namespace to the records. A namespace in Pinecone allows you to section off records within an index into distinct segments; in this example, our namespace is docxs_documents_2. That lets us query the index and limit our searches to the batch of data we labeled. If an index with that name already exists, only the namespace and new data are added.

index_name='docxs-dataloader-2'
chunks = chunks
namespace = "docxs_documents_2"

vector_store = load_or_create_embeddings_index(index_name=index_name, chunks=chunks, namespace=namespace)
Output: the vector store generated by the code block above
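
To sanity-check that the embeddings landed in the right place, we can run a quick similarity search scoped to our namespace. This is a sketch rather than part of the original notebook; the query string is only an example, and passing namespace explicitly is optional here since the vector store was created with it:

# Quick check: search only within the docxs_documents_2 namespace
results = vector_store.similarity_search(
    "What services does the company offer?",  # example query, not from the source docs
    k=3,
    namespace="docxs_documents_2",
)
for doc in results:
    print(doc.page_content[:200])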

Create a Conversation Chain with a Chat History

To test our updated data, we need to give the application chat functionality so that we can interact with and test the new knowledge base. To accomplish this, we use two LCEL chain constructors and stack them. LCEL stands for LangChain Expression Language, which LangChain describes as “a declarative way to easily compose chains together.”

That’s cool… but what’s a chain?

A chain is a series of linked operations that drives LangChain's workflow over your interactions and data. A chain can involve multiple components, such as prompt templates, LLMs, output parsers, memory modules, and prebuilt or custom tools, to execute a sequence of actions; each step in the workflow is a link in the chain. For this function we use two LCEL chain constructors, create_retrieval_chain and create_history_aware_retriever.

By combining them, we get the chat functionality we want. First, a rephrasing prompt folds the chat history and the latest question into a standalone question and retrieves the context needed to answer it. Second, a question-and-answer assistant, which we can give instructions to, combines all of this functionality into one workflow.

from langsmith import traceable
from openai import Client

openai = Client()


@traceable(
    run_type="llm",
    project_name="docxs-dataloader-2"
)
def create_history_aware_retriever_with_hub(vector_store, question, chat_history=[]):
    from langchain_community.chat_models import ChatOpenAI
    from langchain.chains.combine_documents import create_stuff_documents_chain
    from langchain.chains import create_history_aware_retriever
    from langchain.chains import create_retrieval_chain
    from langchain.prompts import ChatPromptTemplate
    from langchain_core.prompts import MessagesPlaceholder
    from langchain import hub

    # https://smith.langchain.com/hub/langchain-ai/retrieval-qa-chat?organizationId=80b05ae8-a524-5b5f-b4d7-36207f821772
    rephrase_prompt = hub.pull("langchain-ai/chat-langchain-rephrase")

    llm = ChatOpenAI(temperature=1)

    # Grab your Pinecone and set the search type
    retriever = vector_store.as_retriever(search_type='similarity', search_kwargs={'k': 3})

    # Chain 1
    chat_retriever_chain = create_history_aware_retriever(
        llm, retriever, rephrase_prompt
    )

    qa_system_prompt = """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \
Use three sentences maximum and keep the answer concise.\

{context}"""

    qa_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", qa_system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ]
    )

    # Chain 2
    question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
    # Combine both chains
    rag_chain = create_retrieval_chain(chat_retriever_chain, question_answer_chain)
    # Invoke the chain to get a response
    result = rag_chain.invoke({"input": question, "chat_history": chat_history})
    # Append to chat history
    chat_history.append((question, result['answer']))

    return result, chat_history

Set up LangSmith Trace

To evaluate our application, we'll use LangChain's LangSmith, which allows us to trace and assess the app's reasoning. LangSmith records the steps the app takes to return an answer, which lets you review and evaluate its performance more precisely. See the LangSmith website for details.

# LangSmith Trace
import os
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())  # read the local .env file

# Tracing settings must be set as environment variables for LangSmith to pick them up
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'
LANGCHAIN_API_KEY = os.environ['LANGCHAIN_API_KEY']
LANGCHAIN_PROJECT = os.environ['LANGCHAIN_PROJECT']

Chat and Trace

Lastly, it's time to chat with our data. When a response is incorrect, we can review the output with LangSmith. Often the reason behind an incorrect response is missing context, and the fix is to load additional documents into the database until it can generate the desired answer. This is a trial-and-error process, but it is worth the effort.

chat_history = []
question = "Does Trasaterra do marketing?"
result, chat_history = create_history_aware_retriever_with_hub(vector_store, question,chat_history)
print(result['answer'])
print(chat_history)

No, Trasaterra is not a marketing company. They specialize in branding, comprehensive design services, web design, and development. Content creation can be included as part of their offerings, but it is not a core service.
[('Does Trasaterra do marketing?', 'No, Trasaterra is not a marketing company. They specialize in branding, comprehensive design services, web design, and development. Content creation can be included as part of their offerings, but it is not a core service.')]
LangSmith Trace
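
Because the function returns the updated chat_history and the retriever is history-aware, a follow-up question can lean on the previous exchange. A quick usage sketch (the follow-up question is only an example):

# Ask a follow-up that relies on the previous turn stored in chat_history
question = "What services do they offer instead?"
result, chat_history = create_history_aware_retriever_with_hub(vector_store, question, chat_history)
print(result['answer'])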

Conclusion

Most non-technical people understand the idea of layering in documents to increase accuracy. Because the process is inherently iterative, automating the strategy is a win for both sides. Directory-based automation makes things more accessible, and we should continuously strive toward accessible workflows.

Here are some resources that I found helpful for my builds.

Github Repo:

Pinecone w/ LangChain Bulk Data Loaders (.pdf, .csv, and .docx):
https://github.com/jamesahnking/pinecone-langchain-dataloaders

James Ahn-King is a full-stack developer at the branding, design, and web agency Trasaterra in New York City. Trasaterra's creative philosophy and methodology are rooted in audience attraction.
