Question-and-Answer Chat Apps with Our Own Data (PDFs) using Vertex AI, LangChain, and Streamlit

Johanes Glenn
Google Cloud - Community
Oct 16, 2023


Background

To explore embeddings further, I was inspired to create a simple chat app that can answer questions based on my own data (in this case I borrow real PDFs from a trusted source). This is part of my GenAI learning journey, which started with a simple chat app, then a chat app with additional agents in LangChain, and now an app whose responses come from specific data that I have. This story focuses on a simple concept: capturing data from a source (PDFs this time), creating embeddings, and then letting the chat app query that data and use it in its responses.

Note: this is not a guide to building production apps; it is intended only to explore a concept.

Concept

The overall idea is a flow in which an admin (or another trusted source) uploads PDFs to object storage (Google Cloud Storage). Each time a new file is uploaded, a trigger asks a service to create embeddings from that file and store them in a vector database (Cloud SQL for PostgreSQL with pgvector). In a separate service, users can ask questions, and responses are generated based on the data stored in the vector database.

What are embeddings, and how do they help us in this case? I love how the docs put it:

When you create text embeddings, you get vector representations of natural text as arrays of floating point numbers. What this means, is that all of your input text is assigned a numerical representation. By comparing the numerical distance between the vector representations of two pieces of text, an application can determine the similarity between the text or the objects represented by the text. https://cloud.google.com/vertex-ai/docs/generative-ai/embeddings/get-text-embeddings (12 Oct 2023)

With that calculation we can retrieve the pieces of information related to the user's prompt. It is quite complex (at least for me) at this stage, but with the help of LangChain and pgvector we can start with something simple and then iterate toward more detailed configurations.
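To make the idea concrete, here is a minimal sketch of that distance calculation using cosine similarity. The three sample sentences and the helper function are my own illustrative assumptions, not part of the actual flow:

#similarity_sketch.py
from langchain_community.embeddings import VertexAIEmbeddings

embeddings = VertexAIEmbeddings()

# Embed three sentences; the first two are semantically close, the third is not
vecs = embeddings.embed_documents([
    "Alphabet announced its quarterly earnings.",
    "Google's parent company reported Q4 results.",
    "I had pasta for lunch.",
])

def cosine(a, b):
    # Cosine similarity: dot product divided by the product of the vector norms
    dot = sum(x * y for x, y in zip(a, b))
    norm = lambda v: sum(x * x for x in v) ** 0.5
    return dot / (norm(a) * norm(b))

# Expect the first pair to score noticeably higher than the second
print(cosine(vecs[0], vecs[1]))
print(cosine(vecs[0], vecs[2]))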

Steps

[One] Create a Google Cloud Storage bucket to store the PDFs. Ref: https://cloud.google.com/storage/docs/creating-buckets
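If you prefer the CLI over the console, something like this should work (the bucket name and region are placeholders, not values from this setup):

gcloud storage buckets create gs://<bucket id> --location=asia-southeast1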

[Two] Create a Cloud SQL for PostgreSQL instance, and then activate the vector extension.
https://cloud.google.com/sql/docs/postgres/create-instance
https://cloud.google.com/blog/products/databases/using-pgvector-llms-and-langchain-with-google-cloud-databases

CREATE EXTENSION IF NOT EXISTS vector;
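For reference, the instance itself can also be created from the CLI; the instance name, region, and machine tier here are my own placeholder assumptions:

gcloud sql instances create <instance name> \
  --database-version=POSTGRES_15 \
  --region=asia-southeast1 \
  --tier=db-custom-1-3840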

Next, develop and create the service that runs the embedding function. In this case I will deploy to Cloud Run for simplicity and, of course, low cost for a demo. During development I use Cloud Workstations, which makes things easier, especially for testing the private connection to Cloud SQL for PostgreSQL. However, feel free to use whichever tools you prefer.

#Dockerfile
FROM python:3.10-slim

ENV PYTHONUNBUFFERED True

ENV APP_HOME /app
WORKDIR $APP_HOME

# Build tooling needed by some Python dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    software-properties-common \
    git \
    && rm -rf /var/lib/apt/lists/*

# Postgres client headers and a compiler for psycopg2
RUN apt-get update \
    && apt-get -y install libpq-dev gcc \
    && pip install psycopg2

COPY . ./

RUN pip install --no-cache-dir -r requirements.txt
EXPOSE 8080
CMD exec gunicorn --bind 0.0.0.0:8080 --workers 1 --threads 8 --timeout 0 embed:app
#cloudbuild.yaml (if needed)
steps:
# Build the container image
- name: 'gcr.io/cloud-builders/docker'
  args: ['build', '-t', 'asia-southeast1-docker.pkg.dev/<project id>/<artifact registry>/gcs-embed:latest', '.']
# Push the container image to Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
  args: ['push', 'asia-southeast1-docker.pkg.dev/<project id>/<artifact registry>/gcs-embed:latest']
images:
- asia-southeast1-docker.pkg.dev/<project id>/<artifact registry>/gcs-embed:latest
logsBucket: '<bucket>'
options:
  logging: GCS_ONLY
#embed.py
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.document_loaders import GCSDirectoryLoader
from langchain_community.vectorstores.pgvector import PGVector
from langchain_community.llms import VertexAI
from langchain_community.embeddings import VertexAIEmbeddings
from flask import Flask
import os

llm = VertexAI(
    model_name='text-bison@002',
    max_output_tokens=256,
    temperature=0.1,
    top_p=0.8,
    top_k=40,
    verbose=True,
)

app = Flask(__name__)

@app.route('/', methods=['POST', 'GET'])
def embed():
    REQUESTS_PER_MINUTE = 150

    # Load every PDF currently in the bucket
    loader = GCSDirectoryLoader(project_name="<project id>", bucket="<bucket id>")
    documents = loader.load()

    # Split documents into overlapping chunks so each embedding covers a focused passage
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    texts = text_splitter.split_documents(documents)
    print(len(texts))

    embeddings = VertexAIEmbeddings(requests_per_minute=REQUESTS_PER_MINUTE)

    # Build the connection string programmatically from environment variables
    CONNECTION_STRING = PGVector.connection_string_from_db_params(
        driver=os.environ.get("PGVECTOR_DRIVER", "psycopg2"),
        host=os.environ.get("PGVECTOR_HOST", "<ip>"),
        port=int(os.environ.get("PGVECTOR_PORT", "5432")),
        database=os.environ.get("PGVECTOR_DATABASE", "<db name>"),
        user=os.environ.get("PGVECTOR_USER", "<username>"),
        password=os.environ.get("PGVECTOR_PASSWORD", "<password>"),
    )
    COLLECTION_NAME = 'test_collection'

    # Embed the chunks and store them in the pgvector-backed collection
    db = PGVector.from_documents(
        embedding=embeddings,
        documents=texts,
        connection_string=CONNECTION_STRING,
        collection_name=COLLECTION_NAME,
    )

    return 'done embed'

if __name__ == '__main__':
    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))

Prior to deploying to the runtime, we can try the code within Cloud Workstations, as it is attached to the VPC.

It runs (note: as this is only a test, my objective is just to check whether it runs).
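A quick way to exercise it, assuming the Flask app is running locally on port 8080:

python embed.py
curl -X POST http://localhost:8080/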

Next, push to Cloud Build and run the steps, either by setting a trigger from the repo or by running the command directly:

gcloud builds submit .

It is running!
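Once the image is pushed, the service can be deployed to Cloud Run. A sketch, where the service name, region, and flags are my own assumptions:

gcloud run deploy gcs-embed \
  --image=asia-southeast1-docker.pkg.dev/<project id>/<artifact registry>/gcs-embed:latest \
  --region=asia-southeast1 \
  --no-allow-unauthenticated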

[Three] Create an Eventarc trigger to push an event each time a file is uploaded.

Ref: https://cloud.google.com/eventarc/docs/run/create-trigger-storage-console
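A sketch of the trigger from the CLI, pointing straight at the Cloud Run service (in step [Four] I route through Workflows instead, which would use --destination-workflow); the trigger name and service account are placeholders:

gcloud eventarc triggers create <trigger name> \
  --location=asia-southeast1 \
  --destination-run-service=gcs-embed \
  --destination-run-region=asia-southeast1 \
  --event-filters="type=google.cloud.storage.object.v1.finalized" \
  --event-filters="bucket=<bucket id>" \
  --service-account=<service account email>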

[Four] As an additional stage, I want to ensure the event message from Eventarc is delivered correctly (the service may need some time before returning a response, which can cause Eventarc to retry the push), so I add a layer of Workflows that may also help me run multiple steps within the flow in the future.

Ref: https://cloud.google.com/workflows/docs/overview
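A minimal sketch of such a workflow, assuming it simply calls the embedding service with an authenticated request and a longer timeout (the step names and URL are placeholders):

#workflow.yaml
main:
  params: [event]
  steps:
  - callEmbedService:
      call: http.post
      args:
        url: https://<cloud run service url>/
        auth:
          type: OIDC
        timeout: 1800
      result: embedResult
  - returnResult:
      return: ${embedResult.body}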

[Five] Let's test by uploading a file and checking the embeddings.

I am testing with Alphabet's Q4 & fiscal year earnings docs: https://abc.xyz/investor/
The embeddings are stored within PGVector.
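To sanity-check what was written, you can query the tables that LangChain's PGVector integration creates by default (langchain_pg_collection and langchain_pg_embedding); this query is a sketch under that assumption:

SELECT c.name, count(*)
FROM langchain_pg_embedding e
JOIN langchain_pg_collection c ON e.collection_id = c.uuid
GROUP BY c.name;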

Now, to revisit the concept: the files are uploaded, the embedding flow is triggered, and the results are stored in Cloud SQL for PostgreSQL (PGVector). The next step is to create a client the user can interact with.

[Six] In a separate service I can now deploy the user-facing chat app, again using Cloud Run to serve it.

#app.py
import streamlit as st
from langchain_community.embeddings import VertexAIEmbeddings
from langchain_community.vectorstores.pgvector import PGVector
from langchain.chains import RetrievalQA
from langchain_community.llms import VertexAI

REQUESTS_PER_MINUTE = 150

st.title("📝 Q&A PDF with VertexAI")

with st.spinner('Wait for it...'):
    llm = VertexAI(
        model_name='text-bison@001',
        max_output_tokens=256,
        temperature=0.1,
        top_p=0.8,
        top_k=40,
        verbose=True,
    )

    embeddings = VertexAIEmbeddings(requests_per_minute=REQUESTS_PER_MINUTE)
    CONNECTION_STRING = "postgresql+psycopg2://<username>:<password>@<ip>:5432/<db name>"
    COLLECTION_NAME = 'test_collection'

    # Reuse the collection created by the embedding service
    db = PGVector.from_existing_index(
        embedding=embeddings,
        connection_string=CONNECTION_STRING,
        collection_name=COLLECTION_NAME,
    )

st.success('Chat is ready')

if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "assistant", "content": "How can I help you?"}]

for msg in st.session_state.messages:
    st.chat_message(msg["role"]).write(msg["content"])

if prompt := st.chat_input():
    # Retrieve the two most similar chunks and "stuff" them into the LLM's context
    retriever = db.as_retriever(search_type="similarity", search_kwargs={"k": 2})
    qa = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
        verbose=True,
    )

    st.session_state.messages.append({"role": "user", "content": prompt})
    st.chat_message("user").write(prompt)
    result = qa({"query": prompt})

    st.session_state.messages.append({"role": "assistant", "content": result["result"]})
    st.chat_message("assistant").write(result["result"])

These are the simple concepts behind an app that can answer based on specific data, for grounding in GenAI using Vertex AI. As this is only a concept, I haven't created any document management yet; next I will see how I can quickly build one and create embeddings per object (file) on GCS. If you have any references or feedback, please do share with me :)


Cloud Customer Engineer — Infrastructure Modernization @GoogleCloud. Stories are my own opinion. https://linktr.ee/alevz