CrystalCosts: Building an AI Agent for Cost Monitoring on Snowflake

Introduction: Warehouses and AI

Picture this: you’re knee-deep in data, drowning in hours of analysis, surrounded by all types of queries, and watching warehouse costs soar. Is this really the tech future we signed up for? We’d like to think not!

Enter AI agents. Agents that run on your warehouse can optimize queries, fetch insights, and bring you what you need, the way you need it, all with minimal effort. Insights flow like a river, access is seamless, and what once took hours now takes seconds!

This is precisely what is possible today.

Here is a step-by-step guide for building one such agent we recently experimented with. We call it CrystalCosts — a conversational AI agent that helps you monitor and predict Snowflake warehouse costs!

CrystalCosts in action

CrystalCosts, our Agent in LLM armor

Let’s face it — managing data warehouses, especially costs, can be a pain. Manual tasks and endless SQL queries? No thanks! That’s why we built CrystalCosts. This cool tool makes cost monitoring easy and brings smarter AI interactions to your data warehouse.

Use it to monitor credit usage, address credit-related queries, and forecast expenses effortlessly through natural language interactions. CrystalCosts delivers comprehensive analyses, cross-warehouse credit comparisons, and visual forecasts, empowering informed decision-making with insightful data representations like bar and line charts.

DIY-Snowflake Cost Optimiser

Dive in to see how you can build this fantastic AI agent for your team (our GitHub Repository is attached at the end!).

So, what tools do you need?

To build an agent like CrystalCosts, you will need the following tools —

Snowflake

  • Arctic LLM: This is Snowflake’s LLM, used to build the conversational agent.
  • Cortex: Used as a tool by LangChain, this primarily powers the Agent’s forecasting capabilities.
  • Data warehouse: This is the warehouse you want to optimize costs for.
  • Streamlit: For crafting the interactive web application that serves as the user interface.

LangChain: Use this to build a multi-agent system (or chain) that includes a conversational agent and an SQL agent.

OpenAI’s GPT-4 Turbo: This helps the SQL Agent generate the correct queries, run them against the data warehouse, and fetch the relevant data.

How all the tools and Agents interact in CrystalCosts

Let’s get you all set

Now that you know the tools we used, here is how to set up the correct environment for creating your agent.

These are the necessary steps towards building the right environment —

  • Step 1: Set up Python and pip if you don’t already have them installed.
  • Step 2: Install the required libraries, such as Streamlit and LangChain. You can download the requirements file from the GitHub repo linked in the last section of this article, place it in your project root, and run the following:
    pip install -r requirements.txt
  • Step 3: Ensure you have the proper credentials to access all of the Snowflake products mentioned above.
  • Note: For forecasting, you must create a view on top of the warehouse metering table.
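
As a rough sketch of the view mentioned in the note above, the helper below only builds the SQL text. It assumes the metering data is read from SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY and sums credits per warehouse per day. The view name credit_usage_view matches the one used later in this article; the helper name and the column aliases (USAGE_DATE, CREDITS) are our own illustrative choices and may differ from the repository.

```python
def build_credit_usage_view_sql(view_name: str = "credit_usage_view") -> str:
    """Return a CREATE VIEW statement that sums daily credits per warehouse."""
    # Guard against accidental SQL injection through the view name.
    if not view_name.isidentifier():
        raise ValueError(f"not a simple identifier: {view_name!r}")
    # Assumption: your role can read ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY.
    # The timestamp is cast to TIMESTAMP_NTZ and truncated to whole days.
    return f"""
CREATE OR REPLACE VIEW {view_name} AS
SELECT
    WAREHOUSE_NAME,
    TO_TIMESTAMP_NTZ(DATE_TRUNC('DAY', START_TIME)) AS USAGE_DATE,
    SUM(CREDITS_USED) AS CREDITS
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
GROUP BY 1, 2
""".strip()


if __name__ == "__main__":
    print(build_credit_usage_view_sql())
```

You would run the returned statement once through your Snowflake connection, then pass WAREHOUSE_NAME, USAGE_DATE, and CREDITS as the series, timestamp, and target columns when creating the forecast model.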

Note: The functions discussed in the following three sections provide a high-level overview. For a more detailed and precise understanding of the exact code, please refer to our GitHub repository linked towards the end of this article.

Building the Agents

CrystalCosts relies on two agents working behind the scenes to power its capabilities: the SQL agent and the Conversation agent.

Let’s see the code you can use to build both —

1. The SQL Agent: This agent uses the GPT-4 Turbo model.

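from datetime import datetime

# Import paths follow LangChain 0.1/0.2; adjust them for your installed version.
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# SQL_PREFIX (the system prompt) and __get_tools() are defined in the repository.

class SnowflakeAgent:
    def __init__(self, llm, parser, db):
        self.llm = llm
        self.db = db
        self.sql_toolkit = SQLDatabaseToolkit(llm=self.llm, db=self.db)
        self.parser = parser

    def get_agent(self):
        tools = self.__get_tools()
        # Let the model call the SQL tools through OpenAI-style tool calling.
        llm_with_tools = self.llm.bind_tools(tools)
        prompt = ChatPromptTemplate.from_messages([
            ("system", SQL_PREFIX.format(dialect=self.sql_toolkit.dialect, top_k=25)),
            ("user", """The instruction is: {instruction} """),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        agent = (
            {
                "messages": lambda x: x["messages"],
                "instruction": lambda x: x["prompt"],
                # Default to an empty list when no tool calls have happened yet.
                "agent_scratchpad": lambda x: format_to_openai_tool_messages(x.get("intermediate_steps", [])),
                "parse_information": lambda x: self.parser.get_format_instructions(),
                "current_date": lambda x: datetime.now().strftime("%Y-%m-%d"),
            }
            | prompt
            | llm_with_tools
            | OpenAIToolsAgentOutputParser()
        )
        return AgentExecutor(agent=agent, tools=tools, verbose=True)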

2. The Conversation Agent: This agent uses Snowflake Cortex and the Arctic LLM model.

prompt_initial = """
###
You are a Cost Monitoring tool for data warehouses like Snowflake. If the user seems to be asking to get the credits consumed, or the cost of a query, or predicting use of credits, you should respond with
"Looking up data for this".
###
"""

def get_snowflake_arctic_results(messages, sf) -> str:
    # Convert the chat history into the message format Cortex expects,
    # then ask Arctic for a completion.
    converted_to_sf_messages = convert_into_snowflake_messages(messages)
    pred = sf.call_arctic_complete(converted_to_sf_messages)
    return pred

The function get_snowflake_arctic_results triggers the Arctic LLM model; the function that calls the Arctic LLM model is as follows:

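import json

def call_arctic_complete(self, messages):
    conn = self.get_snowflake_connection()
    try:
        with conn.cursor() as cur:
            # str(messages) relies on Python's list/dict repr being a valid Snowflake literal;
            # it can break when message text contains quotes.
            sql = "SELECT snowflake.cortex.complete('snowflake-arctic', " + str(messages) + ", {}) AS llm_response"
            cur.execute(sql)
            data = json.loads(cur.fetchall()[0][0])
    finally:
        # Close the connection even if the query or JSON parsing fails.
        conn.close()
    return data.get("choices", [{}])[0].get("messages", "")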
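
The snippets above lean on a message-conversion helper that is not shown, so here is a rough guess at its shape rather than the repository's code. It assumes the chat history arrives as (role, text) pairs and maps them to the role/content objects that Cortex COMPLETE accepts as a conversation history. parse_arctic_response is a hypothetical name for the response handling that call_arctic_complete does inline.

```python
import json

# Roles accepted in a Cortex COMPLETE conversation history.
ALLOWED_ROLES = {"system", "user", "assistant"}


def convert_into_snowflake_messages(messages):
    """Map (role, text) pairs to role/content dicts (assumed input shape)."""
    converted = []
    for role, text in messages:
        if role not in ALLOWED_ROLES:
            raise ValueError(f"unsupported role: {role!r}")
        converted.append({"role": role, "content": text})
    return converted


def parse_arctic_response(raw: str) -> str:
    """Extract the reply text from the JSON string COMPLETE returns."""
    data = json.loads(raw)
    # Fall back to an empty reply when "choices" is missing or empty.
    choices = data.get("choices") or [{}]
    return choices[0].get("messages", "")


if __name__ == "__main__":
    history = [("system", "You are a cost monitoring tool."), ("user", "Hi")]
    print(convert_into_snowflake_messages(history))
    print(parse_arctic_response('{"choices": [{"messages": "Hello!"}]}'))
```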

3. Chain Creation: We then chain both of these agents to form a graph-like structure, resulting in an intelligent conversational Agent designed to manage cost and credit usage across various warehouses.

Note: We initially intended to use Snowflake’s Arctic LLM across the board, but soon realized that its limitations (e.g., it does not support function calling) meant we had to bring in GPT-4 Turbo.
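
One simple way to picture the chaining in step 3 is a router: the conversation agent answers first, and if its reply contains the hand-off phrase from the prompt ("Looking up data for this"), the request goes to the SQL agent. The sketch below uses plain callables as stand-ins for both agents. The function names are hypothetical, and the repository's actual graph may be wired differently.

```python
from typing import Callable

# Hand-off phrase taken from the conversation agent's prompt, lowercased.
HANDOFF_PHRASE = "looking up data for this"


def route_message(
    user_text: str,
    conversation_agent: Callable[[str], str],
    sql_agent: Callable[[str], str],
) -> str:
    """Return the SQL agent's answer when the conversation agent hands off."""
    reply = conversation_agent(user_text)
    if HANDOFF_PHRASE in reply.lower():
        return sql_agent(user_text)
    return reply


if __name__ == "__main__":
    # Stub agents for demonstration only; real ones would call Arctic and GPT-4 Turbo.
    def fake_conversation(text: str) -> str:
        if "credit" in text.lower():
            return "Looking up data for this"
        return "Hi! Ask me about warehouse costs."

    def fake_sql(text: str) -> str:
        return f"[SQL agent would answer: {text}]"

    print(route_message("Hello", fake_conversation, fake_sql))
    print(route_message("How many credits did we use last week?", fake_conversation, fake_sql))
```

Keeping the routing decision in one small function makes it easy to test without calling either model.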

Using the Agents on Streamlit

Check out Streamlit’s official documentation for building a chat interface for the CrystalCosts Agent. It has everything you need to get started and make a great conversational app!

Opening Chat Interface of CrystalCosts

Enhancing the AI capabilities

Once you finish the two steps above, it’s time to enhance the Agent’s capabilities. Here are some enhancements that we made:

  1. Generating proper responses: Once the Agent has fetched the data, the responses need to be consistent and returned in a fixed structure. To do this, we used output parsing and Pydantic models (check out more about them here).
  2. Adding visuals to the response: To make the responses even more helpful and provide a visual representation for our users, we use Streamlit’s Charts to show the data in line/bar graphs.
  3. Adding forecasting capabilities: We use Snowflake Cortex’s ML forecasting function (SNOWFLAKE.ML.FORECAST) to predict warehouse credit usage. The snippets below show how it works:

The model can be created in Snowflake using a query command like:

CREATE OR REPLACE SNOWFLAKE.ML.FORECAST {model_name}(
    INPUT_DATA => SYSTEM$REFERENCE('VIEW', 'credit_usage_view'),
    SERIES_COLNAME => '{warehouse_column}',
    TIMESTAMP_COLNAME => '{timestamp_column}',
    TARGET_COLNAME => '{value_column}'
);

Then call it from your code like this:

def sf_forecast_call(self, timestamp_column, value_column, warehouse_column, warehouse_name, days):
    conn = None
    try:
        conn = self.get_snowflake_connection(playground=True)
        model_name = self.__get_model_name()
        with conn.cursor() as cur:
            create_model_query = self.__get_model_create_sql(model_name, timestamp_column, value_column, warehouse_column, warehouse_name)
            print(create_model_query)
            inference_query = f"CALL {model_name}!FORECAST(FORECASTING_PERIODS => {days});"
            drop_model = f"DROP SNOWFLAKE.ML.FORECAST {model_name}"
            # Train the model, run the forecast, then drop the temporary model.
            _ = cur.execute(create_model_query)
            data = cur.execute(inference_query).fetchall()
            _ = cur.execute(drop_model)
            return data
    except Exception as e:
        print("Issue with querying data", e)
    finally:
        # conn stays None if the connection attempt itself failed.
        if conn is not None:
            conn.close()

Check the Snowflake docs for more details: https://docs.snowflake.com/en/user-guide/snowflake-cortex/ml-functions/forecasting

Example of CrystalCosts showing the graph predicting next week’s consumption
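
To connect the forecast call to the first two enhancements (structured responses and charts), the returned rows can be checked into a fixed structure and then reshaped for plotting. The sketch below assumes each row comes back as (series, timestamp, forecast, lower_bound, upper_bound), the column order Snowflake documents for multi-series forecasts. It uses a standard-library dataclass as a stand-in for the Pydantic models mentioned earlier, and the names ForecastPoint, parse_forecast_rows, and to_chart_columns are illustrative.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class ForecastPoint:
    warehouse: str
    ts: datetime
    credits: float
    lower: float
    upper: float


def parse_forecast_rows(rows):
    """Validate raw forecast rows and convert them to ForecastPoint objects."""
    points = []
    for series, ts, forecast, lower, upper in rows:
        if not isinstance(ts, datetime):
            raise TypeError(f"expected a datetime, got {type(ts).__name__}")
        # Assumption: the series value may arrive JSON-quoted (e.g. '"WH_A"').
        name = str(series).strip('"')
        points.append(ForecastPoint(name, ts, float(forecast), float(lower), float(upper)))
    return points


def to_chart_columns(points):
    """Reshape points into date-ordered column lists suitable for a DataFrame."""
    ordered = sorted(points, key=lambda p: p.ts)
    return {
        "date": [p.ts.date().isoformat() for p in ordered],
        "forecast": [p.credits for p in ordered],
        "lower_bound": [p.lower for p in ordered],
        "upper_bound": [p.upper for p in ordered],
    }


if __name__ == "__main__":
    sample = [('"WH_A"', datetime(2024, 5, 1), 1.0, 0.5, 1.5)]
    print(to_chart_columns(parse_forecast_rows(sample)))
```

With pandas installed, the resulting dictionary can be turned into a DataFrame indexed by date and handed to Streamlit's line chart.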

Testing and Deployment

We primarily carried out manual testing. The easiest way to deploy the agent is to use Streamlit Community Cloud. Here are the docs you can check out: https://docs.streamlit.io/deploy/streamlit-community-cloud/deploy-your-app

Check out our Repo

For the detailed code and full picture, check out our GitHub Repository here: https://github.com/HousewareHQ/crystal-costs

CrystalCosts GitHub Repo

Build CrystalCosts with us on Houseware

So, there you have it! CrystalCosts is just the beginning of what’s possible when you combine AI with data management in Snowflake.

As we gear up to launch our new tool at Houseware, we’re excited to see what you all will build next. Houseware is designed to make creating AI agents like CrystalCosts more straightforward and intuitive. Whether you’re looking to streamline cost management, enhance data visualization, or tackle other data-heavy tasks, Houseware provides the platform to make it happen. Dive in, get your hands dirty, and start building your own AI solutions. I can’t wait to see what you come up with!

Special thanks to Shubham Pandey and Priyanshu Shukla from the Houseware team, who developed CrystalCosts.

Divyansh Saini
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

Co-founder & CEO at Houseware. Snowflake Startup Challenge Winner. Snowflake Data Superhero. I love building and breaking AI agents.