Asking Questions to Your Own Documents with Snowflake Cortex

Understanding and analyzing unstructured data with the help of Large Language Models (LLMs) has become a very hot topic. It comes up as an immediate use case in almost every conversation I have with partners and customers: understanding your own legal documents and generating new documentation based on them, generating RFP responses based on your library of RFP documents, questioning your own documents to extract information, and many others. Snowflake enables all of this in an easy way with Snowflake Cortex.

Two of the key technology components here are embeddings, which capture the semantic meaning of text, and the capability to store those embeddings as vectors within your database so you can search for similar vectors later. My colleague Tom Christian has created a great blog post and code explaining how to Build End-to-End RAG Applications with Snowflake Cortex. He covers the full process and the advantages of using Snowflake Cortex to create embeddings and use them for Retrieval Augmented Generation (RAG). Some of that code is reused here for this example.

Because Snowpark Container Services lets you run anything you can put in a container within Snowflake, you can also host your own vector databases and embedding models in case you do not want to use Cortex. My colleague Bart Wrobel wrote a blog entry about that: Leveraging Snowpark Container Services for Advanced Q&A.

In this blog entry I want to go deeper into the usage of Snowflake Cortex to show again how simple it is and how dramatically your answers improve when using the right context. We will also explore how to integrate it all with Streams and Tasks so documents are automatically processed as soon as they are made available to Snowflake. This is a complete quickstart guide that you can implement yourself.

This is a very simplistic approach, as we will be retrieving just one chunk to provide context, but the goal is to show how powerful this can be and how easy it is to implement within Snowflake.

There is one staging area where we upload our documents. On top of that staging area there is a Directory Table that provides file-level metadata. To track changes happening in that Directory Table, we create one Stream that captures the changes and one Task that processes all new files being added. The task will invoke a function that reads each PDF file and splits it into chunks of text, then create embeddings for each chunk, storing them within Snowflake using the new Vector data type (in private preview).

The next step is using those embeddings to answer questions and locate the documents relevant to the user. The user asks a question, which is embedded and used to look for similar content within our documents. That content is added to the prompt as context and passed to the LLM together with the question. Also, using scoped URLs, a link to the document that was used is provided, so the user can examine it. Let's walk through that diagram step by step:

Step 1:

To show the importance of using my own data, I am going to upload 3 of my phone bills into the staging area. By the way, those bills are in Spanish.

I have created a Streamlit in Snowflake app to interact with the data. Just take that code and paste it into Streamlit in your own Snowflake account.

Step 2:

In the App, I can see that there are 3 new files in the Stream waiting to be processed:

The code is quite simple. First, I defined a stream on the directory table of the stage containing the docs:

create or replace stream docs_stream on stage docs;
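The task below inserts into a table named docs_chunks_table, which holds one row per chunk. The exact DDL is in the quickstart; a minimal sketch consistent with the insert statement would be:

create or replace table docs_chunks_table (
    relative_path varchar,        -- relative path of the PDF file in the stage
    size number,                  -- file size
    file_url varchar,             -- stage file URL
    scoped_file_url varchar,      -- scoped URL used to read the file
    chunk varchar,                -- piece of text extracted from the PDF
    chunk_vec vector(float, 768)  -- embedding; e5-base-v2 produces 768 dimensions
);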

Then I define a task that processes the new documents in that stream, calling the function that returns the chunks and using them to create embeddings that are stored as vectors:

create or replace task task_extract_chunk_vec_from_pdf
    warehouse = XS_WH
    schedule = '1 minute'
    when system$stream_has_data('docs_stream')
    as
        insert into docs_chunks_table (relative_path, size, file_url,
                                       scoped_file_url, chunk, chunk_vec)
        select relative_path,
               size,
               file_url,
               build_scoped_file_url(@docs, relative_path) as scoped_file_url,
               func.chunk as chunk,
               snowflake.cortex.embed_text('e5-base-v2', chunk) as chunk_vec
        from
            docs_stream,
            TABLE(pdf_text_chunker(build_scoped_file_url(@docs, relative_path))) as func;

alter task task_extract_chunk_vec_from_pdf resume;

That task inserts the PDF content into docs_chunks_table by calling a UDTF named pdf_text_chunker and then, for each chunk, calling the embed_text Cortex function to generate the vector.
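To verify the pipeline is alive, you can check whether the stream has pending data and look at the task history from a worksheet (a quick sketch using standard Snowflake functions):

-- Is there anything in the stream waiting to be processed?
select system$stream_has_data('docs_stream');

-- Recent runs of the task defined above
select name, state, scheduled_time, error_message
from table(information_schema.task_history(
    task_name => 'task_extract_chunk_vec_from_pdf'))
order by scheduled_time desc
limit 10;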

Step 3:

Here we use a single UDTF that reads the PDF file and splits it into chunks, which is helpful for large PDF files.

from snowflake.snowpark.types import StringType, StructField, StructType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import pandas as pd
import logging

class pdf_text_chunker:

    def read_pdf(self, file_url: str) -> str:

        logger = logging.getLogger("udf_logger")
        logger.info(f"Opening file {file_url}")

        with SnowflakeFile.open(file_url, 'rb') as f:
            buffer = io.BytesIO(f.readall())

        reader = PyPDF2.PdfReader(buffer)
        text = ""
        for page in reader.pages:
            try:
                text += page.extract_text().replace('\n', ' ')
            except Exception:
                text = "Unable to Extract"
                logger.warning(f"Unable to extract from file {file_url}, page {page}")

        return text

    def process(self, file_url: str):

        text = self.read_pdf(file_url)

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size = 3500,    # Adjust this as you see fit
            chunk_overlap = 200,  # Overlap keeps chunks contextual
            length_function = len
        )

        chunks = text_splitter.split_text(text)
        df = pd.DataFrame(chunks, columns=['chunks'])

        yield from df.itertuples(index=False, name=None)

And as Tom explained in his blog, the UDTF is registered so it can be called for each PDF URL:

schema = StructType([
    StructField("chunk", StringType())
])

session.udtf.register(
    pdf_text_chunker,
    output_schema = schema,
    input_types = [StringType()],
    is_permanent = True,
    name = 'pdf_text_chunker',
    replace = True,
    packages = ['snowflake-snowpark-python', 'pypdf2', 'pandas', 'langchain'],
    stage_location = 'CC_DOCUMENTS.DATA2.UDF'
)

Step 4:

As explained before, the task is inserting the chunks and the embeddings into the Snowflake table. Embeddings are created with this call for each chunk:

            snowflake.cortex.embed_text('e5-base-v2',chunk) as chunk_vec
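You can also try the function on any literal text from a worksheet; it returns a 768-dimension vector (the text here is just an example):

select snowflake.cortex.embed_text(
    'e5-base-v2', 'How much did I pay in September?') as sample_vec;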

Step 5:

Here we have the table where we have inserted the chunks and their embeddings:
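A quick query to inspect what was stored (using the column names from the task above):

select relative_path,
       size,
       substr(chunk, 1, 80) as chunk_preview
from docs_chunks_table
limit 5;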

Step 6:

Until now we have seen the normal processing of new documents and how that information is added into Snowflake. Now we are going to see the effect of using that information when providing answers to user questions.

It is clear that the LLM knows nothing about me and my phone bills, so if we do not provide the right context, the answer will be useless. The app has a checkbox we can unselect so that no context from the documents is used. Let's see the answer:
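You can reproduce that no-context call from a worksheet; this sketch uses the same model used later in this post:

select snowflake.cortex.complete(
    'llama2-70b-chat',
    'How much was my phone bill in September? Answer:') as response;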

So clearly the LLM cannot provide any useful information as we are not providing access to my own documents.

Step 7:

In order to find chunks of text with a similar meaning to the question being asked, the question is converted into a vector and then compared with the vectors of the chunks. This is the piece of code that finds the most similar chunk and the path to the file containing it. My question is embedded, and then we search for the most similar vector using VECTOR_COSINE_DISTANCE():

with results as
    (SELECT RELATIVE_PATH,
            VECTOR_COSINE_DISTANCE(docs_chunks_table.chunk_vec, query_vec.qvec) as distance,
            chunk
     from docs_chunks_table, query_vec
     order by distance desc
     limit {num_chunks})
select chunk, relative_path from results

Step 8:

Here we build the prompt and add the context we got from the most similar chunks of text. This is very simplistic, as we only use the first chunks, but you get the idea. We also provide a presigned URL to access the most relevant document:

def create_prompt (myquestion, rag):
    # num_chunks (how many chunks to retrieve) is defined elsewhere in the app

    if rag == 1:
        createsql = f"""
            create or replace table query_vec (qvec vector(float, 768))
        """
        session.sql(createsql).collect()

        insertsql = f"""
            insert into query_vec
                select snowflake.cortex.embed_text('e5-base-v2', '{myquestion}')
        """
        session.sql(insertsql).collect()

        cmd = f"""
            with results as
                (SELECT RELATIVE_PATH,
                        VECTOR_COSINE_DISTANCE(docs_chunks_table.chunk_vec, query_vec.qvec) as distance,
                        chunk
                 from docs_chunks_table, query_vec
                 order by distance desc
                 limit {num_chunks})
            select chunk, relative_path from results
        """

        df_context = session.sql(cmd).to_pandas()

        # Concatenate all retrieved chunks into the context
        context_length = len(df_context)

        prompt_context = ""
        for i in range(0, context_length):
            prompt_context += df_context._get_value(i, 'CHUNK')

        # Remove single quotes so the context does not break the SQL string literal below
        prompt_context = prompt_context.replace("'", "")
        relative_path = df_context._get_value(0, 'RELATIVE_PATH')

        prompt = f"""
            'You are an expert assistant extracting information from context provided.
            Answer the question based on the context. Be concise and do not hallucinate.
            If you do not have the information just say so.
            Context: {prompt_context}
            Question:
            {myquestion}
            Answer: '
        """
        cmd2 = f"select GET_PRESIGNED_URL(@docs, '{relative_path}', 360) as URL_LINK from directory(@docs)"
        df_url_link = session.sql(cmd2).to_pandas()
        url_link = df_url_link._get_value(0, 'URL_LINK')

    else:
        prompt = f"""
            'Question:
            {myquestion}
            Answer: '
        """
        url_link = "None"
        relative_path = "None"

    return prompt, url_link, relative_path
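GET_PRESIGNED_URL takes the stage, the relative path of the file and an expiration time in seconds, and returns a temporary link. A standalone sketch, with a hypothetical file name:

select get_presigned_url(@docs, 'my_september_bill.pdf', 360) as url_link;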

Step 9:

With the prompt we have created, we simply call the complete() Cortex LLM function:

def complete(myquestion, model_name, rag = 1):

    prompt, url_link, relative_path = create_prompt(myquestion, rag)
    cmd = f"""
        select snowflake.cortex.complete(
            '{model_name}',
            {prompt})
        as response
    """
    df_response = session.sql(cmd).collect()
    return df_response, url_link, relative_path

def display_response (question, model, rag=0):
    response, url_link, relative_path = complete(question, model, rag)
    res_text = response[0].RESPONSE
    st.markdown(res_text)
    if rag == 1:
        display_url = f"Link to [{relative_path}]({url_link}) that may be useful"
        st.markdown(display_url)
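Under the covers this is a plain SQL call, so you can test it from a worksheet with any context text; everything in this sketch is an illustrative example:

select snowflake.cortex.complete(
    'llama2-70b-chat',
    'You are an expert assistant extracting information from context provided.
     Answer the question based on the context.
     Context: Total bill for September: 42.50 EUR
     Question: How much was my September bill?
     Answer: ') as response;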

Step 10:

Here the output of the call to complete() is captured and presented to the user. Let’s see what the LLM says about my phone bills when using the right context:

So it looks like it was able to find my bill from September and extract the right information. One thing is clear, as noted by my colleague Michael Gorkow when discussing this: the LLM (llama2-70b-chat in this case) is not performing any mathematical task. It is just extracting this from the prompt and presenting it in proper language (even translating a few things from Spanish to English).

What if I ask for October?

It looks like, using semantic search, it is also able to locate and extract the correct information.

Step 11:

As you can see in the screenshot, the answer also provides a link to the most relevant document used to answer the question. I can just click it to access that document.

Conclusion:

This is a very simple demo, but something I was able to build in a few minutes, completely hosted within Snowflake. My bills stay secured within the Snowflake security perimeter, and my data never leaves the platform. I do not care too much about my bills, but I am pretty sure legal departments care a lot about their documents, so security will be relevant for them. Any other industry will also want to secure access to their documentation while still being able to extract value from it.

Snowflake Cortex provides amazing functionality to put all the power of embeddings and LLMs to work capturing value out of unstructured data. Now think what you can do if you mix that with all the other data sources you already have in your Snowflake Data Cloud!

And one last conclusion: maybe I am paying too much for my phone bill?

Enjoy!

Carlos.-

Link to Quickstart Guide
