Easy RAG Workflows with LlamaIndex Query Pipelines with a Text-to-SQL example


In this article, I discuss the Query Pipeline feature in LlamaIndex: what query pipelines are, why they matter, and a practical example that builds a text-to-SQL query pipeline.

1. Introduction

Query Pipelines in LlamaIndex help you easily piece together and reuse RAG components in common workflows, and define custom workflows as DAGs (Directed Acyclic Graphs). All source code pertinent to this discussion is available on GitHub. To engage with this content further, please clone the repository and follow along with the post.

1.1. RAG Components

Typically, a RAG pipeline contains components for query rewriting, retrieval, reranking, tools, response synthesis, and so on. Many of these workflows are common, and they can often be depicted as a DAG.

2. Sequentially Chaining components using a QueryPipeline

The following depicts a simple workflow with a query template and an LLM.

A simple workflow with a query template and an LLM

A simple query pipeline chain for this will look something like the following.

A simple query pipeline chain

Let’s implement the above pipeline. First, we need to install LlamaIndex.

!pip install llama-index==0.10.37

Code for a simple linear query pipeline is as follows.

from llama_index.core.query_pipeline import QueryPipeline
from llama_index.llms.openai import OpenAI
from llama_index.core import PromptTemplate
import openai
openai.api_key = ""
# Chaining basic prompts
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
# Define the query pipeline
query_pipeline = QueryPipeline(chain=[prompt_tmpl, llm], verbose=True)
output = query_pipeline.run(movie_name="The transformers")
print(str(output))

This is the answer for the above query.

You can chain multiple prompts and LLMs as follows.

Chaining multiple prompts and LLMs
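The original post shows the code for this only as a figure. Here is a minimal sketch of what such a chain could look like, reusing prompt_tmpl and llm from above (the wording of the second prompt is my illustration, not from the original):

prompt_str2 = (
    "Here's some text:\n\n{text}\n\n"
    "Can you rewrite this as a one-line summary per movie?"
)
prompt_tmpl2 = PromptTemplate(prompt_str2)
# The output of the first LLM call is fed into {text} of the second prompt.
p = QueryPipeline(chain=[prompt_tmpl, llm, prompt_tmpl2, llm], verbose=True)
output = p.run(movie_name="The transformers")
print(str(output))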

2.1. An advanced query pipeline with query rewriting

Let’s examine an advanced RAG example with the following components.

  • Query rewriting
  • Reranker

With all the components, the sequential query pipeline would look like this.

An advanced query pipeline with query rewriting

Now let’s develop our example. First, you need to download the data as follows; it will be stored in the “Data” folder.

!mkdir Data
!wget -O Data/paul_graham_essay.txt https://gist.github.com/wey-gu/75d49362d011a0f0354d39e396404ba2/raw/0844351171751ebb1ce54ea62232bf5e59445bb7/paul_graham_essay.txt

Then we import required modules.

from llama_index.core import (
    VectorStoreIndex,
    ServiceContext,
    SimpleDirectoryReader,
    load_index_from_storage,
)

Read the content of the Data folder using SimpleDirectoryReader.

reader = SimpleDirectoryReader("./Data")
docs = reader.load_data()
print(docs[0].get_content())

Then create or rebuild the index.

import os
from llama_index.core.storage import StorageContext

if not os.path.exists("storage"):
    index = VectorStoreIndex.from_documents(docs)
    # save index to disk
    index.set_index_id("vector_index")
    index.storage_context.persist("./storage")
else:
    # rebuild storage context
    storage_context = StorageContext.from_defaults(persist_dir="storage")
    # load index
    index = load_index_from_storage(storage_context, index_id="vector_index")

2.2 Build the query pipeline

# generate a question about the topic
prompt_str1 = "Please generate a concise question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl1 = PromptTemplate(prompt_str1)
# hallucinate an answer to the question (HyDE-style) to improve retrieval
prompt_str2 = (
    "Please write a passage to answer the question\n"
    "Try to include as many key details as possible.\n"
    "\n"
    "\n"
    "{query_str}\n"
    "\n"
    "\n"
    'Passage:"""\n'
)
prompt_tmpl2 = PromptTemplate(prompt_str2)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=5)
p = QueryPipeline(
    chain=[prompt_tmpl1, llm, prompt_tmpl2, llm, retriever], verbose=True
)

Now you can ask the pipeline questions as follows.

nodes = p.run(topic="university")
print(len(nodes))
for node in nodes:
    print(node, end="")

This is the answer for the above query.

3. Building a RAG pipeline as a DAG

A sequential retrieval chain looks good, right? You can develop a nice RAG pipeline like a train! But consider a scenario like the following.

A RAG pipeline as a DAG

Here, some of the modules, like the retriever, summarizer, and re-ranker, have more than one input. In the real world, modules often do, and then you have to build the RAG pipeline in the form of a DAG. The arrangement of the different RAG components is similar to the above image. Now let’s learn how to construct such a DAG pipeline.

Earlier we defined all the modules except the re-ranker and the summarizer, so let’s define those as well. Here we use a Cohere re-ranker.
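Note that the Cohere re-ranker ships as a separate integration package; if it is not already in your environment, you will likely need to install it (and supply a Cohere API key below).

!pip install llama-index-postprocessor-cohere-rerank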

from llama_index.postprocessor.cohere_rerank import CohereRerank
from llama_index.core.response_synthesizers import TreeSummarize

# define modules
prompt_str = "Please generate a question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=2)
summarizer = TreeSummarize(
    llm=OpenAI(model="gpt-3.5-turbo")
)
reranker = CohereRerank(api_key="")

Now that the required modules are defined, let’s add them to the query pipeline.

p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "llm": llm,
        "prompt_tmpl": prompt_tmpl,
        "retriever": retriever,
        "summarizer": summarizer,
        "reranker": reranker,
    }
)

Once that is done, we need to add the links between them.

p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")

The reranker takes two inputs:

  1. From the retriever side, it gets nodes.
  2. From the LLM side, it gets the user query.

Since the reranker has two inputs, you have to specify them explicitly via dest_key.

Similarly, the summarizer has two inputs:

  1. From the reranker side, it gets the re-ranked nodes.
  2. From the LLM side, it gets the user query.

We can visualize it as follows.

A RAG pipeline as a DAG
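If you want to render the DAG yourself rather than rely on the figure, here is a quick sketch using pyvis (an assumption: pyvis is installed; the query pipeline exposes its underlying networkx graph as p.dag).

from pyvis.network import Network

# Render the pipeline's underlying networkx DAG as an interactive HTML page.
net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(p.dag)
net.show("rag_dag.html")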

Now you can ask questions as follows.

response = p.run(topic="university")
print(str(response))

This is the answer for the above query.

4. Text to SQL pipeline with DAG

Now let’s build a text-to-SQL RAG pipeline as a DAG.

4.1. Creating the DB

The first step is creating the DB, and for that you need some tabular data. Let’s download the WikiTableQuestions dataset; it contains CSV files that can be used to populate the DB.

First let’s download data.

!wget "https://github.com/ppasupat/WikiTableQuestions/releases/download/v1.0.2/WikiTableQuestions-1.0.2-compact.zip" -O data.zip
!unzip -o data.zip

Then let’s load all the CSV files under the given path into data frames.

import pandas as pd
from pathlib import Path

data_dir = Path("./WikiTableQuestions/csv/200-csv")
csv_files = sorted([f for f in data_dir.glob("*.csv")])
dfs = []
for csv_file in csv_files:
    print(f"processing file: {csv_file}")
    try:
        df = pd.read_csv(csv_file)
        dfs.append(df)
    except Exception as e:
        print(f"Error parsing {csv_file}: {str(e)}")

Then, for each data frame, we generate a table name and a summary using an LLM and save them. For that, we use the Pydantic class below. The table summary will be useful for retrieval (more on this in section 4.2).

from llama_index.core.program import LLMTextCompletionProgram
from pydantic import BaseModel, Field
from llama_index.llms.openai import OpenAI


class TableInfo(BaseModel):
    """Information regarding a structured table."""

    table_name: str = Field(
        ..., description="table name (must be underscores and NO spaces)"
    )
    table_summary: str = Field(
        ..., description="short, concise summary/caption of the table"
    )


prompt_str = """\
Give me a summary of the table with the following JSON format.
- The table name must be unique to the table and describe it while being concise.
- Do NOT output a generic table name (e.g. table, my_table).
Do NOT make the table name one of the following: {exclude_table_name_list}
Table:
{table_str}
Summary: """

program = LLMTextCompletionProgram.from_defaults(
    output_cls=TableInfo,
    llm=OpenAI(model="gpt-3.5-turbo"),
    prompt_template_str=prompt_str,
)

Then, for each data frame, we generate the table name and summary with the following code, caching the results as JSON files so reruns can reuse them.

import json
import os

# Directory for caching generated table info as JSON
# (the name follows the LlamaIndex example this code is based on)
tableinfo_dir = "WikiTableQuestions_TableInfo"
os.makedirs(tableinfo_dir, exist_ok=True)


def _get_tableinfo_with_index(idx: int) -> TableInfo:
    results_gen = Path(tableinfo_dir).glob(f"{idx}_*")
    results_list = list(results_gen)
    if len(results_list) == 0:
        return None
    elif len(results_list) == 1:
        path = results_list[0]
        return TableInfo.parse_file(path)
    else:
        raise ValueError(
            f"More than one file matching index: {list(results_gen)}"
        )


table_names = set()
table_infos = []
for idx, df in enumerate(dfs):
    table_info = _get_tableinfo_with_index(idx)
    if table_info:
        table_infos.append(table_info)
    else:
        while True:
            df_str = df.head(10).to_csv()
            table_info = program(
                table_str=df_str,
                exclude_table_name_list=str(list(table_names)),
            )
            table_name = table_info.table_name
            print(f"Processed table: {table_name}")
            if table_name not in table_names:
                table_names.add(table_name)
                break
            else:
                # name collision: try again with a different name
                print(f"Table name {table_name} already exists, trying again.")

        out_file = f"{tableinfo_dir}/{idx}_{table_name}.json"
        json.dump(table_info.dict(), open(out_file, "w"))
        table_infos.append(table_info)

Now we have data frames, table names, and summaries. Next, we create the actual DB tables and populate them with data.

# populate the db
from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    Column,
    String,
    Integer,
)
import re


# Function to sanitize column names
def sanitize_column_name(col_name):
    return re.sub(r"\W+", "_", col_name)


# Create a DB table from a data frame
def create_table_from_dataframe(
    df: pd.DataFrame, table_name: str, engine, metadata_obj
):
    sanitized_columns = {col: sanitize_column_name(col) for col in df.columns}
    df = df.rename(columns=sanitized_columns)
    # Create columns based on DataFrame columns and data types
    columns = [
        Column(col, String if dtype == "object" else Integer)
        for col, dtype in zip(df.columns, df.dtypes)
    ]
    # Create a table with the defined columns
    table = Table(table_name, metadata_obj, *columns)
    # Add the table to the DB
    metadata_obj.create_all(engine)
    # Insert data from the DataFrame into the table
    with engine.connect() as conn:
        for _, row in df.iterrows():
            insert_stmt = table.insert().values(**row.to_dict())
            conn.execute(insert_stmt)
        conn.commit()


engine = create_engine("sqlite:///:memory:")
metadata_obj = MetaData()
for idx, df in enumerate(dfs):
    tableinfo = _get_tableinfo_with_index(idx)
    print(f"Creating table: {tableinfo.table_name}")
    create_table_from_dataframe(df, tableinfo.table_name, engine, metadata_obj)
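As a quick sanity check (my addition), you can ask SQLAlchemy which tables were actually created.

from sqlalchemy import inspect

# List the table names now present in the in-memory SQLite DB.
inspector = inspect(engine)
print(inspector.get_table_names())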

Now that our database is ready, it is time to create the DAG using a query pipeline. We need the following DAG modules.

  1. Object index to store table schemas.
  2. A retriever for retrieving table schemas.
  3. SQLDatabase object to connect to the tables, plus a SQLRetriever to execute queries.
  4. Text-to-SQL Prompt to generate SQL from natural language.
  5. Response synthesis Prompt.
  6. LLM.

Let’s develop it step by step.

4.2 Object index and the retriever

In Retrieval-Augmented Generation (RAG), the purpose of a vector index is to enhance the retrieval process by transforming textual data (strings) into numerical representations (vectors) that can be efficiently searched and compared. But here we are not dealing with strings: we need to store table schemas, which are objects. LlamaIndex provides an easy way to handle such situations: object indexes.

So we need an object index that translates objects into string representations so they can be vectorized; at retrieval time, the retrieved string representations are converted back into objects. In other words, instead of returning strings, the object index returns objects. You can read more about object indexes in the LlamaIndex documentation [5]. The following is the code.

from llama_index.core.objects import (
    SQLTableNodeMapping,
    ObjectIndex,
    SQLTableSchema,
)
from llama_index.core import SQLDatabase

sql_database = SQLDatabase(engine)

table_node_mapping = SQLTableNodeMapping(sql_database)
table_schema_objs = [
    SQLTableSchema(table_name=t.table_name, context_str=t.table_summary)
    for t in table_infos
]  # add a SQLTableSchema for each table

obj_index = ObjectIndex.from_objects(
    table_schema_objs,
    table_node_mapping,
    VectorStoreIndex,
)
obj_retriever = obj_index.as_retriever(similarity_top_k=3)

What we can save in a vector store is a Node, which is somewhat different from a table schema, which is an object. So we need a way of mapping a table schema to a Node. LlamaIndex offers SQLTableNodeMapping to do exactly this: the SQLTableNodeMapping object takes in a SQLDatabase and produces a Node object for each table. You can read more in the LlamaIndex documentation [5].

But this information alone is not enough for good retrieval. Currently, we have information about table schemas in Node format. Assume there are two tables called Student and StudentInfo: we need to differentiate between them so we know which table to target when a query is constructed. A really good way of doing this is to save a summary of each table together with a reference to the table; if we store the table name and the summary in the Nodes, we can easily achieve that. That is exactly what we do next. Earlier we extracted table names and summaries into a list called table_infos; using SQLTableSchema, we create such a Node for each table (from table_infos) and save it in the vector store together with the schema.

For further clarity, I printed out table_schema_objs and table_node_mapping._sql_database.
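The original post shows those printouts as images. Here is a minimal sketch to reproduce the inspection and smoke-test the retriever (the query string is illustrative):

# Print each stored schema object together with its summary.
for schema_obj in table_schema_objs:
    print(schema_obj.table_name, "->", schema_obj.context_str)

# The object retriever returns SQLTableSchema objects, not plain strings.
for schema_obj in obj_retriever.retrieve("how many schools are in Ohio?"):
    print("retrieved:", schema_obj.table_name)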

Now, when we retrieve, we can fetch the table schemas. That is done in the code below. In the last line, an FnComponent (function component) is created so that this logic can be added to a LlamaIndex query pipeline.

from llama_index.core.retrievers import SQLRetriever
from typing import List
from llama_index.core.query_pipeline import FnComponent

sql_retriever = SQLRetriever(sql_database)


def get_table_context_str(table_schema_objs: List[SQLTableSchema]):
    """Get table context string."""
    context_strs = []
    for table_schema_obj in table_schema_objs:
        table_info = sql_database.get_single_table_info(
            table_schema_obj.table_name
        )
        if table_schema_obj.context_str:
            table_opt_context = " The table description is: "
            table_opt_context += table_schema_obj.context_str
            table_info += table_opt_context
        context_strs.append(table_info)
    return "\n\n".join(context_strs)


table_parser_component = FnComponent(fn=get_table_context_str)

4.3. Text-to-SQL Prompt

Once we retrieve the schemas, we need to construct a prompt based on them. To do that, we first convert the schemas into a string representation, which is what the function above does; it is wrapped as a function component so it can be used in a query pipeline.

Once the retrieved schemas are converted into a string, we need to create the prompt. We can use DEFAULT_TEXT_TO_SQL_PROMPT as the base. An FnComponent is created for the SQL parser so it can be added to the query pipeline.

from llama_index.core.prompts.default_prompts import DEFAULT_TEXT_TO_SQL_PROMPT
from llama_index.core.prompts import PromptTemplate
from llama_index.core.query_pipeline import FnComponent
from llama_index.core.llms import ChatResponse


def parse_response_to_sql(response: ChatResponse) -> str:
    """Parse the LLM response into a bare SQL query string."""
    response = response.message.content
    # keep everything from "SQLQuery:" onwards, then strip the marker itself
    sql_query_start = response.find("SQLQuery:")
    if sql_query_start != -1:
        response = response[sql_query_start:]
        if response.startswith("SQLQuery:"):
            response = response[len("SQLQuery:") :]
    # drop everything from "SQLResult:" onwards
    sql_result_start = response.find("SQLResult:")
    if sql_result_start != -1:
        response = response[:sql_result_start]
    return response.strip().strip("```").strip()


sql_parser_component = FnComponent(fn=parse_response_to_sql)

text2sql_prompt = DEFAULT_TEXT_TO_SQL_PROMPT.partial_format(
    dialect=engine.dialect.name
)
print(text2sql_prompt.template)
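To see the parser in isolation, here is a small check against a mock response (the table name is made up for illustration):

from llama_index.core.llms import ChatMessage, ChatResponse, MessageRole

# Build a fake LLM chat response in the format the text-to-SQL prompt induces.
mock = ChatResponse(
    message=ChatMessage(
        role=MessageRole.ASSISTANT,
        content="SQLQuery: SELECT COUNT(*) FROM schools\nSQLResult: 42",
    )
)
print(parse_response_to_sql(mock))  # -> SELECT COUNT(*) FROM schools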

The following is the prompt printed by print(text2sql_prompt.template):

Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer. You can order the results by a relevant column to return the most interesting examples in the database.

Never query for all the columns from a specific table, only ask for a few relevant columns given the question.

Pay attention to use only the column names that you can see in the schema description. Be careful to not query for columns that do not exist. Pay attention to which column is in which table. Also, qualify column names with the table name when needed. You are required to use the following format, each taking one line:

Question: Question here

SQLQuery: SQL Query to run

SQLResult: Result of the SQLQuery

Answer: Final answer here

Only use tables listed below.

{schema}

Question: {query_str}

SQLQuery:

Here {schema} is the schema converted into text and {query_str} is the natural language question. The original post shows a filled-in example prompt as an image.
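You can render such an example yourself by filling in the remaining template variables (the placeholder values are mine):

# Fill the template variables to see exactly what the LLM will receive.
print(
    text2sql_prompt.format(
        schema="<table schema text goes here>",
        query_str="how many schools in Ohio?",
    )
)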

4.4. Response Synthesis Prompt

Then we need to generate the final response based on the SQL results. A prompt is created for this purpose. The code is as follows.

response_synthesis_prompt_str = (
    "Given an input question, synthesize a response from the query results.\n"
    "Query: {query_str}\n"
    "SQL: {sql_query}\n"
    "SQL Response: {context_str}\n"
    "Response: "
)
response_synthesis_prompt = PromptTemplate(
    response_synthesis_prompt_str,
)

Now all the components are ready, and we can add the defined modules to the pipeline as follows.

from llama_index.core.query_pipeline import (
    QueryPipeline as QP,
    Link,
    InputComponent,
    CustomQueryComponent,
)

qp = QP(
    modules={
        "input": InputComponent(),
        "table_retriever": obj_retriever,
        "table_output_parser": table_parser_component,
        "text2sql_prompt": text2sql_prompt,
        "text2sql_llm": llm,
        "sql_output_parser": sql_parser_component,
        "sql_retriever": sql_retriever,
        "response_synthesis_prompt": response_synthesis_prompt,
        "response_synthesis_llm": llm,
    },
    verbose=True,
)

As discussed, we need the initial InputComponent; the object retriever to fetch table schemas; the parser that converts the retrieved schemas into strings; and the text-to-SQL prompt built from them, which the LLM answers. We then parse the SQL out of the LLM’s output, execute it against the DB via the SQL retriever, and finally fill the response synthesis prompt and let the LLM generate the final answer.

Then we need to add links as follows.

qp.add_chain(["input", "table_retriever", "table_output_parser"])
qp.add_link("input", "text2sql_prompt", dest_key="query_str")
qp.add_link("table_output_parser", "text2sql_prompt", dest_key="schema")
qp.add_chain(
    ["text2sql_prompt", "text2sql_llm", "sql_output_parser", "sql_retriever"]
)
qp.add_link(
    "sql_output_parser", "response_synthesis_prompt", dest_key="sql_query"
)
qp.add_link(
    "sql_retriever", "response_synthesis_prompt", dest_key="context_str"
)
qp.add_link("input", "response_synthesis_prompt", dest_key="query_str")
qp.add_link("response_synthesis_prompt", "response_synthesis_llm")

The resulting query pipeline looks like the following.

A text to SQL RAG pipeline as a DAG

Now you can ask questions as follows.

response = qp.run(
    query="how many schools in Ohio?"
)
print(str(response))

Here is the answer.

I think query pipelines offer a scalable and reusable way of building RAG applications.

⭐️ Follow me on LinkedIn or Twitter for updates on AI ⭐️

I’m currently the Co-Founder & CEO @ Effectz.AI. We specialize in Privacy Preserving AI Solutions & AI Consulting.

5. References

  1. https://docs.llamaindex.ai/en/stable/module_guides/querying/pipeline
  2. https://docs.llamaindex.ai/en/stable/examples/pipeline/query_pipeline
  3. https://docs.llamaindex.ai/en/stable/examples/index_structs/struct_indices/SQLIndexDemo/
  4. https://medium.com/llamaindex-blog/introducing-query-pipelines-025dc2bb0537
  5. https://docs.llamaindex.ai/en/stable/examples/objects/object_index
