Integrate an LLM with Snowflake and Streamlit

Luiz
Blue Orange Digital
7 min read · Jan 8, 2024
Integration between an LLM (ChatGPT) and Snowflake

Snowflake is widely recognized as a leading platform for seamless data collaboration. Recent additions to the platform let financial analysts extend their data capabilities by leveraging LLMs, generative AI, and Streamlit.

Take, for example, a financial analyst who is responsible for scrutinizing financial data and economic indicators to formulate recommendations for investment strategies. Questions such as “Which states witnessed the highest number of bank merger and acquisition events in the last five years?” or “What regions of the country bear the brunt of inflation?” demand intricate SQL queries and visualizations to convey the findings effectively.

As is customary, stakeholders may seek modifications to the analysis or pose additional inquiries necessitating exploration. Each query begets a cascade of further questions, entailing more queries, code, and communication via platforms like Slack. This well-known and often exasperating cycle presents a promising opportunity to leverage the capabilities of LLMs.

Essentially, it is possible to build a chatbot that answers questions about the data housed in Snowflake! This removes the hurdle that has traditionally kept non-technical business users from accessing valuable insights within Snowflake data. Now, anyone who can pose a question can engage with and gain insights from their data without writing a single SQL query.

In many respects, this interactive chatbot, adept in SQL, surpasses the conventional dashboard. Instead of laboriously adjusting dashboard architecture and logic to address a business user’s specific questions, that time can be invested in crafting a chatbot capable of addressing an unlimited range of inquiries.

Conda environment

This walkthrough showcases an integration between an LLM (ChatGPT) and the Snowflake data warehouse, using a publicly available database of financial time series. With some prompt engineering, the model is specialized to generate SQL queries, and the app not only generates those queries but also executes them against the database. All of this sits behind a simple interface built with Streamlit.

Deployment is very simple and can run on any compute engine, even a local machine, using, for instance, Miniconda:

  • Install conda (e.g., Miniconda).
  • Create a conda environment.
conda create --name bod-snow-llm -c https://repo.anaconda.com/pkgs/snowflake python=3.10
  • Activate environment.
conda activate bod-snow-llm
  • Install libraries: snowflake-snowpark-python, openai, streamlit.
conda install -c https://repo.anaconda.com/pkgs/snowflake snowflake-snowpark-python openai 
pip install streamlit
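  • (Optional) Verify the installation. A minimal sanity-check sketch using only the standard library; the file name check_env.py is illustrative: python check_env.py
# check_env.py: print the installed package versions (run inside the activated environment)
from importlib.metadata import version

for pkg in ("snowflake-snowpark-python", "openai", "streamlit"):
    print(pkg, version(pkg))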

Snowflake

-- Create a dedicated database and schema for the new views
CREATE DATABASE IF NOT EXISTS FROSTY_SAMPLE;
CREATE SCHEMA IF NOT EXISTS FROSTY_SAMPLE.CYBERSYN_FINANCIAL;

-- Create the limited attributes view
CREATE VIEW IF NOT EXISTS FROSTY_SAMPLE.CYBERSYN_FINANCIAL.FINANCIAL_ENTITY_ATTRIBUTES_LIMITED AS
SELECT * from cybersyn_financial__economic_essentials.cybersyn.financial_institution_attributes
WHERE VARIABLE IN (
'ASSET',
'ESTINS',
'LNRE',
'DEP',
-- 'NUMEMP', -- This seems to be multiplied by 1000x in the data set, which is awkward. Excluding for now.
'SC'
);

-- Confirm the view was created correctly; it should return one row per included variable, with the variable name and definition
SELECT * FROM FROSTY_SAMPLE.CYBERSYN_FINANCIAL.FINANCIAL_ENTITY_ATTRIBUTES_LIMITED;

-- Create the modified time series view
CREATE VIEW IF NOT EXISTS FROSTY_SAMPLE.CYBERSYN_FINANCIAL.FINANCIAL_ENTITY_ANNUAL_TIME_SERIES AS
SELECT
    ent.name AS entity_name,
    ent.city,
    ent.state_abbreviation,
    ts.variable_name,
    YEAR(ts.date) AS "YEAR",
    TO_DOUBLE(ts.value) AS value,
    ts.unit,
    att.definition
FROM cybersyn_financial__economic_essentials.cybersyn.financial_institution_timeseries AS ts
INNER JOIN FROSTY_SAMPLE.CYBERSYN_FINANCIAL.FINANCIAL_ENTITY_ATTRIBUTES_LIMITED AS att
    ON (ts.variable = att.variable)
INNER JOIN cybersyn_financial__economic_essentials.cybersyn.financial_institution_entities AS ent
    ON (ts.id_rssd = ent.id_rssd)
WHERE MONTH(date) = 12 AND DAY(date) = 31; -- keep only the year-end observations

-- Confirm the view was created correctly and view sample data
SELECT * FROM FROSTY_SAMPLE.CYBERSYN_FINANCIAL.FINANCIAL_ENTITY_ANNUAL_TIME_SERIES LIMIT 10;

Streamlit

  • Create a directory: llm-chatbot
  • Inside it, create a subdirectory: .streamlit
  • In that subdirectory, create a file: secrets.toml
# .streamlit/secrets.toml
# Keep this file out of version control (add it to .gitignore)
OPENAI_API_KEY = "sk-2v...X"

[connections.snowpark]
user = "<jdoe>"
password = "<my_trial_pass>"
warehouse = "COMPUTE_WH"
role = "ACCOUNTADMIN"
account = "<account-id>"
  • (Optional) Validate credentials with ChatGPT: streamlit run val_cred_gpt.py
import streamlit as st
import openai

# Read the OpenAI API key from .streamlit/secrets.toml
openai.api_key = st.secrets["OPENAI_API_KEY"]

# openai<1.0 style call (the version installed from the Snowflake conda channel at the time of writing)
completion = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "user", "content": "What is Streamlit?"}
    ],
)
st.write(completion.choices[0].message.content)
  • (Optional) Validate credentials with Snowflake: streamlit run val_cred_snow.py
import streamlit as st

# st.experimental_connection reads the [connections.snowpark] block in secrets.toml
# (newer Streamlit releases rename this to st.connection)
conn = st.experimental_connection("snowpark")
df = conn.query("select current_warehouse()")
st.write(df)
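  • (Optional) With the connection validated, the same pattern can spot-check the views created in the Snowflake section. A minimal sketch; the file name val_views.py is illustrative: streamlit run val_views.py
import streamlit as st

conn = st.experimental_connection("snowpark")
# Preview a few year-end rows from the view the chatbot will query
df = conn.query("SELECT * FROM FROSTY_SAMPLE.CYBERSYN_FINANCIAL.FINANCIAL_ENTITY_ANNUAL_TIME_SERIES LIMIT 5")
st.dataframe(df)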
  • (Optional) Create a simple chat interface: streamlit run simple_chat.py
import openai
import streamlit as st

st.title("☃️ Frosty")

# Read the OpenAI API key from .streamlit/secrets.toml
openai.api_key = st.secrets["OPENAI_API_KEY"]

# Initialize the chat messages history
if "messages" not in st.session_state.keys():
    st.session_state.messages = [
        {"role": "assistant", "content": "How can I help?"}
    ]

# Prompt for user input and save
if prompt := st.chat_input():
    st.session_state.messages.append({"role": "user", "content": prompt})

# Display the existing chat messages
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.write(message["content"])

# If the last message is not from the assistant, generate a new response
if st.session_state.messages[-1]["role"] != "assistant":
    # Call the LLM
    with st.chat_message("assistant"):
        with st.spinner("Thinking..."):
            r = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[{"role": m["role"], "content": m["content"]} for m in st.session_state.messages],
            )
            response = r.choices[0].message.content
            st.write(response)
            message = {"role": "assistant", "content": response}
            st.session_state.messages.append(message)

Prompt Engineering

  • Prepare the system prompt for ChatGPT: streamlit run prompts.py
import streamlit as st

QUALIFIED_TABLE_NAME = "FROSTY_SAMPLE.CYBERSYN_FINANCIAL.FINANCIAL_ENTITY_ANNUAL_TIME_SERIES"
TABLE_DESCRIPTION = """
This table has various metrics for financial entities (also referred to as banks) since 1983.
The user may describe the entities interchangeably as banks, financial institutions, or financial entities.
"""
# This query is optional if running Frosty on your own table, especially a wide table.
# Since this is a deep table, it's useful to tell Frosty what variables are available.
# Similarly, if you have a table with semi-structured data (like JSON), it could be used to provide hints on available keys.
# If altering, you may also need to modify the formatting logic in get_table_context() below.
METADATA_QUERY = "SELECT VARIABLE_NAME, DEFINITION FROM FROSTY_SAMPLE.CYBERSYN_FINANCIAL.FINANCIAL_ENTITY_ATTRIBUTES_LIMITED;"

GEN_SQL = """
You will be acting as an AI Snowflake SQL Expert named Frosty.
Your goal is to give correct, executable sql query to users.
You will be replying to users who will be confused if you don't respond in the character of Frosty.
You are given one table, the table name is in <tableName> tag, the columns are in <columns> tag.
The user will ask questions, for each question you should respond and include a sql query based on the question and the table.

{context}

Here are 6 critical rules for the interaction you must abide by:
<rules>
1. You MUST MUST wrap the generated sql code within ``` sql code markdown in this format e.g
```sql
(select 1) union (select 2)
```
2. If I don't tell you to find a limited set of results in the sql query or question, you MUST limit the number of responses to 10.
3. Text / string where clauses must be fuzzy match e.g ilike %keyword%
4. Make sure to generate a single snowflake sql code, not multiple.
5. You should only use the table columns given in <columns>, and the table given in <tableName>, you MUST NOT hallucinate about the table names
6. DO NOT put numerical at the very front of sql variable.
</rules>

Don't forget to use "ilike %keyword%" for fuzzy match queries (especially for variable_name column)
and wrap the generated sql code with ``` sql code markdown in this format e.g:
```sql
(select 1) union (select 2)
```

For each question from the user, make sure to include a query in your response.

Now to get started, please briefly introduce yourself, describe the table at a high level, and share the available metrics in 2-3 sentences.
Then provide 3 example questions using bullet points.
"""

@st.cache_data(show_spinner=False)
def get_table_context(table_name: str, table_description: str, metadata_query: str = None):
    # Split DATABASE.SCHEMA.TABLE into its three parts
    table = table_name.split(".")
    conn = st.experimental_connection("snowpark")
    # Look up the table's columns and data types from the information schema
    columns = conn.query(f"""
        SELECT COLUMN_NAME, DATA_TYPE FROM {table[0].upper()}.INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = '{table[1].upper()}' AND TABLE_NAME = '{table[2].upper()}'
        """,
    )
    columns = "\n".join(
        [
            f"- **{columns['COLUMN_NAME'][i]}**: {columns['DATA_TYPE'][i]}"
            for i in range(len(columns["COLUMN_NAME"]))
        ]
    )
    context = f"""
Here is the table name <tableName> {'.'.join(table)} </tableName>

<tableDescription>{table_description}</tableDescription>

Here are the columns of the {'.'.join(table)}

<columns>\n\n{columns}\n\n</columns>
"""
    if metadata_query:
        metadata = conn.query(metadata_query)
        metadata = "\n".join(
            [
                f"- **{metadata['VARIABLE_NAME'][i]}**: {metadata['DEFINITION'][i]}"
                for i in range(len(metadata["VARIABLE_NAME"]))
            ]
        )
        context = context + f"\n\nAvailable variables by VARIABLE_NAME:\n\n{metadata}"
    return context

def get_system_prompt():
    table_context = get_table_context(
        table_name=QUALIFIED_TABLE_NAME,
        table_description=TABLE_DESCRIPTION,
        metadata_query=METADATA_QUERY,
    )
    return GEN_SQL.format(context=table_context)

# Run `streamlit run prompts.py` to view the initial system prompt in a Streamlit app
if __name__ == "__main__":
    st.header("System prompt for Frosty")
    st.markdown(get_system_prompt())
  • Combine the chat with the prompt: streamlit run chat_app.py
import openai
import re
import streamlit as st
from prompts import get_system_prompt

st.title("☃️ Frosty")

# Initialize the chat messages history
openai.api_key = st.secrets.OPENAI_API_KEY
if "messages" not in st.session_state:
    # The system prompt includes table information and rules, and prompts the LLM
    # to produce a welcome message for the user.
    st.session_state.messages = [{"role": "system", "content": get_system_prompt()}]

# Prompt for user input and save
if prompt := st.chat_input():
    st.session_state.messages.append({"role": "user", "content": prompt})

# Display the existing chat messages
for message in st.session_state.messages:
    if message["role"] == "system":
        continue
    with st.chat_message(message["role"]):
        st.write(message["content"])
        if "results" in message:
            st.dataframe(message["results"])

# If the last message is not from the assistant, generate a new response
if st.session_state.messages[-1]["role"] != "assistant":
    with st.chat_message("assistant"):
        response = ""
        resp_container = st.empty()
        # Stream the completion token by token into the placeholder
        for delta in openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": m["role"], "content": m["content"]} for m in st.session_state.messages],
            stream=True,
        ):
            response += delta.choices[0].delta.get("content", "")
            resp_container.markdown(response)

        message = {"role": "assistant", "content": response}
        # Parse the response for a SQL query and execute it if present
        sql_match = re.search(r"```sql\n(.*)\n```", response, re.DOTALL)
        if sql_match:
            sql = sql_match.group(1)
            conn = st.experimental_connection("snowpark")
            message["results"] = conn.query(sql)
            st.dataframe(message["results"])
        st.session_state.messages.append(message)
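
Executing model-generated SQL directly is convenient for a demo, but anything shared more widely should guard the execution step. A minimal, illustrative guard (the is_safe_select helper below is hypothetical, not part of the original app) accepts only a single read-only statement:

import re

def is_safe_select(sql: str) -> bool:
    """Rough read-only check: accept a single statement that starts with SELECT or WITH."""
    statement = sql.strip().rstrip(";")
    if ";" in statement:
        # Reject multi-statement payloads
        return False
    return bool(re.match(r"(?is)^\s*(select|with)\b", statement))

# In chat_app.py, guard the execution step:
# if sql_match and is_safe_select(sql_match.group(1)):
#     message["results"] = conn.query(sql_match.group(1))

Pairing a check like this with a least-privilege, read-only Snowflake role in secrets.toml (rather than ACCOUNTADMIN) further limits what a bad query can do.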

What can be done next

With a bit of prompt engineering, you can build an LLM-powered chat application using the tools integrated into Snowflake and Streamlit. However, this marks just the beginning of the potential that chat interfaces offer for data exploration and question-answering.

Consider expanding your application by updating it to operate with your confidential data in Snowflake or other relevant Snowflake Marketplace datasets. The table-specific logic in the app, specified at the top of prompts.py, makes it easy to swap and experiment with different scenarios.
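
Swapping in a different dataset mostly comes down to changing the constants at the top of prompts.py; for example (the table name and description below are placeholders, not real objects):

# prompts.py: point Frosty at a different table (illustrative values)
QUALIFIED_TABLE_NAME = "MY_DB.MY_SCHEMA.MY_TABLE"  # any DATABASE.SCHEMA.TABLE your role can query
TABLE_DESCRIPTION = """
Describe what the table contains and the vocabulary users might use for it.
"""
METADATA_QUERY = None  # optional; mainly useful for deep tables with a variable catalog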

Enhance the app’s capabilities by adding features such as using the LLM to choose from a set of available tables, summarizing the returned data, and writing Streamlit code to visualize the results. You might explore leveraging a library like LangChain to transform Frosty into an “Agent” with improved chain-of-thought reasoning and dynamic error handling.
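
As a first step toward visualization, the query results that chat_app.py already stores in message["results"] can be handed to one of Streamlit's built-in chart elements. A minimal sketch, assuming the result set includes the YEAR and VALUE columns from the annual time series view:

# After message["results"] = conn.query(sql) in chat_app.py:
df = message["results"]
if {"YEAR", "VALUE"}.issubset(df.columns):
    # Quick built-in chart; a follow-up LLM call could instead choose the chart type
    st.line_chart(df, x="YEAR", y="VALUE")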

Prepare for running the app in Streamlit in Snowflake (currently in Private Preview). The functionality demonstrated here is expected to be available there, especially when paired with External Access (also in Private Preview), which simplifies calling an external LLM from inside Snowflake. Together, these promise a more seamless and powerful data exploration experience.

Additional Materials

Financial database from the Snowflake Marketplace: Cybersyn Financial & Economic Essentials

Original material from Frosty: Build an LLM Chatbot in Streamlit on your Snowflake Data
