Creating a RAG application with AWS CDK as IaC, Qdrant and LlamaIndex

Benito Martin
11 min read · Sep 3, 2024

Source: AWS

Infrastructure as Code (IaC) is a modern technique for managing and provisioning infrastructure resources through code. Rather than manually configuring these resources, you specify them in machine-readable configuration files. AWS offers two IaC tools: AWS CloudFormation and AWS CDK. CloudFormation provisions AWS resources using templates written in JSON or YAML, while the AWS CDK allows you to provision resources using familiar programming languages like Python. The CDK acts as an abstraction layer that simplifies the creation of CloudFormation templates.

In this post, I will guide you step by step through setting up a RAG application with AWS CDK. The complete code can be found in this repository.

Let’s code!!!

Prerequisites

  1. An AWS account with necessary permissions
  2. One Qdrant cluster and the corresponding API Key and URL
  3. OpenAI API key
  4. Docker installed for building and pushing the application

Step 1: Setting Up Environment Variables

Create a .env file in your project directory with the following content:

QDRANT_URL=
QDRANT_API_KEY=
DOCUMENT_BUCKET=
OPENAI_API_KEY=

The document bucket is created when the infrastructure is provisioned, so leave DOCUMENT_BUCKET empty for now and fill it in once the bucket name is available.
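Since one of these values is filled in later, it can help to check which variables are still missing before running any script. The following is a minimal sketch using only the standard library; the helper name missing_env_vars is an assumption for illustration and is not part of the repository.

```python
import os

# Variable names taken from the .env file above.
REQUIRED_VARS = ("QDRANT_URL", "QDRANT_API_KEY", "DOCUMENT_BUCKET", "OPENAI_API_KEY")


def missing_env_vars(env, required=REQUIRED_VARS):
    """Return the required variable names that are unset or blank in env."""
    return [name for name in required if not (env.get(name) or "").strip()]


if __name__ == "__main__":
    # Call load_dotenv() first if the values only live in the .env file.
    missing = missing_env_vars(os.environ)
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All required environment variables are set.")
```

Passing the mapping in as an argument keeps the check easy to reuse with os.environ or with a plain dictionary.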

Step 2: Understanding CDK

The AWS CDK is an open-source software development framework for defining AWS resources in code and provisioning them through AWS CloudFormation.

There are two components to consider:

  • AWS CDK Construct Library: Constructs are pre-written, modular, and reusable pieces of code that represent a resource or a collection of resources.
  • AWS CDK CLI: The command line tool for interacting with CDK apps. You can create, modify, manage, and deploy your AWS CDK projects with this tool.

As the image below shows, constructs are the building blocks of the application, and they are encapsulated in a Stack. You define your constructs and then synthesize the app to create the CloudFormation template for each Stack. A Stack can be seen as a class and a construct as a component of that class.

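from aws_cdk import Stack, aws_s3 as s3
from constructs import Construct

class MyFirstStack(Stack):

    def __init__(self, scope: Construct, id: str, **kwargs):
        super().__init__(scope, id, **kwargs)

        # A single construct: an S3 bucket that belongs to this stack
        s3.Bucket(self, "MyFirstBucket")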

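There are three levels of constructs, each representing a different level of abstraction: L1 constructs map directly to single CloudFormation resources, L2 constructs wrap a resource with sensible defaults and helper methods (e.g. the S3 Bucket above), and L3 constructs, also called patterns, combine several resources into a complete architecture (e.g. a load-balanced Fargate service).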

Source: https://docs.aws.amazon.com/cdk/v2/guide/home.html

AWS CDK is gaining popularity over CloudFormation because it lets you use a programming language to add your own logic and conditions when defining resources, and to debug them. Also, as the complexity of a project grows, modular and reusable blocks are more convenient than static CloudFormation templates.
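To make the point about logic and conditions concrete, here is a minimal sketch that derives bucket options from a deployment stage and creates one stack per stage in a loop. The stage names, the stack IDs ("Demo-dev", "Demo-prod"), and the helper names are assumptions for illustration only; they are not part of this project. The CDK imports are kept inside build_app so the settings logic can be exercised without the CDK installed.

```python
def bucket_settings(stage):
    """Return bucket options for a deployment stage (hypothetical stage names)."""
    is_prod = stage == "prod"
    return {
        "versioned": is_prod,         # keep object history only in production
        "retain_on_delete": is_prod,  # keep the bucket when the stack is destroyed
    }


def build_app(stages=("dev", "prod")):
    """Create one stack per stage; requires aws-cdk-lib and constructs."""
    import aws_cdk as cdk
    from aws_cdk import aws_s3 as s3

    app = cdk.App()
    for stage in stages:
        settings = bucket_settings(stage)
        stack = cdk.Stack(app, f"Demo-{stage}")
        s3.Bucket(
            stack,
            "Documents",
            versioned=settings["versioned"],
            removal_policy=(
                cdk.RemovalPolicy.RETAIN
                if settings["retain_on_delete"]
                else cdk.RemovalPolicy.DESTROY
            ),
        )
    return app
```

Expressing the same variation in plain CloudFormation would typically require parameters and condition functions inside the template.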

Now that we have a better idea of AWS CDK let’s start our project and build a RAG application.

Step 3: CDK Initialization

To start the project, you need an empty directory, as CDK will create the project structure for us. Before you start, make sure the AWS CLI and Node.js are installed. These two dependencies are required and can be installed following this AWS tutorial.

Once installed you should run this command in the root directory of your project.

cdk init app --language python

This will create a project structure like the following:

aws-cdk-rag-fargate/
├── README.md
├── .git/
├── app.py
├── cdk.json
├── .gitignore
├── requirements.txt
├── source.bat
├── tests/
│   ├── __init__.py
│   └── unit/
│       ├── __init__.py
│       └── test_aws_cdk_rag_fargate_stack.py
└── aws_cdk_rag_fargate/
    ├── __init__.py
    └── aws_cdk_rag_fargate_stack.py

The requirements.txt file lists the CDK and constructs libraries needed to interact with the app. It is recommended to create a virtual environment before installing them.

The main components of the directory can be explained as follows:

  • aws_cdk_rag_fargate_stack.py : this contains our stack and our constructs
  • test_aws_cdk_rag_fargate_stack.py : this contains the unit tests of the stack and our constructs
  • app.py : this script synthesizes the app to create the CloudFormation templates for each stack. We will create only one stack.

We still need to create our app with the RAG logic and the files necessary to run it. The project will contain an app directory with a Dockerfile and the RAG app, and a scripts directory with two files: one to upload the PDFs into an AWS S3 bucket and a second one to create our Qdrant vector store index.

But before that we will define our infrastructure with our Constructs and Stack.

Step 4: Synthesis: AWS Constructs and Stack

We will create one stack and several constructs to define our serverless infrastructure. First, we will provision a Virtual Private Cloud (VPC) spanning two availability zones for redundancy, one ECS cluster that operates across those availability zones, and one S3 bucket. By default, the VPC construct provisions one public and one private subnet in each availability zone: the public subnets carry the public traffic, while the private subnets route outbound traffic through a NAT gateway, which allows resources in them to access the internet indirectly.

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    aws_ecs as ecs,
    aws_ecs_patterns as ecs_patterns,
    aws_s3 as s3,
    aws_secretsmanager as secretsmanager,
)
from constructs import Construct

class AwsCdkRagFargateStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create a VPC
        vpc = ec2.Vpc(self, "RagVpc", max_azs=2)

        # Create an ECS cluster
        cluster = ecs.Cluster(self, "RagCluster", vpc=vpc)

        # Create an S3 bucket to store documents
        document_bucket = s3.Bucket(self, "RagDocumentBucket",
            versioned=True,
            encryption=s3.BucketEncryption.S3_MANAGED,
            removal_policy=cdk.RemovalPolicy.DESTROY  # Use RETAIN in production
        )

Next, we will provision our AWS Fargate service, which we will define using the ecs_patterns module. AWS Fargate provides serverless container hosting, allowing us to run containers without managing the underlying EC2 instances. Instead of defining an EC2 instance type, the required compute resources are specified by defining CPU units and memory.

The Application Load Balancer (ALB) will distribute incoming traffic across the running tasks. We specify a container image, defined by the Dockerfile in the app directory, which will be used to deploy the container. The image itself is not built at this stage: the purpose of the synthesis is to generate the CloudFormation templates, and the image is built and pushed during deployment. CDK does, however, generally expect the app directory and its Dockerfile to exist when it synthesizes the asset.

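Our application requires several API keys to function correctly. We will store these API keys in AWS Secrets Manager under the name "rag-app". The stack only references this secret by name, so it must already exist, with QDRANT_URL, QDRANT_API_KEY and OPENAI_API_KEY stored as key/value pairs, before the app is deployed. This ensures that when the application is deployed, the necessary API keys are securely injected into the environment.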
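Because the stack looks the secret up by name and reads individual fields from it, the secret needs to hold a JSON object with those three keys. The sketch below shows one way to create it with boto3; the helper names are assumptions for illustration, and the call requires boto3 plus credentials that are allowed to create secrets.

```python
import json

SECRET_NAME = "rag-app"  # the name the stack looks up
REQUIRED_KEYS = ("QDRANT_URL", "QDRANT_API_KEY", "OPENAI_API_KEY")


def build_secret_string(values):
    """Serialize the API keys as the JSON object the ECS task reads fields from."""
    missing = [key for key in REQUIRED_KEYS if not values.get(key)]
    if missing:
        raise ValueError(f"Missing secret values: {', '.join(missing)}")
    return json.dumps({key: values[key] for key in REQUIRED_KEYS})


def create_rag_secret(values, region_name=None):
    """Create the secret in AWS Secrets Manager (requires boto3 and credentials)."""
    import boto3

    client = boto3.client("secretsmanager", region_name=region_name)
    return client.create_secret(
        Name=SECRET_NAME,
        SecretString=build_secret_string(values),
    )
```

The same secret can also be created by hand in the AWS Console as a key/value secret.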

We also define scaling capabilities for our Fargate service to optimize costs and resource usage. This allows the service to automatically adjust the number of running tasks based on the current load, ensuring efficient use of resources.

        # Retrieve the secret
        secret = secretsmanager.Secret.from_secret_name_v2(
            self, "RagAppSecrets", "rag-app"
        )

        # Create a Fargate service with an Application Load Balancer
        fargate_service = ecs_patterns.ApplicationLoadBalancedFargateService(
            self, "RagFargateService",
            cluster=cluster,
            cpu=512,
            memory_limit_mib=1024,
            desired_count=1,
            task_image_options=ecs_patterns.ApplicationLoadBalancedTaskImageOptions(
                image=ecs.ContainerImage.from_asset("app"),
                container_port=8000,
                environment={
                    "DOCUMENT_BUCKET": document_bucket.bucket_name,
                },
                secrets={
                    "QDRANT_URL": ecs.Secret.from_secrets_manager(secret, "QDRANT_URL"),
                    "QDRANT_API_KEY": ecs.Secret.from_secrets_manager(secret, "QDRANT_API_KEY"),
                    "OPENAI_API_KEY": ecs.Secret.from_secrets_manager(secret, "OPENAI_API_KEY"),
                }
            ),
            public_load_balancer=True
        )

        # Add Auto Scaling
        scaling = fargate_service.service.auto_scale_task_count(
            min_capacity=1,
            max_capacity=3  # Adjust this value based on your needs
        )

        # Add CPU utilization scaling policy
        scaling.scale_on_cpu_utilization("CpuScaling",
            target_utilization_percent=70,
            scale_in_cooldown=cdk.Duration.seconds(60),
            scale_out_cooldown=cdk.Duration.seconds(60)
        )
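To build some intuition for what a 70% CPU target with a capacity range of 1 to 3 means, the sketch below uses a simplified proportional rule. This is only a rough mental model under stated assumptions, not the algorithm AWS actually runs, and the function name is hypothetical.

```python
import math


def estimate_task_count(current_tasks, cpu_percent, target_percent=70.0,
                        min_capacity=1, max_capacity=3):
    """Estimate the task count a simple proportional rule would aim for."""
    if current_tasks < 1 or target_percent <= 0:
        raise ValueError("current_tasks must be >= 1 and target_percent > 0")
    # Scale the task count by how far the metric is from its target ...
    wanted = math.ceil(current_tasks * cpu_percent / target_percent)
    # ... and keep the result inside the configured capacity range.
    return max(min_capacity, min(max_capacity, wanted))
```

Under this model, one task running at 95% CPU would lead to two tasks, while three tasks at 100% CPU would stay at three because of the max_capacity limit.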

Finally, we provide access rights to our S3 bucket and output values that will be needed after deployment. These include the DNS name, which contains the URL endpoint for accessing our application, and the S3 bucket name, which we will use to store our documents.

        # Grant the task read and write access to the S3 bucket
        document_bucket.grant_read_write(fargate_service.task_definition.task_role)

        # Output the load balancer DNS
        cdk.CfnOutput(self, "LoadBalancerDNS",
            value=fargate_service.load_balancer.load_balancer_dns_name,
            description="Load Balancer DNS",
            export_name="RagLoadBalancerDNS"
        )

        # Output the S3 bucket name
        cdk.CfnOutput(self, "DocumentBucketName",
            value=document_bucket.bucket_name,
            description="Document Bucket Name",
            export_name="RagDocumentBucketName"
        )

To synthesize our app, we need to include our AwsCdkRagFargateStack in the app.py file and then run the command cdk synth in the terminal. This command will perform basic validation of the CDK code and generate a CloudFormation template from your CDK stack.

#!/usr/bin/env python3
import os

import aws_cdk as cdk

from aws_cdk_rag_fargate.aws_cdk_rag_fargate_stack import AwsCdkRagFargateStack


app = cdk.App()
AwsCdkRagFargateStack(app, "AwsCdkRagFargateStack")

app.synth()
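After cdk synth has run, the generated template can be inspected to see which resources the constructs expand into. The sketch below is one way to do that with the standard library; it assumes the default cdk.out output directory and the stack name used above, and the helper names are hypothetical.

```python
import json
from collections import Counter
from pathlib import Path


def count_resource_types(template):
    """Count the CloudFormation resource types declared in a template dict."""
    resources = template.get("Resources", {})
    return Counter(resource["Type"] for resource in resources.values())


def load_template(path="cdk.out/AwsCdkRagFargateStack.template.json"):
    """Load a synthesized template (assumes the default cdk.out directory)."""
    return json.loads(Path(path).read_text())
```

Calling count_resource_types(load_template()) from the project root after a synth returns a Counter keyed by resource type, which makes it easy to see how many resources a few high-level constructs generate.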

The final architecture looks like the picture below, with an additional S3 bucket that is external to the VPC.

Source: AWS

Step 5: CDK Bootstrapping

Before we create our app and the collection in the Qdrant vector database, we need to set up our environment. Bootstrapping is the process of provisioning the AWS resources that the AWS CDK itself uses. Running the command below provisions an S3 bucket, an ECR repository, and the IAM roles that grant the AWS CDK the permissions it needs to perform deployments. In AWS CloudFormation, a CDKToolkit stack will be created.

cdk bootstrap aws://{account-id}/{region}
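The environment string passed to the bootstrap command combines the 12-digit account ID and the region. As a small sketch, it can be assembled and sanity-checked in Python; the helper names are assumptions for illustration, and current_account_id requires boto3 and configured credentials.

```python
def bootstrap_target(account_id, region):
    """Build the environment string expected by `cdk bootstrap`."""
    account_id = str(account_id).strip()
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("AWS account IDs are 12-digit numbers")
    return f"aws://{account_id}/{region}"


def current_account_id():
    """Look up the account ID of the active credentials (requires boto3)."""
    import boto3

    return boto3.client("sts").get_caller_identity()["Account"]
```

For example, bootstrap_target(current_account_id(), "eu-central-1") yields the value to pass to cdk bootstrap for that region.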

Step 6: Create Qdrant Collection

Now it is time to create our app. In the project root directory, I created two additional folders: documents, to store a sample PDF, and scripts, to hold the scripts.

Under scripts, the file s3_uploader.py uploads the documents to the S3 bucket. Before running the script, add the name of the document bucket to the .env file. The bucket is defined in our stack, so its name (the DocumentBucketName output) is only available once the stack has been deployed.

import boto3
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# AWS credentials
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
region_name = os.getenv('AWS_REGION', 'eu-central-1')

# S3 bucket details
bucket_name = os.getenv('DOCUMENT_BUCKET')
local_directory = os.getenv('LOCAL_DOCUMENT_DIRECTORY', '../documents')
s3_subdirectory = 'documents/'  # Subdirectory in S3 where files will be uploaded

# Create an S3 client
s3 = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name
)

# Load files to S3 Bucket
def upload_files(directory, bucket, subdirectory):
    for subdir, _, files in os.walk(directory):
        for file in files:
            file_path = os.path.join(subdir, file)
            # S3 keys use forward slashes, also when the script runs on Windows
            s3_path = os.path.join(subdirectory, os.path.relpath(file_path, directory)).replace(os.sep, '/')
            try:
                s3.upload_file(file_path, bucket, s3_path)
                print(f'Successfully uploaded {file_path} to s3://{bucket}/{s3_path}')
            except Exception as e:
                print(f'Failed to upload {file_path}: {e}')

if __name__ == "__main__":
    # Upload all files in the directory
    upload_files(local_directory, bucket_name, s3_subdirectory)

The second file in the scripts folder, qdrant_setup.py, creates the collection, reads the documents stored in the S3 bucket, and then uses LlamaIndex to split them into chunks/nodes and push those to the collection.

import os
from uuid import uuid4
from dotenv import load_dotenv
from qdrant_client import QdrantClient
from qdrant_client.http.exceptions import ResponseHandlingException
from qdrant_client.models import Distance, PointStruct, VectorParams
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core import SimpleDirectoryReader
from s3fs import S3FileSystem
import openai

# Load environment variables
load_dotenv()

# Configuration
QDRANT_API_KEY = os.getenv('QDRANT_API_KEY')
QDRANT_URL = os.getenv('QDRANT_URL')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
COLLECTION_NAME = os.getenv('QDRANT_COLLECTION_NAME', 'documents')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
BUCKET_NAME = os.getenv('DOCUMENT_BUCKET')
S3_PREFIX = 'documents/'  # Subdirectory in S3 bucket where the documents are stored

# Fail early if the OpenAI API key is missing, then set it
if OPENAI_API_KEY is None:
    raise ValueError("Please set the OPENAI_API_KEY environment variable.")

openai.api_key = OPENAI_API_KEY

def get_documents_s3(aws_access_key_id, aws_secret_access_key, bucket_name, s3_prefix):
    s3_fs = S3FileSystem(key=aws_access_key_id, secret=aws_secret_access_key)

    bucket_path = f'{bucket_name}/{s3_prefix}'
    print("Listing files in S3 path:")
    for file in s3_fs.ls(bucket_path):
        print(file)

    try:
        reader = SimpleDirectoryReader(
            input_dir=bucket_path,
            fs=s3_fs,
            recursive=True
        )
        documents = reader.load_data()
        return documents

    except Exception as e:
        print(f"Error: {e}")
        return None

def split_documents_into_nodes(all_documents):
    try:
        splitter = SentenceSplitter(
            chunk_size=1500,
            chunk_overlap=200
        )
        nodes = splitter.get_nodes_from_documents(all_documents)
        return nodes
    except Exception as e:
        print(f"Error splitting documents into nodes: {e}")
        return []

def create_collection_if_not_exists(client, collection_name):
    try:
        collections = client.get_collections()
        if collection_name not in [col.name for col in collections.collections]:
            # size must match the embedding dimension
            # (1536 for OpenAI's text-embedding-ada-002)
            client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
            )
            print(f"Collection '{collection_name}' created.")
        else:
            print(f"Collection '{collection_name}' already exists.")
    except ResponseHandlingException as e:
        print(f"Error checking or creating collection: {e}")

def process_and_upsert_nodes(data, client, collection_name, embed_model):
    chunked_nodes = []

    for item in data:
        qdrant_id = str(uuid4())
        document_id = item.id_
        code_text = item.text
        file_path = item.metadata["file_path"]
        file_name = item.metadata["file_name"]

        # Embed the chunk text
        content_vector = embed_model.get_text_embedding(code_text)

        payload = {
            "text": code_text,
            "document_id": document_id,
            "metadata": {
                "qdrant_id": qdrant_id,
                "file_path": file_path,
                "file_name": file_name,
            }
        }

        point = PointStruct(id=qdrant_id, vector=content_vector, payload=payload)
        chunked_nodes.append(point)

    if chunked_nodes:
        client.upsert(
            collection_name=collection_name,
            wait=True,
            points=chunked_nodes
        )

    print(f"{len(chunked_nodes)} Chunked metadata upserted.")

if __name__ == "__main__":
    # Get documents from S3
    all_documents = get_documents_s3(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME, S3_PREFIX)

    if all_documents:
        # Split documents into nodes
        nodes = split_documents_into_nodes(all_documents)

        # Initialize embedding model
        embed_model = OpenAIEmbedding(api_key=OPENAI_API_KEY)

        # Initialize Qdrant client
        client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)

        # Create collection if it does not exist
        create_collection_if_not_exists(client, COLLECTION_NAME)

        # Process and upsert documents in vector store
        # Only the first 15 nodes are upserted here; remove the slice to index everything
        process_and_upsert_nodes(nodes[:15], client, COLLECTION_NAME, embed_model)
    else:
        print("No documents were loaded from S3. Please check your S3 configuration and bucket contents.")
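The script above sends all points in a single upsert call, which is fine for a handful of nodes. For larger document sets, one option is to upsert in fixed-size batches. The following is a minimal sketch of that idea; the helper names and the batch size of 64 are assumptions, not part of the original script, and client is expected to be a QdrantClient.

```python
def batched(items, batch_size):
    """Yield consecutive slices of items with at most batch_size elements."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def upsert_in_batches(client, collection_name, points, batch_size=64):
    """Upsert points batch by batch and return how many were sent."""
    total = 0
    for batch in batched(points, batch_size):
        client.upsert(collection_name=collection_name, wait=True, points=batch)
        total += len(batch)
    return total
```

Smaller batches keep each request small and make it easier to retry a failed batch without resending everything.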

Step 7: App Logic

Next, we need to write our app. Two files must be created in the app folder: main.py and Dockerfile. The first one is a FastAPI app that uses the newly created collection to answer a user's question.

import os
import openai
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from llama_index.core.indices.vector_store.base import VectorStoreIndex
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.openai import OpenAIEmbedding
from qdrant_client import QdrantClient
from contextlib import asynccontextmanager

print("Starting script...")

# Load environment variables from .env file
load_dotenv()

QDRANT_URL = os.getenv("QDRANT_URL")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
COLLECTION_NAME = os.getenv('QDRANT_COLLECTION_NAME', 'documents')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

# Fail early if the OpenAI API key is missing, then set it
if OPENAI_API_KEY is None:
    raise ValueError("Please set the OPENAI_API_KEY environment variable.")

openai.api_key = OPENAI_API_KEY

# Initialize Qdrant client
qdrant_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)


# Initialize embedding model
embed_model = OpenAIEmbedding(api_key=OPENAI_API_KEY)

# The index is created at startup (see lifespan below)
index = None

class Query(BaseModel):
    question: str

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print("Starting up...")
    await initialize_index()
    yield
    # Shutdown
    print("Shutting down...")

app = FastAPI(lifespan=lifespan)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

async def initialize_index():
    global index
    vector_store = QdrantVectorStore(client=qdrant_client, collection_name=COLLECTION_NAME)

    index = VectorStoreIndex.from_vector_store(vector_store=vector_store, embed_model=embed_model)

    print("Index initialized successfully.")

@app.post("/query")
async def query(query: Query):
    if index is None:
        raise HTTPException(status_code=500, detail="Index not initialized")

    query_engine = index.as_query_engine()
    response = query_engine.query(query.question)
    return {"answer": str(response)}

@app.get("/health")
async def health():
    return {"status": "healthy"}


@app.get("/")
def read_root():
    """Root endpoint returning API information."""
    return {
        "message": "Welcome to the CDK RAG API",
        "version": "V0",
        "documentation": "/docs",  # FastAPI automatic docs
        "health_check": "/health",
        "usage": "Send a POST request to /query with a JSON body containing a 'question' field."
    }

if __name__ == "__main__":
    import uvicorn
    print("About to start the server...")
    # Local run only; in the container, uvicorn is started by the Dockerfile CMD
    uvicorn.run(app, host="127.0.0.1", port=8000)

print("Script finished.")

The Dockerfile defines the image that contains our app logic and requirements. The image is built and pushed to the ECR repository during deployment (next step).

FROM python:3.11-slim

WORKDIR /app

# Copy requirements file
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application
COPY . .


# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Step 8: App Deployment

Now it is time to deploy our app together with the infrastructure. Running the command cdk deploy deploys the whole infrastructure and creates a new stack, AwsCdkRagFargateStack, in AWS CloudFormation. If everything runs well, you should be able to see the Docker image in ECR, the load balancer (listed under EC2 in the console), and the ECS cluster with the AWS Fargate serverless configuration.

To test the app, you can find the DNS name under the load balancer in the AWS Console, read it from the LoadBalancerDNS output printed at the end of cdk deploy, or retrieve it using the command below. Then you can perform a POST request with a query to get the answer.

Author: Benito Martin
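As one possible way to script this test, the sketch below reads the LoadBalancerDNS output from a CloudFormation describe_stacks response and builds the POST request for the /query endpoint with the standard library. The helper names are assumptions for illustration, and the sketch assumes the load balancer is reachable over plain HTTP, since the stack does not configure a certificate.

```python
import json
import urllib.request


def find_stack_output(describe_stacks_response, output_key):
    """Return an output value from a CloudFormation describe_stacks response."""
    for stack in describe_stacks_response.get("Stacks", []):
        for output in stack.get("Outputs", []):
            if output.get("OutputKey") == output_key:
                return output.get("OutputValue")
    return None


def build_query_request(dns_name, question):
    """Build a POST request for the /query endpoint of the FastAPI app."""
    body = json.dumps({"question": question}).encode("utf-8")
    return urllib.request.Request(
        url=f"http://{dns_name}/query",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(dns_name, question, timeout=60):
    """Send the question and return the 'answer' field of the JSON response."""
    request = build_query_request(dns_name, question)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))["answer"]
```

The response passed to find_stack_output can be obtained with boto3.client("cloudformation").describe_stacks(StackName="AwsCdkRagFargateStack"); the resulting DNS name is then passed to ask together with a question.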

Step 9: Clean Up

Once you no longer need the app, you can delete it with the command cdk destroy. Note that CloudFormation can only delete an empty S3 bucket, so empty the document bucket first.

Conclusion

In this post, we walked through the process of building, deploying, and scaling a Large Language Model (LLM) application using AWS CDK, LlamaIndex, and Qdrant. We covered the key steps to provision the necessary infrastructure and automate the entire deployment process. AWS CDK offers a high level of modularity and flexibility, enabling you to efficiently create and automate applications using AWS services. This approach streamlines the underlying deployment process, allowing you to focus on building your application rather than managing infrastructure.

If you enjoyed reading this content you can support me by:

  • Clapping and following me on Medium! 👏 👏 👏
  • Following me on GitHub 🎶 🎷 🎶
  • Starring the repo ⭐⭐⭐
  • Sharing my content on LinkedIn! 💯💯💯
  • Buy me a coffee or support me on GitHub Sponsors 🚀🚀🚀

Thank you for following along, and happy coding!
