Automating the creation of your own RAG System with Terraform, AWS Kendra and AWS Sagemaker (Part Three)

Andre Botta


We have finally reached the third and last part of this series on deploying your own RAG system to AWS with Terraform. In this article, I cover how to deploy a Python Lambda function that uses the LangChain library to combine our AWS Kendra index and our AWS SageMaker model, so that we can interact with them by asking questions.

Python Code

Let’s go over the code for our lambda function:

import json
import os

import boto3
from langchain.chains import ConversationalRetrievalChain
from langchain_community.llms import SagemakerEndpoint
from langchain_community.llms.sagemaker_endpoint import LLMContentHandler
from langchain_core.prompts import PromptTemplate
from langchain_community.retrievers import AmazonKendraRetriever
from langchain.schema import Document

# Both values are injected as environment variables by Terraform
kendra_index_id = os.getenv('KENDRA_INDEX')
sm_endpoint_name = os.getenv('LLM_ENDPOINT')


def lambda_handler(event, context):
    question = event['question']
    # JSON has no tuples, so each [question, answer] pair arrives as a list
    chat_history = [tuple(item) for item in event.get('chat_history', [])]
    model_parameters = {
        "max_new_tokens": 450,
        "return_full_text": False,
        "top_p": 0.6,
        "temperature": 0.9,
        "top_k": 50,
        "stop": ["</s>"],
        "repetition_penalty": 1.1,
    }

    # Prompt used to answer the question from the retrieved documents
    prompt_template = """
Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Be as detailed as you can.

{context}

Question: {question}

Helpful Answer:
"""
    PROMPT = PromptTemplate(
        template=prompt_template, input_variables=["context", "question"],
    )

    # Prompt used to turn a follow-up question into a standalone question
    condense_qa_template = """
Given the following conversation and a follow up question, rephrase the follow up question
to be a standalone question.

Chat History:
{chat_history}
Follow Up Input: {question}

Standalone question:
"""
    standalone_question_prompt = PromptTemplate.from_template(condense_qa_template)

    kendra_client = boto3.client('kendra')
    retriever = AmazonKendraRetriever(client=kendra_client, index_id=kendra_index_id, top_k=6)

    class ContentHandler(LLMContentHandler):
        content_type = "application/json"
        accepts = "application/json"

        def transform_input(self, prompt: str, model_kwargs: dict) -> bytes:
            # Build the JSON request body expected by the endpoint
            input_str = json.dumps({"inputs": prompt, "parameters": {**model_kwargs}})
            return input_str.encode('utf-8')

        def transform_output(self, output: bytes) -> str:
            # The response body is a stream, so read it before decoding
            response_json = json.loads(output.read().decode("utf-8"))
            return response_json[0]['generated_text']

    content_handler = ContentHandler()
    sagemaker_client = boto3.client("sagemaker-runtime")
    llm = SagemakerEndpoint(
        endpoint_name=sm_endpoint_name,
        client=sagemaker_client,
        model_kwargs=model_parameters,
        content_handler=content_handler,
    )
    chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=retriever,
        condense_question_prompt=standalone_question_prompt,
        return_source_documents=True,
        combine_docs_chain_kwargs={"prompt": PROMPT},
        verbose=False
    )

    result = chain.invoke({"question": question, "chat_history": chat_history})

    def custom_serializer(obj):
        # Convert LangChain Document objects into plain dictionaries
        if isinstance(obj, Document):
            return {
                "content": obj.page_content,
                "metadata": obj.metadata
            }
        # Returning obj unchanged would make json.dumps fail with a confusing error
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    return json.dumps(result, default=custom_serializer)

From the code, you can see that we start by reading our Kendra index ID and SageMaker endpoint name from environment variables. Both values are set by the Terraform configuration when the Lambda function is deployed. The function expects a request of the form:

payload = {
    "question": "my question",
    "chat_history": [("previous question", "previous answer")]
}
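
Because JSON has no tuple type, the (question, answer) pairs in chat_history reach the Lambda event as two-element lists, which is why the handler rebuilds them with tuple(item). A minimal sketch of that round trip, using only the standard library (the sample strings are placeholders):

```python
import json

# Client side: chat history is kept as (question, answer) tuples.
payload = {
    "question": "my question",
    "chat_history": [("previous question", "previous answer")],
}

# Serializing to JSON and back turns each tuple into a two-element list.
event = json.loads(json.dumps(payload))

# Handler side: the same conversion used in lambda_handler.
chat_history = [tuple(item) for item in event.get("chat_history", [])]

print(event["chat_history"])
print(chat_history)
```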

We then define a few generation parameters for our model. Also notice the ContentHandler class: the SagemakerEndpoint class requires it to translate between the prompt and the endpoint's request and response formats. For this simple example, we also define a custom serializer so that the Document objects returned by the Kendra index can be included in the JSON response.
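
To make the role of the content handler more concrete, here is a small offline sketch of its two transformations written as plain functions, so it runs without LangChain or AWS access. The response shape (a list holding one object with a generated_text key) is what the handler in this article expects; your model may return something different, and the fake response below is purely illustrative.

```python
import io
import json


def transform_input(prompt: str, model_kwargs: dict) -> bytes:
    """Build the JSON request body sent to the endpoint."""
    body = {"inputs": prompt, "parameters": {**model_kwargs}}
    return json.dumps(body).encode("utf-8")


def transform_output(output) -> str:
    """Read the streamed response body and extract the generated text."""
    response_json = json.loads(output.read().decode("utf-8"))
    return response_json[0]["generated_text"]


request_body = transform_input("What is RAG?", {"max_new_tokens": 450})

# Stand-in for the endpoint's streaming response body (hypothetical reply).
fake_response = io.BytesIO(
    json.dumps([{"generated_text": "An example answer."}]).encode("utf-8")
)
answer = transform_output(fake_response)

print(request_body)
print(answer)
```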

This file should be saved under a folder called src in the root of your project folder. For this example, I have named the file: lambda_function.py

In the same src folder we can define the requirements.txt file as:

langchain==0.1.17
langchain-community==0.0.36
boto3>=1.28.27

Terraform Set Up

I am going to skip the backend.tf and provider.tf files since they are the same as in Part One and Part Two. Our variables.tf file defines the ECR URI of the Docker image that contains the Python code and its dependencies, as well as the Kendra index ID and the SageMaker LLM endpoint name:

variable "aws_region" {
  type = string
}

variable "IMAGE_ECR_URI" {
  type = string
}

variable "KENDRA_INDEX_ID" {
  type    = string
  default = "myindexid"
}

variable "LLM_ENDPOINT" {
  type    = string
  default = "myllmendpoint"
}

And this is the main.tf. The centerpiece is the aws_lambda_function resource named lambda_handler, where we set the image URI and pass the environment variables that our Python code needs. We also define the execution role that the Lambda function assumes, along with the policies that give it access to CloudWatch Logs, Kendra and SageMaker.


resource "aws_lambda_function" "lambda_handler" {
  function_name = "MyLambdaFunction"
  package_type  = "Image"
  image_uri     = var.IMAGE_ECR_URI

  role = aws_iam_role.lambda_exec_role.arn

  timeout     = 300
  memory_size = 1024

  environment {
    variables = {
      KENDRA_INDEX = var.KENDRA_INDEX_ID
      LLM_ENDPOINT = var.LLM_ENDPOINT
    }
  }
}

resource "aws_iam_role" "lambda_exec_role" {
  name = "lambda_execution_role"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}

resource "aws_iam_policy" "lambda_logging" {
  name        = "lambda_logging_policy"
  description = "IAM policy for logging from Lambda to CloudWatch"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_logs_attachment" {
  role       = aws_iam_role.lambda_exec_role.name
  policy_arn = aws_iam_policy.lambda_logging.arn
}

resource "aws_iam_policy" "lambda_kendra_policy" {
  name        = "lambda_kendra_access"
  description = "Allow Lambda to access Kendra"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "kendra:Query",
          "kendra:BatchPutDocument",
          "kendra:DescribeIndex",
          "kendra:Retrieve"
        ],
        Resource = "*"
      }
    ]
  })
}

resource "aws_iam_policy" "lambda_sagemaker_policy" {
  name        = "lambda_sagemaker_access"
  description = "Allow Lambda to invoke SageMaker endpoints"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "sagemaker:InvokeEndpoint",
        ],
        Resource = "*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_kendra_attachment" {
  role       = aws_iam_role.lambda_exec_role.name
  policy_arn = aws_iam_policy.lambda_kendra_policy.arn
}

resource "aws_iam_role_policy_attachment" "lambda_sagemaker_attachment" {
  role       = aws_iam_role.lambda_exec_role.name
  policy_arn = aws_iam_policy.lambda_sagemaker_policy.arn
}
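
The environment block of the Lambda resource is what makes KENDRA_INDEX and LLM_ENDPOINT visible to os.getenv in the Python code. Since os.getenv silently returns None when a variable is missing, it can help to fail fast with a clear message. The sketch below is one possible way to do that; require_env is a hypothetical helper that is not part of the original function, and the values are simulated with the defaults from variables.tf so the snippet runs locally.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or fail with a clear message."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Simulate the values Terraform would inject (defaults from variables.tf).
os.environ["KENDRA_INDEX"] = "myindexid"
os.environ["LLM_ENDPOINT"] = "myllmendpoint"

kendra_index_id = require_env("KENDRA_INDEX")
sm_endpoint_name = require_env("LLM_ENDPOINT")
print(kendra_index_id, sm_endpoint_name)
```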

Gitlab Pipeline

Our GitLab pipeline is similar to the previous ones. The stage names differ slightly, but we still have four main Terraform stages: prepare, validate, plan and deploy. In addition, a build stage builds the Docker image that contains the Python code and dependencies for our Lambda function:

image: registry.gitlab.com/gitlab-org/terraform-images/stable:latest

variables:
  STATE_NAME: lambda
  TF_ROOT: ${CI_PROJECT_DIR}
  TF_ADDRESS: ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/terraform/state/${STATE_NAME}_state
  TF_VAR_aws_region: ${AWS_DEFAULT_REGION}

cache:
  key: ${STATE_NAME}_state
  paths:
    - ${TF_ROOT}/.terraform

before_script:
  - cd ${TF_ROOT}

stages:
  - prepare
  - build
  - validate
  - plan
  - deploy

init:
  stage: prepare
  script:
    - gitlab-terraform init

build:
  image: registry.gitlab.com/gitlab-org/cloud-deploy/aws-base:latest
  variables:
    DOCKER_DRIVER: overlay2
    DOCKER_HOST: tcp://docker:2375
    DOCKER_TLS_CERTDIR: ""
  stage: build
  services:
    - name: <private ecr>/docker:dind
      alias: docker
  before_script:
    - export DOCKER_HOST=tcp://docker:2375
    - aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID
    - aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY
    - aws configure set default.region $AWS_DEFAULT_REGION
    - cd src
  script:
    - chmod +x ./build_docker_image.sh
    - source ./build_docker_image.sh
    - cd ..
    - echo $LAMBDA_ECR_URI
    # dotenv reports must contain KEY=VALUE lines
    - echo "LAMBDA_ECR_URI=$LAMBDA_ECR_URI" >> build.env
  artifacts:
    reports:
      dotenv: build.env

validate:
  stage: validate
  script:
    - gitlab-terraform validate
  dependencies:
    - build
  needs:
    - job: build
      artifacts: true

plan:
  stage: plan
  variables:
    TF_VAR_IMAGE_ECR_URI: $LAMBDA_ECR_URI
  script:
    - gitlab-terraform plan
    - gitlab-terraform plan-json
  dependencies:
    - validate
    - build
  needs:
    - job: validate
    - job: build
      artifacts: true
  artifacts:
    name: plan
    paths:
      - ${TF_ROOT}/plan.cache
    reports:
      terraform: ${TF_ROOT}/plan.json

apply:
  stage: deploy
  script:
    - gitlab-terraform apply
  dependencies:
    - plan
    - build
  when: manual

destroy:
  stage: deploy
  script:
    - gitlab-terraform destroy
  dependencies:
    - plan
    - build
  when: manual

Let’s take a closer look at the build job. You might have noticed that this job uses a service called docker. In my case, I have access to a private registry that contains the docker:dind image, so if you want to replicate this example, you might need to adjust the job for your setup. We also use a simple bash script to build the image and push it to an AWS ECR repository, which in this case already exists, so there is no need to create it via Terraform. This is the bash script that builds and pushes the image:

build_docker_image.sh


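REQUIREMENTS_FILE="requirements.txt"
LAMBDA_FILE="lambda_function.py"
ECR_REPOSITORY_URI="repo_uri"
IMAGE_NAME="myimage"

# Tag the image with a hash of its inputs so the tag changes whenever the code or dependencies change
HASH="$IMAGE_NAME.$(cat "$REQUIREMENTS_FILE" "$LAMBDA_FILE" | sha256sum | awk '{ print $1 }')"
aws ecr get-login-password --region ap-southeast-2 | docker login --username AWS --password-stdin "$ECR_REPOSITORY_URI"
docker build --platform linux/amd64 -t "$ECR_REPOSITORY_URI:$HASH" .
docker push "$ECR_REPOSITORY_URI:$HASH"

# Expose the full image URI to the pipeline, which reads LAMBDA_ECR_URI after sourcing this script
export LAMBDA_ECR_URI="$ECR_REPOSITORY_URI:$HASH"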
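
The image tag in the script is derived from the contents of requirements.txt and lambda_function.py, so a new tag is produced only when one of them changes. The following sketch reproduces that idea in Python to show the behaviour; image_tag is a hypothetical helper and the file contents are throwaway placeholders written to a temporary directory.

```python
import hashlib
import tempfile
from pathlib import Path


def image_tag(image_name: str, *files: Path) -> str:
    """Mimic IMAGE_NAME.$(cat files... | sha256sum | awk '{ print $1 }')."""
    digest = hashlib.sha256()
    for path in files:
        # Feeding the files in order is equivalent to hashing their concatenation
        digest.update(path.read_bytes())
    return f"{image_name}.{digest.hexdigest()}"


with tempfile.TemporaryDirectory() as tmp:
    requirements = Path(tmp) / "requirements.txt"
    handler = Path(tmp) / "lambda_function.py"
    requirements.write_text("boto3>=1.28.27\n")
    handler.write_text("def lambda_handler(event, context):\n    return 'ok'\n")
    tag_before = image_tag("myimage", requirements, handler)

    # Changing the handler code yields a different tag
    handler.write_text("def lambda_handler(event, context):\n    return 'changed'\n")
    tag_after = image_tag("myimage", requirements, handler)

print(tag_before)
print(tag_before != tag_after)
```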

And the Dockerfile used is:

FROM public.ecr.aws/lambda/python:3.12
COPY requirements.txt ${LAMBDA_TASK_ROOT}
RUN pip install -r requirements.txt
COPY lambda_function.py ${LAMBDA_TASK_ROOT}
CMD [ "lambda_function.lambda_handler" ]

The expected layout is a src folder under the project root that contains the files build_docker_image.sh, Dockerfile, lambda_function.py and requirements.txt.

Upon execution, the build job will create and push a docker image to your AWS ECR and export the image URI allowing the plan and apply jobs to deploy our lambda function.
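
To illustrate how the image URI travels from the build job to Terraform: a dotenv report is a file of KEY=VALUE lines that later jobs receive as variables, and Terraform reads any environment variable named TF_VAR_<name> as the value of the input variable <name>. The sketch below is a simplified illustration of that hand-off, not GitLab's actual parser; parse_dotenv is a hypothetical helper and the URI and tag are placeholders.

```python
def parse_dotenv(text: str) -> dict:
    """Parse simple KEY=VALUE lines, the shape a dotenv report is expected to have."""
    variables = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        variables[key] = value
    return variables


# Hypothetical contents of build.env after the build job (placeholder URI and tag).
build_env = "LAMBDA_ECR_URI=repo_uri:myimage.abc123\n"
job_variables = parse_dotenv(build_env)

# The plan job maps the value onto the Terraform input variable IMAGE_ECR_URI.
terraform_env = {"TF_VAR_IMAGE_ECR_URI": job_variables["LAMBDA_ECR_URI"]}
print(terraform_env)
```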

End-to-End Testing

Our last step is, of course, to test that everything works as expected. There are many ways to invoke the Lambda function: using the AWS CLI, using the SDK in one of the many supported programming languages, or simply invoking the function through the AWS Console. For this example, I will use the AWS SDK for Python and invoke the function from my machine. Also, for this sample, my AWS Kendra index contains some documents related to road and maritime rules in Australia.

We start by defining a helper function that will actually call the lambda function:

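import json

# Matches function_name in main.tf
functionName = "MyLambdaFunction"


def invokeLambdaHelper(client, question, chat_history):
    payload = {
        "question": question,
        "chat_history": chat_history
    }
    response = client.invoke(
        FunctionName=functionName,
        InvocationType='RequestResponse',
        Payload=json.dumps(payload)
    )

    # The handler returns a JSON string, which Lambda encodes as JSON again,
    # so the payload has to be decoded twice
    response_payload = json.loads(response['Payload'].read())
    return json.loads(response_payload)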
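
The two json.loads calls in the helper can look redundant at first. The handler returns the output of json.dumps, which is a string, and the Lambda Python runtime serializes the handler's return value to JSON once more for the response payload. The offline sketch below simulates that double encoding with an in-memory stream; the answer text is a placeholder.

```python
import io
import json

# What lambda_handler returns: a string produced by json.dumps(...).
handler_return = json.dumps({"answer": "An example answer.", "source_documents": []})

# The Lambda runtime JSON-encodes the return value again for the response payload,
# so the client receives a JSON string that itself contains JSON.
raw_payload = io.BytesIO(json.dumps(handler_return).encode("utf-8"))

first = json.loads(raw_payload.read())  # a str holding the handler's JSON text
second = json.loads(first)              # a dict holding the chain result

print(type(first).__name__, type(second).__name__)
print(second["answer"])
```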

We can now invoke this function like this:

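import boto3
import botocore.config

# Placeholder: the name of your local AWS CLI profile
aws_profile = "default"

# Allow enough time for the Lambda function (timeout of 300 seconds) to respond
config = botocore.config.Config(connect_timeout=300, read_timeout=300)
session = boto3.Session(profile_name=aws_profile)
client = session.client('lambda', config=config)
chat_history = []
question = "what is the minimum age to get a driver license?"
response = invokeLambdaHelper(client, question, chat_history)
print(response['answer'])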

I got this reply from my RAG System:

According to the provided documents, the minimum age to take the Driver Knowledge Test (DKT) is 16 years old. Additionally, the minimum age to take the Driving Test is 17 years old. However, it’s important to note that there might be additional requirements or restrictions depending on individual circumstances. For example, some licenses may require medical clearance or proof of residency.

And now let’s make use of the history feature by adding this question-answer pair to our next request:

chat_history.append((question, response['answer']))
question = "does that mean I can't get a driver license if I am under 16 years old?"
response = invokeLambdaHelper(client, question, chat_history)
print(response['answer'])

This gave me the following answer:

No, according to the provided document excerpt, the minimum age to obtain a learner’s licence is 16 years old, but it does not specify whether someone under 16 years old can get a driver’s licence. In fact, the excerpt mentions “be aged 16 or over” as a requirement to get a driver licence. Therefore, it is unlikely that someone under 16 years old can get a driver’s licence.
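
The append-then-ask pattern used for the follow-up question can be wrapped in a small function so the history grows automatically with every turn. This is only a sketch: ask and fake_invoke are hypothetical names, and fake_invoke stands in for invokeLambdaHelper so the example runs without AWS access.

```python
from typing import Callable, List, Tuple

ChatHistory = List[Tuple[str, str]]


def ask(invoke: Callable[[str, ChatHistory], dict], question: str, chat_history: ChatHistory) -> str:
    """Send one question, then record the exchange for the next turn."""
    response = invoke(question, chat_history)
    answer = response["answer"]
    chat_history.append((question, answer))
    return answer


def fake_invoke(question: str, chat_history: ChatHistory) -> dict:
    """Hypothetical stand-in for invokeLambdaHelper so the sketch runs offline."""
    return {"answer": f"answer #{len(chat_history) + 1} to: {question}"}


history: ChatHistory = []
print(ask(fake_invoke, "first question", history))
print(ask(fake_invoke, "follow-up question", history))
print(len(history))
```

Against the deployed function, the same loop could be driven by passing lambda q, h: invokeLambdaHelper(client, q, h) as the invoke argument.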

And that is it! We are finally done with this series.
