Building and Scaling a GenAI Application with Python and Choreo
The rapid rise of Generative AI (GenAI) has transformed industries by enabling applications that generate content, deliver predictive insights, and enhance user experiences. However, building production-grade GenAI applications can be challenging without the right tools and infrastructure.
In this blog, we’ll explore how to develop a scalable GenAI application using Python and deploy it on Choreo, a powerful platform that simplifies the process of deploying and scaling applications.
Understanding the Scenario
Imagine you need to build a chat agent capable of answering questions by referencing PDF documents. This agent should continuously learn and provide a personalized experience for each user. Users will upload PDFs to educate the agent and then ask questions based on the content of those documents. The service should allow users to:
- Upload, process, and store PDF documents.
- Ask questions about the uploaded documents.
This problem aligns with Retrieval Augmented Generation (RAG), where a vector database is used to index information extracted from the uploaded PDFs. Prompts to the large language models (LLMs) are then enriched by fetching the most relevant pieces of information from the vector database to answer the user’s questions.
Although this may seem straightforward, this type of application presents significant challenges, especially when scaling to handle multiple users concurrently. This is where Python, with its rich set of libraries, and Choreo, with its seamless scaling capabilities, come into play.
Why Python?
Python has become the leading language for GenAI application development due to its extensive libraries, such as LangChain and LlamaIndex, tailored for AI-driven tasks. Its simplicity, robust community support, and proven ability to build scalable applications make it ideal for rapid development. Additionally, Python offers faster development cycles and lower costs compared to other languages, solidifying its position as the top choice for GenAI projects.
Building a Scalable Chat Agent Service
Given that our service must handle multiple concurrent requests, we must ensure it can scale effectively. Python's Global Interpreter Lock (GIL) prevents a single process from executing more than one thread at a time, which limits how many requests one process can serve in parallel. Additionally, since GenAI applications often connect to various APIs (like LLM APIs and vector databases), they are typically I/O-bound, making asynchronous programming crucial.
To maximize performance, we'll use FastAPI for the backend, leveraging its asynchronous capabilities to handle I/O-bound operations efficiently. By using LangChain's async operations, we can prevent our Python service from being blocked while waiting for I/O, ensuring better resource utilization.
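Before structuring the service, it helps to see what an asynchronous LangChain call looks like. The following is a minimal sketch (not taken from the service code) assuming the langchain_openai ChatOpenAI client used later in this post:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

# llm.invoke(...) would block the worker while waiting for the API response.
# The async variant lets the event loop serve other requests in the meantime.
async def summarize() -> str:
    result = await llm.ainvoke("Summarize asynchronous I/O in one sentence.")
    return result.content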
Here’s how the service is structured to support these asynchronous operations:
from typing import List, Optional

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from pydantic import BaseModel
from starlette.middleware.cors import CORSMiddleware

# FastAPI app initialization
app = FastAPI()

# CORS middleware configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize the vector store
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = PineconeVectorStore(embedding=embeddings)

# Initialize OpenAI language model
llm = ChatOpenAI(model_name="gpt-4o-mini")

# Pydantic models for request validation
class Message(BaseModel):
    role: str
    content: Optional[str] = ""

class ConversationRequest(BaseModel):
    user_id: str
    message: str
    chat_history: List[Message]

@app.post("/upload_pdf")
async def upload_pdf(file: UploadFile = File(...), user_id: str = Form(...)):
    # Process and store the PDF.
    # We'll implement this function later.
    ...

@app.post("/ask_question")
async def ask_question(request: ConversationRequest):
    # Answer the user's question.
    # We'll implement this function later.
    ...
The above code sets up the FastAPI application with necessary middleware. It initializes the required AI components, including OpenAI embeddings, the vector store, and the language model.
Upload PDFs to Build the Knowledge Base
The following code snippet demonstrates how the service allows uploading PDFs, processing them, splitting the content into manageable chunks, and storing it in a vector store — all while leveraging the asynchronous capabilities of FastAPI and LangChain to enhance scalability.
import tempfile

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

@app.post("/upload_pdf")
async def upload_pdf(file: UploadFile = File(...), user_id: str = Form(...)):
    try:
        # Store the uploaded PDF in a temporary file.
        with tempfile.NamedTemporaryFile() as temp_file:
            temp_file.write(await file.read())
            temp_file.flush()
            temp_file_path = temp_file.name

            # Load and extract text from the PDF while the temporary file still exists.
            loader = PyPDFLoader(temp_file_path)
            documents = await loader.aload()

        # Split the extracted text into manageable chunks.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        chunks = text_splitter.split_documents(documents)

        # Associate each chunk with the user by adding user_id to the metadata.
        for chunk in chunks:
            chunk.metadata["user_id"] = user_id

        # Store the processed chunks in the vector database.
        await vector_store.aadd_documents(chunks)

        return {"message": "PDF processed and stored successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
Data Uploading and Parsing: The service asynchronously handles PDF uploads using FastAPI's UploadFile, followed by parsing with LangChain's PyPDFLoader. By leveraging non-blocking I/O, it efficiently manages file reading and PDF parsing, allowing the service to process multiple requests concurrently without I/O bottlenecks. Although the temporary file is written synchronously for simplicity, this step can be optimized further (see the sketch after these notes).
Chunking and Tagging: The extracted text is split into manageable chunks, each tagged with the user_id to enable personalized responses later.
Database Update: The tagged chunks are then asynchronously added to the vector store, ensuring the service remains responsive even when processing large data uploads.
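If the synchronous temp-file write becomes a bottleneck, one option is to push it onto a worker thread. The snippet below is a minimal sketch of that idea, using only the standard library (asyncio.to_thread requires Python 3.9+); the helper name save_upload_to_temp is illustrative, not part of the original service.

import asyncio
import tempfile

async def save_upload_to_temp(data: bytes) -> str:
    # Write the uploaded bytes to a temporary file without blocking the event loop.
    def _write() -> str:
        # delete=False keeps the file on disk after the handle is closed;
        # remove it once the PDF has been processed.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
            temp_file.write(data)
            return temp_file.name
    return await asyncio.to_thread(_write)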
Answering Questions Using the PDF Knowledge Base
The following code snippet demonstrates how the service handles user queries by leveraging the PDFs uploaded by each user, while ensuring high scalability through asynchronous processing.
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# Instructions for the LLM to reformulate the latest user question using chat history.
contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""

# Instructions for the LLM to generate an answer to the user question using the provided context.
qa_system_prompt = """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \
Use three sentences maximum and keep the answer concise.
{context}"""

@app.post("/ask_question")
async def ask_question(request: ConversationRequest):
    try:
        # Extract user ID and the current message from the request.
        user_id = request.user_id
        message = request.message

        # Convert chat history to a list of tuples containing roles and message content.
        chat_history = [(msg.role, msg.content) for msg in request.chat_history]

        # Initialize a retriever from the vector store,
        # filtered by user ID and limited to 5 results.
        retriever = vector_store.as_retriever(
            search_kwargs={"filter": {"user_id": user_id}, "k": 5}
        )

        # Create a prompt template for the history-aware retriever.
        contextualize_q_prompt = ChatPromptTemplate.from_messages([
            ("system", contextualize_q_system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}")
        ])

        # Create a retriever that uses the question and chat history to retrieve context.
        history_aware_retriever = create_history_aware_retriever(
            llm, retriever, contextualize_q_prompt
        )

        # Create a prompt template for answer generation.
        qa_prompt = ChatPromptTemplate.from_messages([
            ("system", qa_system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}")
        ])

        # Create a chain that processes retrieved documents to generate an answer.
        question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)

        # Create a RAG chain that combines history-aware retrieval and answer generation.
        rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)

        # Invoke the RAG chain with the user's input and chat history to get the response.
        response = await rag_chain.ainvoke({
            "input": message,
            "chat_history": chat_history
        })

        return {"response": response['answer']}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
Request Handling: When a user sends a POST request to the /ask_question endpoint, it receives the user_id, message, and chat_history as input. Here, the chat history is maintained by the client; alternatively, it could be stored in the backend for each user.
Document Retrieval: A retriever is initialized from the vector store, filtering documents by user_id so that only that user's documents are considered. This ensures personalized responses based on the user's previously uploaded PDFs.
History-Aware Retriever: A contextualization prompt (contextualize_q_system_prompt) is used to reformulate the user's question by incorporating the full chat history. LangChain's create_history_aware_retriever() extends the vector store retriever, enabling it to fetch documents relevant to the entire conversation context rather than only the latest message.
Question-Answering Chain: The qa_system_prompt guides the LLM to generate responses referring to the retrieved documents as context. The create_stuff_documents_chain() function sets up this chain.
RAG Chain: The history-aware retriever and the question-answering chain are combined into a single RAG chain using the create_retrieval_chain() function, ensuring seamless integration from document retrieval to response generation.
Asynchronous Execution: The RAG chain is executed asynchronously using rag_chain.ainvoke(). This prevents blocking during calls to the vector store and the LLM, keeping the service highly responsive and capable of handling multiple requests concurrently.
Response Generation: The endpoint returns the answer generated by the LLM through the RAG chain.
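To see the endpoints in action, a client can upload a PDF and then ask a question about it. The following is a minimal client sketch using the httpx library; the base URL, file name, and user ID are placeholders rather than values from the article.

import asyncio
import httpx

BASE_URL = "http://localhost:9090"  # placeholder; point this at your deployed service

async def main() -> None:
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client:
        # Upload a PDF to build this user's knowledge base.
        with open("handbook.pdf", "rb") as f:
            resp = await client.post(
                "/upload_pdf",
                files={"file": ("handbook.pdf", f, "application/pdf")},
                data={"user_id": "user-123"},
            )
        print(resp.json())

        # Ask a question grounded in the uploaded document.
        resp = await client.post(
            "/ask_question",
            json={"user_id": "user-123", "message": "What is the leave policy?", "chat_history": []},
        )
        print(resp.json())

asyncio.run(main())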
This Chat Agent service not only delivers accurate and context-aware responses but also gains performance and scalability from asynchronous programming. When combined with Choreo's scaling capabilities, it lays the foundation for a highly scalable GenAI application.
Link to the full code: Chat Agent Service
Fun fact: Almost all the code in the above repository, including the front-end, was generated using GenAI models.
Choreo for Deploying and Scaling GenAI Applications
Deploying Your GenAI Application on Choreo
Before diving into scaling strategies, the first step is to deploy your GenAI application on a reliable platform. Choreo makes this process straightforward with its robust deployment tools.
To begin, ensure your GenAI application is developed using asynchronous programming practices, as GenAI applications are typically I/O-bound. This keeps workers from sitting idle on I/O, so concurrency is not bottlenecked by Python's Global Interpreter Lock (GIL).
Deploying your GenAI Python service on Choreo is seamless with the use of buildpacks. The deployment process requires you to define a Procfile, which specifies the command that runs your application. This Procfile is essential because it enables you to configure your application to use process managers effectively, which is a crucial step for scaling your application later on.
For detailed instructions on setting up and deploying your Python-based GenAI application on Choreo, refer to the “Crafting Production-Ready GenAI: A Step-by-Step Guide with LangChain and Choreo”.
Scaling Your GenAI Application with Choreo
Once your application is deployed, the next crucial step is scaling to meet varying demand levels. Choreo offers a flexible platform that supports two primary scaling strategies: process manager-level scaling and hardware-level scaling.
Process Manager-Level Scaling
In production environments, using application servers like Uvicorn, Gunicorn, or Daphne is essential for achieving robust and scalable deployments. These servers support protocols such as ASGI for FastAPI and WSGI for Flask/Django, which are critical for handling concurrent requests efficiently. Moreover, these servers can also function as process managers, distributing the workload across multiple worker processes to maximize CPU utilization.
A common approach is to deploy with 2n + 1 workers, where n is the number of CPU cores. This configuration ensures efficient load distribution among workers, enabling your application to handle more requests simultaneously. However, this method requires pre-defining the number of workers based on available CPU resources, which may not be ideal for applications with highly fluctuating loads and can lead to underutilization of CPU resources during periods of low demand.
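If you prefer not to hard-code the worker count, it can be derived from the CPU count at startup. The gunicorn.conf.py below is a small sketch of this heuristic; the port and worker class are assumptions that match the examples that follow.

# gunicorn.conf.py -- compute the 2n + 1 worker heuristic at startup.
import multiprocessing

workers = 2 * multiprocessing.cpu_count() + 1
bind = "0.0.0.0:9090"                            # assumed port
worker_class = "uvicorn.workers.UvicornWorker"   # only needed for ASGI apps such as FastAPI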
With Choreo, you can easily configure your application’s process manager settings through a Procfile.
Procfile Configuration for Synchronous Applications: For synchronous applications, like those built with Flask or Django, the following Procfile configuration is used:
web: gunicorn -w <number_of_workers> -b <bind_address>:<port> <module_name>:<app_instance>
- <number_of_workers>: The number of worker processes (e.g., 3). More workers can handle more requests simultaneously, increasing the scalability of your application.
- <bind_address>: The IP address to bind the server to (e.g., 0.0.0.0 to listen on all available interfaces).
- <port>: The port number on which your application will run (e.g., 9090).
- <module_name>: The Python module that contains your WSGI application (e.g., app.main).
- <app_instance>: The WSGI application instance within the module (e.g., app).
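As a concrete (hypothetical) example, a Flask application exposed as app in app/main.py could use:
web: gunicorn -w 3 -b 0.0.0.0:9090 app.main:app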
Procfile Configuration for Asynchronous Applications: For asynchronous applications, such as those built with FastAPI, you need a slightly different configuration to take full advantage of non-blocking I/O operations:
web: gunicorn -w <number_of_workers> -b <bind_address>:<port> -k uvicorn.workers.UvicornWorker <module_name>:<app_instance>
- -k uvicorn.workers.UvicornWorker: The -k flag specifies the worker class. Here, we use UvicornWorker, which is essential for handling asynchronous tasks within the Gunicorn server.
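For instance, with the same illustrative values as above, the FastAPI service's Procfile might read:
web: gunicorn -w 3 -b 0.0.0.0:9090 -k uvicorn.workers.UvicornWorker app.main:app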
Final Note: Remember to include all necessary dependencies, such as uvicorn and gunicorn, in your requirements.txt.
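For reference, a minimal requirements.txt for the service sketched in this post might list the following packages (exact versions are not specified in the article; pin them as appropriate):

fastapi
gunicorn
uvicorn
langchain
langchain-openai
langchain-pinecone
langchain-community
langchain-text-splitters
pypdf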
Choreo simplifies this process by enabling developers to define these configurations using a Procfile, ensuring that your Python service scales efficiently with process managers.
Hardware-Level Scaling
For applications that experience fluctuating loads, horizontal scaling through a load balancer is often more efficient and cost-effective. Horizontal scaling allows your application to handle increased traffic by adding more instances rather than merely scaling up existing ones. However, this approach introduces challenges, particularly in managing shared memory resources like caches. In such scenarios, distributed caching solutions, such as Redis, are necessary to maintain application performance and consistency.
Choreo excels at enabling horizontal scaling by supporting auto-scaling based on CPU and memory usage. This allows your GenAI service to dynamically scale as load increases, ensuring optimal performance without over-provisioning. Auto-scaling policies can be configured to spawn new replicas when resource utilization exceeds certain thresholds, making it easier to maintain service quality during peak loads.
By combining process manager-level scaling with horizontal auto-scaling, Choreo provides a comprehensive solution for deploying and scaling GenAI applications. This dual strategy ensures that your application remains responsive and cost-effective, regardless of load conditions.
For more details on auto-scaling and horizontal scaling with Choreo, refer to the autoscaling documentation listed in the references below.
Handling Shared Memory with Choreo
One of the main challenges with horizontal scaling is managing shared memory, such as caches. In GenAI applications, storing chat history or other session data in memory can be problematic, especially in ephemeral environments where containers are short-lived.
This challenge can easily be addressed with Choreo managed caches, which integrate seamlessly with your Python applications. By utilizing Choreo's distributed caching, you can overcome the limitations of horizontal scaling and ensure that your GenAI application remains fully scalable and resilient.
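As an illustration of the pattern, the sketch below keeps per-user chat history in a Redis-compatible cache using the redis Python client (version 4.2 or newer for redis.asyncio). The connection URL and key naming are assumptions; in practice, the connection details would come from your Choreo managed cache configuration.

import json
import redis.asyncio as redis

cache = redis.from_url("redis://localhost:6379/0")  # placeholder; use the Choreo-provided connection URL

async def append_message(user_id: str, role: str, content: str) -> None:
    # Append one message to the user's history and keep only the latest 50 entries.
    key = f"chat_history:{user_id}"
    await cache.rpush(key, json.dumps({"role": role, "content": content}))
    await cache.ltrim(key, -50, -1)

async def load_history(user_id: str) -> list[dict]:
    # Load the stored history so any replica can reconstruct the conversation.
    raw = await cache.lrange(f"chat_history:{user_id}", 0, -1)
    return [json.loads(item) for item in raw]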
For more information, check out the documentation on Choreo managed caches.
Conclusion
Building and scaling GenAI applications can be complex, but with the right tools like Python and Choreo, it becomes significantly more manageable. By following best practices in asynchronous programming and leveraging Choreo’s robust deployment and scaling capabilities, developers can build scalable, production-grade GenAI applications with ease. Explore the Choreo Documentation to dive deeper into the concepts discussed and start building scalable GenAI applications today.
References
- Breaking Down Python Concurrency: The GIL and Its Effect on Multi-Threading
- The Power of Python: Building Robust & Scalable Web Applications
- Develop a Service
- Deploy an Application with Buildpacks
- Procfile in Buildpacks Deployment
- Autoscale Component Replicas
- Autoscale Components with Scale-to-Zero
- Choreo Managed Caches