SQL queries + pgvector: Retrieval Augmented Generation in PostgreSQL

Rubens Zimbres
12 min read · Nov 20, 2023

The first 7 minutes of this article show how to use pgvector in a PostgreSQL database to retrieve the Top-K results for a specific question, using a SQL query over embeddings.

Solution 1: use pgvector for retrieval

In this first part, we will create a Cloud SQL instance on Google Cloud, then create a PostgreSQL database, and after that load the content of a Pandas dataframe into it. Then, using LangChain, we will split the product descriptions (children's toys) into chunks, with or without overlap, and use pgvector to populate a second table with the embeddings of these chunks. Table 1 and Table 2 share a common key, product_id.

When a user asks a question, the question is converted into an embedding, and a SQL query retrieves the Top-K most similar rows in Table 2 (embeddings). Their product_id values are then used to fetch the matching toys and descriptions from Table 1. This is the first part of this tutorial: simple retrieval through a SQL query.
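To make the idea concrete, here is a minimal, database-free sketch of the same flow in plain Python. The toy rows, the 3-dimensional vectors and the helper names (cosine_similarity, top_k) are made up for illustration; in the tutorial itself this work is done by pgvector inside PostgreSQL.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Hypothetical stand-in for Table 1 (products), keyed by product_id.
products = {
    "p1": {"product_name": "Card game", "list_price": 25.0},
    "p2": {"product_name": "Beach bucket", "list_price": 12.0},
    "p3": {"product_name": "Dice set", "list_price": 30.0},
}

# Hypothetical stand-in for Table 2 (embeddings), sharing the same key.
product_embeddings = [
    {"product_id": "p1", "embedding": [0.9, 0.1, 0.0]},
    {"product_id": "p2", "embedding": [0.0, 0.2, 0.9]},
    {"product_id": "p3", "embedding": [0.7, 0.6, 0.1]},
]


def top_k(question_embedding, k):
    """Rank Table 2 rows by similarity, then look up names in Table 1."""
    scored = [
        (cosine_similarity(question_embedding, row["embedding"]), row["product_id"])
        for row in product_embeddings
    ]
    scored.sort(reverse=True)  # highest similarity first
    return [products[pid]["product_name"] for _, pid in scored[:k]]


result = top_k([1.0, 0.2, 0.0], k=2)
print(result)
```

The lookup through product_id at the end plays the role of the join between the two tables.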

After that, I will show how to stack LangChain on top of these Top-K results, passing them as context to an LLM that extracts the name and price of a toy and generates a clean description of it.

Solution 2: use pgvector for retrieval + LangChain + LLM

Part 1

Let’s start with the basics. Let’s install necessary libraries and define Google Cloud variables:

!pip install asyncio==3.4.3 asyncpg==0.27.0 cloud-sql-python-connector["asyncpg"]==1.2.3
!pip install numpy==1.22.4 pandas==1.5.3
!pip install pgvector==0.1.8
!pip install langchain==0.0.196 transformers==4.30.1
!pip install google-cloud-aiplatform==1.26.0
project_id = "your-project"  # @param {type:"string"}
database_password = "password1234" # @param {type:"string"}
region = "us-west2" # @param {type:"string"}
instance_name = "pgvector-demo" # @param {type:"string"}
database_name = "retail" # @param {type:"string"}
database_user = "retail-admin" # @param {type:"string"}

Now we will set the Google Cloud project, grant the Cloud SQL Client IAM role (roles/cloudsql.client) to the current user in the project, and enable the Cloud SQL Admin and aiplatform services:

!gcloud config set project {project_id}

# Grant Cloud SQL Client role to authenticated user
current_user = !gcloud auth list --filter=status:ACTIVE --format="value(account)"

!gcloud projects add-iam-policy-binding {project_id} \
--member=user:{current_user[0]} \
--role="roles/cloudsql.client"


# Enable Cloud SQL Admin API
!gcloud services enable sqladmin.googleapis.com
!gcloud services enable aiplatform.googleapis.com

Now we will create the Cloud SQL instance with the database version POSTGRES_15, then create a PostgreSQL database and a user:

database_version = !gcloud sql instances describe {instance_name} --format="value(databaseVersion)"

!gcloud sql instances create {instance_name} --database-version=POSTGRES_15 \
--region={region} --cpu=1 --memory=4GB --root-password={database_password}

# Create the database, if it does not exist.
out = !gcloud sql databases list --instance={instance_name} --filter="NAME:{database_name}" --format="value(NAME)"
!gcloud sql databases create {database_name} --instance={instance_name}

# Create the database user for accessing the database.
!gcloud sql users create {database_user} \
--instance={instance_name} \
--password={database_password}

Now we will connect to this PostgreSQL database. This step is critical if you are running things locally, because we will use an environment variable with the Google Application Credentials. You must know exactly which service account will access the Cloud SQL database, grant it the Cloud SQL Client IAM role (roles/cloudsql.client), and generate an up-to-date key.json file.

import asyncio
import asyncpg
from google.cloud.sql.connector import Connector
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/home/user/key.json"

Now, let’s ping the database and check if everything was done properly:

async def main():
    # get current running event loop to be used with Connector
    loop = asyncio.get_running_loop()
    # initialize Connector object as async context manager
    async with Connector(loop=loop) as connector:
        # create connection to Cloud SQL database
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        # query Cloud SQL database
        results = await conn.fetch("SELECT version()")
        print(results[0]["version"])

        # close asyncpg connection
        await conn.close()


# Test connection with `asyncio`
await main()  # type: ignore

You will get the database version, PostgreSQL 15.4 on x86_64-pc-linux-gnu. Ok, now let’s populate this database with part of a Kaggle retail dataset of toys:

import pandas as pd
import os

DATASET_URL = "https://github.com/GoogleCloudPlatform/python-docs-samples/raw/main/cloud-sql/postgres/pgvector/data/retail_toy_dataset.csv"
df = pd.read_csv(DATASET_URL)
df = df.loc[:, ["product_id", "product_name", "description", "list_price"]]
df = df.dropna()
df.head(10)
Toy dataset

You have the product_id, which is the key shared with the other table (embeddings), the product name, the description (which will become embeddings), and the list price. Let’s look at one product name:

“Koplow Games Set of 2 D12 12-Sided Rock, Paper, Scissors Game Dice — White with Pink Letters #13060”

And its description:

“Rock, paper, scissors is a great way to resolve disputs and hard decisions. Now you can battle it out with these 28mm 12-sided rock, paper, scissors dice. Each word is written on 4 sides of each die. The words are etched into the surface and painted pink. On a roll, the side facing up is the word used on that turn. Rock beats scissors, scissors beats paper, paper beats rock. The 2 dice come enclosed in a convenient plastic re-sealable reusable bag. Great for educational games, dice games, board games, creating your own game, and friends and family fun! Koplow Games Dice are some of the best in the hobby, with well painted and etched numbers/pips/text. WARNING: CHOKING HAZARD — Small parts. Not for children under 3 years. Koplow Games Set of 2 D12 12-Sided Rock, Paper, Scissors Game Dice — White with Pink Letters #13060”

Now we will save this Pandas dataframe as Table 1 in the PostgreSQL database. We create the products table and load the data:

from google.cloud.sql.connector import Connector


async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        await conn.execute("DROP TABLE IF EXISTS products CASCADE")
        # Create the `products` table.
        await conn.execute(
            """CREATE TABLE products(
                product_id VARCHAR(1024) PRIMARY KEY,
                product_name TEXT,
                description TEXT,
                list_price NUMERIC)"""
        )

        # Copy the dataframe to the `products` table.
        tuples = list(df.itertuples(index=False))
        await conn.copy_records_to_table(
            "products", records=tuples, columns=list(df), timeout=10
        )
        await conn.close()


# Run the SQL commands now.
await main()

Perfect, PostgreSQL Table 1 is ready. Now we will create embeddings of the product description and add them to Table 2. First, we will use LangChain to create chunks of description text, overlapping them (or not). Then, we will create the embeddings for each chunk using LangChain VertexAIEmbeddings.
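Before the LangChain code, a quick illustration of what chunk size and overlap mean. This is a deliberately simplified sketch with fixed-width character windows and a made-up helper name (chunk_with_overlap). It is not how RecursiveCharacterTextSplitter works internally, since that splitter also tries to cut on the separators you give it.

```python
def chunk_with_overlap(text, chunk_size, chunk_overlap):
    """Cut text into fixed-width chunks that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new chunk advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start : start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last chunk already reaches the end of the text
    return chunks


chunks = chunk_with_overlap("abcdefghij", chunk_size=4, chunk_overlap=2)
print(chunks)
```

Each chunk repeats the last two characters of the previous one, so a sentence that falls on a boundary still appears whole in at least one chunk when the overlap is large enough.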

# Split long text descriptions into smaller chunks that can fit into
# the API request size limit.

from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    separators=[".", "\n"],
    chunk_size=500,
    chunk_overlap=80,
    length_function=len,
)
chunked = []
for index, row in df.iterrows():
    product_id = row["product_id"]
    desc = row["description"]
    splits = text_splitter.create_documents([desc])
    for s in splits:
        r = {"product_id": product_id, "content": s.page_content}
        chunked.append(r)

from langchain.embeddings import VertexAIEmbeddings
from google.cloud import aiplatform
import time

aiplatform.init(project=f"{project_id}", location=f"{region}")
embeddings_service = VertexAIEmbeddings()

def retry_with_backoff(func, *args, retry_delay=5, backoff_factor=2, **kwargs):
    max_attempts = 10
    retries = 0
    for i in range(max_attempts):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print(f"error: {e}")
            retries += 1
            wait = retry_delay * (backoff_factor**retries)
            print(f"Retry after waiting for {wait} seconds...")
            time.sleep(wait)
    # All attempts failed: raise instead of silently returning None.
    raise RuntimeError(f"{func.__name__} failed after {max_attempts} attempts")


batch_size = 5
for i in range(0, len(chunked), batch_size):
    request = [x["content"] for x in chunked[i : i + batch_size]]
    response = retry_with_backoff(embeddings_service.embed_documents, request)
    # Store the retrieved vector embeddings for each chunk back.
    for x, e in zip(chunked[i : i + batch_size], response):
        x["embedding"] = e

# Store the generated embeddings in a pandas dataframe.
product_embeddings = pd.DataFrame(chunked)
product_embeddings.head()

Here is the content that will be transferred to Table 2:

Embeddings of product description
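A side note on the retry_with_backoff helper used above: its waits grow geometrically, so it is worth seeing how long a persistently failing call would keep you waiting. The small function below (backoff_waits is a name I made up) just reproduces the arithmetic of the helper with its default parameters.

```python
def backoff_waits(retry_delay=5, backoff_factor=2, max_attempts=10):
    """Seconds slept after the 1st, 2nd, ... failed attempt."""
    return [
        retry_delay * backoff_factor**attempt
        for attempt in range(1, max_attempts + 1)
    ]


waits = backoff_waits()
print(waits[:4])   # first few waits, in seconds
print(sum(waits))  # total sleep time if every attempt fails
```

With the defaults the first wait is 10 seconds and the total is close to three hours, so you may prefer a smaller max_attempts or backoff_factor while experimenting.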

We now use pgvector to store the generated embeddings within PostgreSQL Table 2, called product_embeddings:

import numpy as np
from pgvector.asyncpg import register_vector


async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database.
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
        await register_vector(conn)

        await conn.execute("DROP TABLE IF EXISTS product_embeddings")
        # Create the `product_embeddings` table to store vector embeddings.
        await conn.execute(
            """CREATE TABLE product_embeddings(
                product_id VARCHAR(1024) NOT NULL REFERENCES products(product_id),
                content TEXT,
                embedding vector(768))"""
        )

        # Store all the generated embeddings back into the database.
        for index, row in product_embeddings.iterrows():
            await conn.execute(
                "INSERT INTO product_embeddings (product_id, content, embedding) VALUES ($1, $2, $3)",
                row["product_id"],
                row["content"],
                np.array(row["embedding"]),
            )

        await conn.close()


# Run the SQL commands now.
await main()
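Because the embedding column is declared as vector(768), every stored vector must have exactly 768 dimensions. A cheap sanity check before the insert loop can save a failed run. The sketch below uses a hypothetical helper name (rows_with_wrong_dimension) and fake rows; with the real data you could pass it the chunked list of dictionaries.

```python
EXPECTED_DIM = 768  # must match vector(768) in the table definition


def rows_with_wrong_dimension(rows, expected_dim=EXPECTED_DIM):
    """Return the positions of rows whose embedding length is unexpected."""
    return [
        i for i, row in enumerate(rows) if len(row["embedding"]) != expected_dim
    ]


# Fake rows for illustration only.
sample = [
    {"product_id": "a", "embedding": [0.0] * 768},
    {"product_id": "b", "embedding": [0.0] * 3},
]
bad = rows_with_wrong_dimension(sample)
print(bad)
```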

Excellent. So far, basic SQL and Python. Let’s try the solution with the following prompt:

toy = "play card games"  # @param {type:"string"}
min_price = 20 # @param {type:"integer"}
max_price = 200 # @param {type:"integer"}
from google.cloud import aiplatform

aiplatform.init(project=f"{project_id}", location=f"{region}")

embeddings_service = VertexAIEmbeddings()
qe = embeddings_service.embed_query(toy)  # embed_query expects a single string

We will select the 50 best matches whose cosine similarity to the query exceeds a defined threshold. In pgvector, the <=> operator computes the cosine distance between two vectors (pgvector also provides <-> for Euclidean distance and <#> for negative inner product). Cosine similarity is therefore 1 - (embedding <=> $1), which is the expression used in the query, both to filter rows against the threshold and to order them.
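If you want to check the expression 1 - (embedding <=> $1) outside the database, it can be reproduced in a few lines of plain Python. This is only a sketch: cosine_distance is my own helper that mirrors what pgvector's <=> operator computes, and the 2-dimensional vectors are invented.

```python
import math


def cosine_distance(a, b):
    """Same quantity pgvector returns for `a <=> b`."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


similarity_threshold = 0.2

# Vectors pointing in the same direction: similarity 1, distance 0.
same = 1 - cosine_distance([1.0, 0.0], [2.0, 0.0])
# Orthogonal vectors: similarity 0, distance 1.
orthogonal = 1 - cosine_distance([1.0, 0.0], [0.0, 3.0])

print(same > similarity_threshold, orthogonal > similarity_threshold)
```

Only the first pair would pass the WHERE clause of the query.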

matches = []


async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database.
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        await register_vector(conn)
        similarity_threshold = 0.2
        num_matches = 50

        # Find similar products to the query using cosine similarity search
        # over all vector embeddings. This new feature is provided by `pgvector`.
        results = await conn.fetch(
            """
            WITH vector_matches AS (
                SELECT product_id, 1 - (embedding <=> $1) AS similarity
                FROM product_embeddings
                WHERE 1 - (embedding <=> $1) > $2
                ORDER BY similarity DESC
                LIMIT $3
            )
            SELECT product_name, list_price, description FROM products
            WHERE product_id IN (SELECT product_id FROM vector_matches)
            AND list_price >= $4 AND list_price <= $5
            """,
            qe,
            similarity_threshold,
            num_matches,
            min_price,
            max_price,
        )

        if len(results) == 0:
            raise Exception("Did not find any results. Adjust the query parameters.")

        for r in results:
            # Collect the description for all the matched similar toy products.
            matches.append(
                {
                    "product_name": r["product_name"],
                    "description": r["description"],
                    "list_price": round(r["list_price"], 2),
                }
            )

        await conn.close()


# Run the SQL commands now.
await main()  # type: ignore

# Show the results for similar products that matched the user query.
matches = pd.DataFrame(matches)
matches.head(5)

We will get a dataframe of best results:

This is the first part of the article. With a simple SQL query and pgvector, you can perform information retrieval in a PostgreSQL database inside a Google Cloud environment, using LangChain and aiplatform. Note that at this point you can also order the final list by cosine distance and then filter the results according to business needs.
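As a rough sketch of that last remark, assume the query also returned the similarity column for each product (it does not in the code above, so the rows below are invented). Ordering and business filtering then become a two-step operation in plain Python; rank_then_filter is a hypothetical helper name.

```python
# Invented rows, as if the SQL query had also selected `similarity`.
rows = [
    {"product_name": "Toy A", "list_price": 25.0, "similarity": 0.71},
    {"product_name": "Toy B", "list_price": 250.0, "similarity": 0.93},
    {"product_name": "Toy C", "list_price": 40.0, "similarity": 0.88},
]


def rank_then_filter(rows, min_price, max_price):
    """Sort by similarity (best first), then apply the price rule."""
    ranked = sorted(rows, key=lambda r: r["similarity"], reverse=True)
    return [r for r in ranked if min_price <= r["list_price"] <= max_price]


ranked = rank_then_filter(rows, min_price=20, max_price=200)
names = [r["product_name"] for r in ranked]
print(names)
```

Toy B is the closest match but falls outside the price range, so it is dropped after ranking.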

Part 2

In this part of the article, I will show how to build a product search whose results are analyzed and summarized by an LLM given additional context.

We will work with this prompt:

user_query = "Do you have a beach toy set that teaches numbers and letters to kids?"  # @param {type:"string"}
min_price = 20 # @param {type:"integer"}
max_price = 120 # @param {type:"integer"}
qe = embeddings_service.embed_query(user_query)  # embed_query expects a single string
matches = []

async def main():
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # Create connection to Cloud SQL database.
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user=f"{database_user}",
            password=f"{database_password}",
            db=f"{database_name}",
        )

        await register_vector(conn)
        similarity_threshold = 0.7
        num_matches = 5

        # Find similar products to the query using cosine similarity search
        # over all vector embeddings. This new feature is provided by `pgvector`.
        results = await conn.fetch(
            """
            WITH vector_matches AS (
                SELECT product_id, 1 - (embedding <=> $1) AS similarity
                FROM product_embeddings
                WHERE 1 - (embedding <=> $1) > $2
                ORDER BY similarity DESC
                LIMIT $3
            )
            SELECT product_name, list_price, description FROM products
            WHERE product_id IN (SELECT product_id FROM vector_matches)
            AND list_price >= $4 AND list_price <= $5
            """,
            qe,
            similarity_threshold,
            num_matches,
            min_price,
            max_price,
        )

        if len(results) == 0:
            raise Exception("Did not find any results. Adjust the query parameters.")

        for r in results:
            # Collect the description for all the matched similar toy products.
            matches.append(
                f"""The name of the toy is {r["product_name"]}.
                The price of the toy is ${round(r["list_price"], 2)}.
                Its description is below:
                {r["description"]}."""
            )
        await conn.close()


# Run the SQL commands now.
await main()

# Show the results for similar products that matched the user query.
matches

Result:

[‘The name of the toy is Kids Beach Sand Toys Set for Gift with Sand Molds,Mesh Bag, Sand Wheel,Tool Play Set, Watering Can, Shovels, Rakes, Bucket ,Sea Creatures, Castle Molds 18 PCs F-129.\n The price of the toy is $22.99.\n Its description is below:\n Educational: Because sand is an excellent tool for enhancing kids sensory experiences, adding this beach sand toy set will augment the fun and the learning experience. The opportunity to create and build with all the molds and tools is what gets the imagination moving. […] Perfect Gift: Be it a sunny day at the beach, a sandbox at the park or a sandbox in your backyard, this beach set is ideal for any of these occasions. For this reason, this gift is not limited to any occasion, holiday or birthday present! It will particularly thrill kids who love the ocean, the castles and sea animals. However, it could be that your kid has yet to discover that love for the sand until the opportunity is given. Includes: a bucket, water wheel, water can, 2 rakes (1 big and 1 small), 3 shovels (2 small and 1 big), drain shovel, and 9 molds (seahorse, starfish, fish, duck, turtle, whale, shell, crab, castle).[…]Because the beach sand toy set does not include small pieces, there is no choking hazard, so anyone can join in the fun. There are also no sharp edges or parts in any of the pieces”

It does not exactly teach kids numbers and letters, but we are very close, given that we are working with an extremely small dataset. For better results, we could use the whole Kaggle toy dataset, but generating embeddings for all of its hundreds of thousands of products would take much more time and resources. That’s not my goal here.

We will now use LangChain to summarize this big output (matches), extracting the name of the toy, its features and its price.

from langchain.chains.summarize import load_summarize_chain
from langchain.docstore.document import Document
from langchain.llms import VertexAI
from langchain import PromptTemplate, LLMChain
from IPython.display import display, Markdown

llm = VertexAI()

map_prompt_template = """
You will be given a detailed description of a toy product.
This description is enclosed in triple backticks (```).
Using this description only, extract the name of the toy,
the price of the toy and its features.

```{text}```
SUMMARY:
"""
map_prompt = PromptTemplate(template=map_prompt_template, input_variables=["text"])

Then, we will use the matches result from the SQL query as a context for the LLM to answer, given the user query. By using this, we will create a high-quality prompt instruction for the LLM with additional context.

Note that in my other article, Code Generation using Retrieval Augmented Generation + LangChain, RAG is done by using LangChain to search a FAISS index for the Top-K most similar matches between the embedding of the question and the embeddings of the scraped GitHub code. Here, this task is done by a SQL query. In the aforementioned article, the LLM is pretrained on code generation. Here, we will use the product’s description as an instruction.

combine_prompt_template = """
You will be given detailed descriptions of different toy products
enclosed in triple backticks (```) and a question enclosed in
double backticks(``).
Select one toy that is most relevant to answer the question.
Using that selected toy description, answer the following
question in as much detail as possible.
You should only use the information in the description.
Your answer should include the name of the toy, the price of the toy
and its features. Your answer should be less than 200 words.
Your answer should be in Markdown in a numbered list format.


Description:
```{text}```


Question:
``{user_query}``


Answer:
"""
combine_prompt = PromptTemplate(
template=combine_prompt_template, input_variables=["text", "user_query"]
)

docs = [Document(page_content=t) for t in matches]
chain = load_summarize_chain(
llm, chain_type="map_reduce", map_prompt=map_prompt, combine_prompt=combine_prompt
)
answer = chain.run(
{
"input_documents": docs,
"user_query": user_query,
}
)


display(Markdown(answer))

The output is already formatted: item 1 contains the name of the toy and item 2 its summarized description.

  1. Kids Beach Sand Toys Set for Gift with Sand Molds,Mesh Bag, Sand Wheel,Tool Play Set, Watering Can, Shovels, Rakes, Bucket ,Sea Creatures, Castle Molds 18 PCs F-129 is a great beach toy set that teaches numbers and letters to kids.
  2. It includes 18 pieces, including a bucket, water wheel, water can, 2 rakes (1 big and 1 small), 3 shovels (2 small and 1 big), drain shovel, and 9 molds (seahorse, starfish, fish, duck
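If the map_reduce chain type feels like a black box, the core idea fits in a few lines. What follows is a simplified sketch and not LangChain's implementation (the real chain has more machinery, for example to keep the combined text within the model's token limit). The names map_reduce and fake_llm are mine, and fake_llm is a stand-in that only records the prompts it receives.

```python
def map_reduce(docs, user_query, llm, map_template, combine_template):
    """Summarize each document separately, then answer over the summaries."""
    # Map step: one LLM call per retrieved document.
    summaries = [llm(map_template.format(text=doc)) for doc in docs]
    # Reduce step: a single LLM call over all summaries plus the question.
    combined = "\n".join(summaries)
    return llm(combine_template.format(text=combined, user_query=user_query))


calls = []


def fake_llm(prompt):
    """Stand-in for a real LLM: records the prompt, returns a numbered tag."""
    calls.append(prompt)
    return f"<response {len(calls)}>"


answer = map_reduce(
    docs=["toy one", "toy two"],
    user_query="Which toy?",
    llm=fake_llm,
    map_template="Summarize: {text}",
    combine_template="Descriptions: {text}\nQuestion: {user_query}",
)
print(len(calls), answer)
```

Two documents lead to three LLM calls: two for the map step and one for the final combined prompt, which is where the user question comes in.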

This is another way to do the RAG task, when you are using a PostgreSQL database in Google Cloud.

*Google ML Developer Programs team supported this work by providing Google Cloud Credits


Rubens Zimbres

I’m a Senior Data Scientist and Google Developer Expert in ML and GCP. I love studying NLP algos and Cloud Infra. CompTIA Security +. PhD. www.rubenszimbres.phd