Leveraging FastAPI, OpenAI, and SQLAlchemy for Natural Language SQL Queries
SQL (Structured Query Language) is the standard language for managing and manipulating relational databases. What if we could interact with databases using natural language queries?
In this post we show how to load a dataframe into a SQLite database with SQLAlchemy, prompt an OpenAI model to translate natural language into SQL queries against it, and wrap everything in a FastAPI application so users can interact with the database.
Prerequisites
Ensure you have the following:
- Python 3.6+
- Virtual environment setup
- FastAPI
- Pydantic
- OpenAI API Key
- SQLAlchemy
- pandas
- uvicorn
- python-dotenv
This post assumes your OpenAI key is saved as an environment variable named OPENAI_API_KEY (for example, in a .env file).
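For reference, a minimal .env file in the project root could contain just that one key (the value below is a placeholder, not a real key):

```
OPENAI_API_KEY=sk-your-key-here
```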
Import libraries
import os
from dotenv import load_dotenv
from pathlib import Path
import pandas as pd
from fastapi import FastAPI
from pydantic import BaseModel
import openai
from sqlalchemy.engine import create_engine
Custom classes and functions
The following class helps structure calls to the chat model and parse its responses. I used a lower temperature to reduce “creativity”: one issue I faced was that the model did not generate the same query consistently when given the same prompt. While temperature=0.2 will not eliminate this issue, it reduces the chance of off-target responses.
class Prompter:
def __init__(self, api_key, gpt_model):
if not api_key:
raise Exception("Please provide the OpenAI API key")
        # Use the key passed in, rather than re-reading the environment
        openai.api_key = api_key
self.gpt_model = gpt_model
def prompt_model_return(self, messages: list):
response = openai.ChatCompletion.create(model=self.gpt_model,
messages=messages,
temperature=0.2)
return response["choices"][0]["message"]["content"]
Assume you have a dataframe object called df, read from a file.
Using SQLite (SQLAlchemy can create engines for various backends from different connection strings), we can then dump the content of the dataframe into the database instance.
def init_database(df):
# Set up engine
engine = create_engine("sqlite://")
# Populate database
df.to_sql("dataDB", engine)
return engine
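As a quick sanity check of what init_database does, here is a self-contained sketch that loads a hypothetical two-row DataFrame into an in-memory SQLite database and reads it back (the table name dataDB matches the one used above; the data is made up):

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Hypothetical toy DataFrame standing in for the real data
df = pd.DataFrame({"city": ["Oslo", "Lima"], "population": [700000, 9000000]})

engine = create_engine("sqlite://")  # in-memory SQLite database
df.to_sql("dataDB", engine)          # dump the DataFrame into table 'dataDB'

with engine.connect() as connection:
    rows = connection.execute(text("SELECT city FROM dataDB")).fetchall()

print([r[0] for r in rows])  # → ['Oslo', 'Lima']
```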
We can start with a ‘naive approach’ that assumes a clean dataframe (and thus a database with a single table). This df assumes a few important things: column names must not contain spaces or special characters, and by this stage you have already performed data cleaning, feature engineering, etc.
def init_data():
# Set the path to the raw data
# Convert the current working directory to a Path object
script_dir = Path(os.getcwd())
data_path = script_dir / 'data' /'my_data.csv'
# Load the CSV file into a DataFrame
df = pd.read_csv(data_path)
sample_values = {df.columns[i]: df.values[0][i] \
for i in range(len(df.columns))}
return df, sample_values
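To make sample_values concrete, here is the same dictionary construction applied to a hypothetical two-column frame:

```python
import pandas as pd

# Hypothetical stand-in for the CSV-loaded DataFrame
df = pd.DataFrame({"name": ["Ada", "Bob"], "age": [36, 41]})

# Map each column to its value in the first row, as init_data() does
sample_values = {df.columns[i]: df.values[0][i] for i in range(len(df.columns))}
print(sample_values)  # → {'name': 'Ada', 'age': 36}
```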
Using Pydantic’s BaseModel, we can then define queries as follows:
class Query(BaseModel):
query: str
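A quick sketch of how this model behaves: a JSON body with a 'query' field parses into a typed object, while a body missing that field is rejected at validation time.

```python
from pydantic import BaseModel, ValidationError

class Query(BaseModel):
    query: str

# A valid payload parses into a typed object
q = Query(query="How many rows are in the table?")
print(q.query)

# A payload missing the required 'query' field raises a validation error
try:
    Query(text="oops")
except ValidationError:
    print("rejected")
```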
Setting up the API with FastAPI
Initialize the app.
app = FastAPI()
The root endpoint (“/”) is a GET request that returns a welcome message.
@app.get("/")
async def root():
return {"message": "Welcome! Please send a POST request to /search with a JSON body containing a 'query' key and a natural language query as the value. Visit /docs for more "}
On application startup, the .env file is loaded, the OpenAI API key is fetched, the Prompter instance is initialized, and the database is set up.
@app.on_event("startup")
async def startup_event():
# Load the .env file
load_dotenv(".env")
# Get OpenAI API key
openai_api_key = os.environ.get("OPENAI_API_KEY")
# Initialize prompter
global prompter
prompter = Prompter(openai_api_key, "gpt-3.5-turbo")
# Initialize data
global df
global sample_values
df, sample_values = init_data()
# Set up engine
global engine
engine = init_database(df)
Here is where the prompting happens. We use the following roles and contents, and ensure the prompt has access to the table schema and one sample entry from the table.
system_content = "You are a data analyst specializing in SQL, \
you are presented with a natural language query, \
and you form queries to answer questions about the data."
user_content = f"Please generate 1 SQL query for data with \
columns {', '.join(df.columns)} \
and sample values {sample_values}. \
The table is called 'dataDB'. \
Use the natural language query {query.query}"
datagen_prompts = [
{"role" : "system", "content" : system_content},
{"role" : "user", "content" : user_content},
]
We can then get the prompt to generate a query.
# Take parameters and form a SQL query
result = prompter.prompt_model_return(datagen_prompts)
# Sometimes the query is verbose - adding unnecessary explanations
sql_query = result.split("\n\n")[0]
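For illustration, with a hypothetical verbose reply the split keeps only the SQL portion before the first blank line:

```python
# Hypothetical verbose model reply: the SQL first, then an explanation
result = (
    "SELECT COUNT(*) FROM dataDB;\n\n"
    "This query counts the rows in the table."
)

# Keep only the text before the first blank line
sql_query = result.split("\n\n")[0]
print(sql_query)  # → SELECT COUNT(*) FROM dataDB;
```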
Wrap it into a function:
def init_prompt(query: Query):
# Generate SQL query
system_content = "You are a data analyst specializing in SQL, \
you are presented with a natural language query, \
and you form queries to answer questions about the data."
    user_content = f"Please generate 1 SQL query for data with \
columns {', '.join(df.columns)} \
and sample values {sample_values}. \
The table is called 'dataDB'. \
Use the natural language query {query.query}"
datagen_prompts = [
{"role" : "system", "content" : system_content},
{"role" : "user", "content" : user_content},
]
# Take parameters and form a SQL query
    sql_result = prompter.prompt_model_return(datagen_prompts)
# Sometimes the query is verbose - adding unnecessary explanations
sql_query = sql_result.split("\n\n")[0]
return sql_query
The “/search” endpoint accepts POST requests with a Pydantic model ‘Query’ and returns the result of the SQL query on the database.
@app.post("/search")
async def search(query: Query):
# Initialize prompt
sql_query = init_prompt(query)
try:
        # Execute SQL query and fetch results;
        # newer SQLAlchemy versions require wrapping raw SQL in text()
        from sqlalchemy import text
        with engine.connect() as connection:
            result = connection.execute(text(sql_query))
rows = result.fetchall()
# Convert rows to list of dicts for JSON response
columns = result.keys()
data = [dict(zip(columns, row)) for row in rows]
return {"data": data}
except Exception as e:
return {"error": f"SQL query failed. {e}"}
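The row-to-dict conversion inside the handler can be sketched in isolation, using hypothetical column names and fetched rows in place of a live result set:

```python
# Hypothetical column names and fetched rows standing in for a live result set
columns = ["city", "population"]
rows = [("Oslo", 700000), ("Lima", 9000000)]

# Same zip-based conversion used in the /search endpoint
data = [dict(zip(columns, row)) for row in rows]
print(data[0])  # → {'city': 'Oslo', 'population': 700000}
```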
Deploying the API locally
Once this step is complete, and assuming your application file is named app.py, you can run the API with the command
uvicorn app:app
This will start your API, which you can visit at the URL http://127.0.0.1:8000/.
To explore making natural language queries interactively, go to http://127.0.0.1:8000/docs.
Bringing all pieces together
import os
from dotenv import load_dotenv
from pathlib import Path
import pandas as pd
from fastapi import FastAPI
from pydantic import BaseModel
import openai
from sqlalchemy.engine import create_engine
class Prompter:
def __init__(self, api_key, gpt_model):
if not api_key:
raise Exception("Please provide the OpenAI API key")
        # Use the key passed in, rather than re-reading the environment
        openai.api_key = api_key
self.gpt_model = gpt_model
def prompt_model_return(self, messages: list):
response = openai.ChatCompletion.create(model=self.gpt_model,
messages=messages,
temperature=0.2)
return response["choices"][0]["message"]["content"]
def init_database(df):
# Set up engine
engine = create_engine("sqlite://")
# Populate database
df.to_sql("dataDB", engine)
return engine
def init_data():
# Set the path to the raw data
# Convert the current working directory to a Path object
script_dir = Path(os.getcwd())
data_path = script_dir / 'data' /'my_data.csv'
# Load the CSV file into a DataFrame
df = pd.read_csv(data_path)
sample_values = {df.columns[i]: df.values[0][i] \
for i in range(len(df.columns))}
return df, sample_values
class Query(BaseModel):
# Pydantic model
query: str
def init_prompt(query: Query):
# Generate SQL query
system_content = "You are a data analyst specializing in SQL, \
you are presented with a natural language query, \
and you form queries to answer questions about the data."
    user_content = f"Please generate 1 SQL query for data with \
columns {', '.join(df.columns)} \
and sample values {sample_values}. \
The table is called 'dataDB'. \
Use the natural language query {query.query}"
datagen_prompts = [
{"role" : "system", "content" : system_content},
{"role" : "user", "content" : user_content},
]
# Take parameters and form a SQL query
    sql_result = prompter.prompt_model_return(datagen_prompts)
# Sometimes the query is verbose - adding unnecessary explanations
sql_query = sql_result.split("\n\n")[0]
return sql_query
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Welcome! Please send a POST request \
to /search with a JSON body containing a 'query' \
key and a natural language query as the value. \
Visit /docs for more "}
@app.on_event("startup")
async def startup_event():
# Load the .env file
load_dotenv(".env")
# Get OpenAI API key
openai_api_key = os.environ.get("OPENAI_API_KEY")
# Initialize prompter
global prompter
prompter = Prompter(openai_api_key, "gpt-3.5-turbo")
# Initialize data
global df
global sample_values
df, sample_values = init_data()
# Set up engine
global engine
engine = init_database(df)
@app.post("/search")
async def search(query: Query):
# Initialize prompt
sql_query = init_prompt(query)
try:
        # Execute SQL query and fetch results;
        # newer SQLAlchemy versions require wrapping raw SQL in text()
        from sqlalchemy import text
        with engine.connect() as connection:
            result = connection.execute(text(sql_query))
rows = result.fetchall()
# Convert rows to list of dicts for JSON response
columns = result.keys()
data = [dict(zip(columns, row)) for row in rows]
return {"data": data}
except Exception as e:
return {"error": f"SQL query failed. {e}"}
Summary
In this post we set up a SQLite database, loaded it with data, and generated prompts that translate natural language into SQL queries. Through FastAPI, we explored one way to expose this as an interface users can interact with.