On AWS CDK and Amazon Bedrock Knowledge bases

Dirk Michel
25 min read · Jun 22, 2024


Amazon Bedrock is a fast-evolving managed service with new features, capabilities, and improvements being rolled out frequently. One of the more significant additions has been Knowledge bases for Amazon Bedrock, which connects supported foundation models (FMs) to internal and private data sources that a generic pre-trained model would not be aware of. Knowledge bases for Amazon Bedrock implement a managed Retrieval Augmented Generation (RAG) architecture that helps deliver more relevant, context-engineered, and perhaps sufficiently accurate responses without re-training and fine-tuning FMs.

Implementing RAG applications can generally be a time-consuming task, as it involves converting data sources, such as a corpus of documents, into vector embeddings, storing the embeddings in a database that supports vector plugin extensions, and developing specific data pipeline integrations with the databases to search and retrieve relevant information to augment user prompts.
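To make these moving parts concrete, here is a deliberately tiny sketch of the embed, store, and retrieve steps in plain Python. It is not how Amazon Bedrock does it: the bag-of-words "embedding" stands in for a real embedding model, the list of chunks stands in for a vector database, and all function names are hypothetical.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Toy 'embedding': a bag-of-words count vector (stand-in for a real model)."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[token] * b[token] for token in a)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def retrieve(question: str, chunks: list[str], k: int = 1) -> list[str]:
    """Return the k chunks most similar to the question."""
    query = embed(question)
    ranked = sorted(chunks, key=lambda c: cosine(query, embed(c)), reverse=True)
    return ranked[:k]


def augment(question: str, chunks: list[str]) -> str:
    """Build an augmented prompt from the retrieved context and the question."""
    context = "\n".join(retrieve(question, chunks))
    return f"Context:\n{context}\n\nQuestion: {question}"
```

A managed service replaces each of these functions with a production-grade component, which is where most of the implementation time would otherwise go.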

Knowledge bases for Amazon Bedrock are a serverless and fully managed service that delivers entire ingestion and retrieval workflows to accelerate the implementation of RAG applications and enable context-relevant responses that are grounded in specific data sets.

Knowledge bases integrate with Agents for Amazon Bedrock, providing additional abstractions that help reduce “undifferentiated heavy lifting”. The AWS Cloud Development Kit (AWS CDK), a software development framework for defining cloud services as code, supports Amazon Bedrock and many supporting services needed for creating complete RAG applications on AWS.

For those on a tight time budget: The TL;DR of the following sections is to show an example of how to use the AWS CDK to rapidly prototype a RAG application as an API endpoint that answers questions on a corpus of documents using Agents and Knowledge bases for Amazon Bedrock. The API is backed by AWS Lambda, which invokes an Agent for Amazon Bedrock with access to a vector database implemented with Amazon OpenSearch Serverless and an Amazon Titan embedding model. Responsible AI principles are implemented with Guardrails for Amazon Bedrock, and the corpus of data is delivered and secured by Amazon S3.

This is illustrated in the reference diagram below.

RAG application reference architecture with Knowledge bases for Amazon Bedrock and Amazon OpenSearch.

Let’s do it.

The AWS CDK snippets provide working code to illustrate the creation of the various RAG application layers. AWS CDK lets us use supported programming languages to write compact code that generates AWS CloudFormation. To follow along, install Anaconda or your favourite Python virtual environment and a recent version of the AWS CDK construct library that includes support for Amazon Bedrock.

Let’s begin by describing the AWS CDK project layout and how the application is organised.

#!/usr/bin/env python3
from cdk_nag import AwsSolutionsChecks

import aws_cdk as cdk
from stacks.bedrock_stack import BedrockStack
from stacks.aoss_stack import AossStack
from stacks.kb_stack import KnowledgeBaseStack
# Module name assumed: the ApiGwStack source is not shown in this walkthrough
from stacks.apigw_stack import ApiGwStack

app = cdk.App()

# Deployment target shared by all stacks
dict1 = {
    "region": 'us-west-2',
    "account_id": '851725325557'
}

stack1 = BedrockStack(app, "BedrockStack",
    env=cdk.Environment(account=dict1['account_id'], region=dict1['region']),
    description="Agents for Amazon Bedrock resources",
    termination_protection=False,
    tags={"project":"bedrock-agents"},
)

# AossStack and KnowledgeBaseStack both take the agent reference exposed by BedrockStack
stack2 = AossStack(app, "AossStack",
    env=cdk.Environment(account=dict1['account_id'], region=dict1['region']),
    description="AWS OpenSearch Serverless resources",
    termination_protection=False,
    tags={"project":"bedrock-agents"},
    dict1=dict1,
    agent_arn=stack1.agent_arn,
)

stack3 = KnowledgeBaseStack(app, "KnowledgeBaseStack",
    env=cdk.Environment(account=dict1['account_id'], region=dict1['region']),
    description="Knowledgebases for Amazon Bedrock agent resources",
    termination_protection=False,
    tags={"project":"bedrock-agents"},
    dict1=dict1,
    agent_arn=stack1.agent_arn,
)

stack4 = ApiGwStack(app, "ApiGwStack",
    env=cdk.Environment(account=dict1['account_id'], region=dict1['region']),
    description="AWS API Gateway resources",
    termination_protection=False,
    tags={"project":"bedrock-agents"},
    dict1=dict1,
    agent_arn=stack3.agent_arn,
)

# Deploy the stacks in order
stack2.add_dependency(stack1)
stack3.add_dependency(stack2)
stack4.add_dependency(stack3)


# Run the cdk-nag AwsSolutions rule pack against all stacks
cdk.Aspects.of(app).add(AwsSolutionsChecks(verbose=False))

app.synth()

The following sections cover the walkthrough and illustrate the Stacks of the AWS CDK application.

1. Defining the Agent for Amazon Bedrock

Agents are a flexible entry point to working with Amazon Bedrock. They can be associated with Knowledge bases and they can also be extended later by adding Agent action groups to deliver agentic workflows with “function calling” or “tools” that interact with other external subsystems and APIs to execute “actions”, as described here.

The snippet below shows the Stack, starting with the first code block defining the IAM Role, permission policies for the Role, and the Agent. Notice how the IAM Role permission policies allow access to Amazon S3, Amazon Bedrock, and Amazon OpenSearch Serverless (AOSS). The Agent is created without any Action Groups, and details such as the prompt_override_configuration are not required. We are not yet creating an Agent Alias to deploy the Agent: This will be left to the end, once all necessary Agent configuration changes have been applied.

The second code block defines a Guardrail for Amazon Bedrock, helping build generative AI applications aligned with responsible AI policies. Guardrails can integrate with Agents and support content filters, as well as deny topics, sensitive information filters, and word filters.

from aws_cdk import (
Duration,
Stack,
CfnOutput,
RemovalPolicy,
aws_iam as iam,
aws_bedrock as bedrock,
aws_s3_deployment as s3d,
aws_s3 as s3,
aws_logs as logs,
Fn as Fn,
custom_resources as cr,
)
from cdk_nag import (
NagPackSuppression,
NagSuppressions
)
from constructs import Construct
import hashlib

class BedrockStack(Stack):

def __init__(self, scope: Construct, id: str, **kwargs) -> None:
super().__init__(scope, id, **kwargs)

# Create a unique string to create unique resource names
hash_base_string = (self.account + self.region)
hash_base_string = hash_base_string.encode("utf8")

### 1. Create an agent for amazon bedrock

# Create a bedrock agent execution role (aka agent resource role) with permissions to interact with the services. The role name must follow a specific format.
bedrock_agent_role = iam.Role(self, 'bedrock-agent-role',
role_name=f'AmazonBedrockExecutionRoleForAgents_' + str(hashlib.sha384(hash_base_string).hexdigest())[:15],
assumed_by=iam.ServicePrincipal('bedrock.amazonaws.com'),
)

CfnOutput(self, "BedrockAgentRoleArn",
value=bedrock_agent_role.role_arn,
export_name="BedrockAgentRoleArn"
)

# Add model invocation inline permissions to the bedrock agent execution role
bedrock_agent_role.add_to_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream"
],
resources=[
"arn:aws:bedrock:{}::foundation-model/anthropic.claude-3-haiku-20240307-v1:0".format(self.region)
]
,
)
)

# Add S3 access inline permissions to the bedrock agent execution role to write logs and access the data buckets
bedrock_agent_role.add_to_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject",
"s3:PutBucketLogging",
"s3:PutBucketVersioning",
"s3:PutBucketNotification",
],
resources=["*"],
)
)

# Add knowledgebase opensearch serverless inline permissions to the bedrock agent execution role
bedrock_agent_role.add_to_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["aoss:APIAccessAll"],
resources=["*"],
)
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/BedrockStack/bedrock-agent-role/DefaultPolicy/Resource',
[NagPackSuppression(id="AwsSolutions-IAM5", reason="Role is controlled to services, and actions where limited service API calls required. Where wildcards are used, these are prefixed with resources partial or complete ARNs.")],
True
)

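# Create a bedrock agent. Note that agent_instruction is not defined in this snippet: it must be set beforehand to a string containing the instructions for the agent.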
bedrock_agent = bedrock.CfnAgent(self, 'bedrock-agent',
agent_name='my-bedrock-agent',
description="This is a bedrock agent that can be invoked by calling the bedrock agent alias and agent id.",
auto_prepare=True,
foundation_model="anthropic.claude-3-haiku-20240307-v1:0",
instruction=agent_instruction,
agent_resource_role_arn=str(bedrock_agent_role.role_arn),
)

CfnOutput(self, "BedrockAgentID",
value=bedrock_agent.ref,
export_name="BedrockAgentID"
)

CfnOutput(self, "BedrockAgentModelName",
value=bedrock_agent.foundation_model,
export_name="BedrockAgentModelName"
)

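# Note: Ref on a CfnAgent resolves to the agent ID (not the ARN), so this attribute carries the agent ID despite its name
self.agent_arn = bedrock_agent.ref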

### 2. Enable Guardrails for Amazon Bedrock

# Create a guardrail configuration for the bedrock agent
cfn_guardrail = bedrock.CfnGuardrail(self, "CfnGuardrail",
name=("guardrail-" + str(hashlib.sha384(hash_base_string).hexdigest())[:15]).lower(),
description="Guardrail configuration for the bedrock agent",
blocked_input_messaging="I'm sorry, I can't accept your prompt, as it has been blocked by Guardrails.",
blocked_outputs_messaging="I'm sorry, I can't answer that, as the response has been blocked by Guardrails.",
# Filter strength for incoming user prompts and outgoing agent responses
content_policy_config=bedrock.CfnGuardrail.ContentPolicyConfigProperty(
filters_config=[
bedrock.CfnGuardrail.ContentFilterConfigProperty(
input_strength="NONE",
output_strength="NONE",
type="PROMPT_ATTACK"
),
bedrock.CfnGuardrail.ContentFilterConfigProperty(
input_strength="HIGH",
output_strength="HIGH",
type="MISCONDUCT"
),
bedrock.CfnGuardrail.ContentFilterConfigProperty(
input_strength="HIGH",
output_strength="HIGH",
type="INSULTS"
),
bedrock.CfnGuardrail.ContentFilterConfigProperty(
input_strength="HIGH",
output_strength="HIGH",
type="HATE"
),
bedrock.CfnGuardrail.ContentFilterConfigProperty(
input_strength="HIGH",
output_strength="HIGH",
type="SEXUAL"
),
bedrock.CfnGuardrail.ContentFilterConfigProperty(
input_strength="HIGH",
output_strength="HIGH",
type="VIOLENCE"
)
]
)
)

# Create a Guardrail version
cfn_guardrail_version = bedrock.CfnGuardrailVersion(self, "MyCfnGuardrailVersion",
guardrail_identifier=cfn_guardrail.attr_guardrail_id,
description="This is the deployed version of the guardrail configuration",
)

# Custom resource to update the agent with the guardrail details, as cloudformation does not support this feature at this time
# Define the request body for the api call that the custom resource will use. Notice that the agentId is part of the URI and not the request body of the API call, but we can pass it in as a key value pair.
updateAgentParams = {
"agentId": bedrock_agent.attr_agent_id,
"agentName": bedrock_agent.agent_name,
"agentResourceRoleArn": bedrock_agent.agent_resource_role_arn,
"foundationModel": bedrock_agent.foundation_model,
"guardrailConfiguration": {
"guardrailIdentifier": cfn_guardrail.attr_guardrail_id,
"guardrailVersion": cfn_guardrail_version.attr_version
},
"idleSessionTTLInSeconds": 600
}

# Define a custom resource to make an AwsSdk updateAgent call
update_agent_cr = cr.AwsCustomResource(self, "UpdateAgentCustomResource",
on_create=cr.AwsSdkCall(
service="bedrock-agent",
action="updateAgent",
parameters=updateAgentParams,
physical_resource_id=cr.PhysicalResourceId.of("Parameter.ARN")
),
policy=cr.AwsCustomResourcePolicy.from_sdk_calls(
resources=cr.AwsCustomResourcePolicy.ANY_RESOURCE
)
)

# Define IAM permission policy for the custom resource
update_agent_cr.grant_principal.add_to_principal_policy(iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"bedrock:UpdateAgent",
"iam:CreateServiceLinkedRole",
"iam:PassRole"
],
resources=[
f"arn:aws:bedrock:{self.region}:{self.account}:agent/{bedrock_agent.ref}"
],
)
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/BedrockStack/UpdateAgentCustomResource/CustomResourcePolicy/Resource',
[NagPackSuppression(id="AwsSolutions-IAM5", reason="Policies are set by Custom Resource.")],
True
)

Guardrails for Amazon Bedrock are created as independent resources, and their content_policy_config can define adjustable content filters that align, for example, with responsible AI policy requirements. The Guardrail then evaluates incoming user prompts and FM responses against the content_policy_config details and provides an additional layer of safeguards on top of the FM's underlying protections.
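The six near-identical filter entries in the Stack can also be generated from a list. The sketch below is one way to do that, assuming a single strength for all ordinary filters; the helper name build_filters is hypothetical. It keeps the output strength of the PROMPT_ATTACK filter at "NONE", as the Stack does, since to my understanding that filter only applies to incoming prompts.

```python
FILTER_TYPES = ["PROMPT_ATTACK", "MISCONDUCT", "INSULTS", "HATE", "SEXUAL", "VIOLENCE"]
STRENGTHS = {"NONE", "LOW", "MEDIUM", "HIGH"}


def build_filters(strength: str = "HIGH", prompt_attack_input: str = "NONE") -> list[dict]:
    """Return keyword arguments for one content filter per filter type."""
    if strength not in STRENGTHS or prompt_attack_input not in STRENGTHS:
        raise ValueError(f"strength must be one of {sorted(STRENGTHS)}")
    filters = []
    for filter_type in FILTER_TYPES:
        if filter_type == "PROMPT_ATTACK":
            # Prompt attacks are evaluated on input only, so the output side stays NONE
            filters.append({"type": filter_type,
                            "input_strength": prompt_attack_input,
                            "output_strength": "NONE"})
        else:
            filters.append({"type": filter_type,
                            "input_strength": strength,
                            "output_strength": strength})
    return filters
```

Because the dictionary keys match the keyword arguments used in the Stack, each entry could be expanded with bedrock.CfnGuardrail.ContentFilterConfigProperty(**entry) when building filters_config.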

Sidebar: Guardrails for Amazon Bedrock introduce an additional pre-processing step that is visible in the prompt template configuration: The advanced prompts specify how the “system prompts” are constructed at each step for the foundation model to interpret. Guardrails add an extra “turn”, or FM invocation, which increases the overall response time or latency. Enabling Amazon Bedrock model invocation logging can provide further insight into model activity.

Akin to Agent deployment and versions, Guardrails are also deployed by creating a version. The Agent begins to apply the protections once associated with a given Guardrail version.

This association can happen in two ways. Firstly, the Guardrail configuration details can be passed in during Agent creation, although at the time of writing, this is not supported via the CfnAgent class of AWS CDK v2.146.0. This will likely change in the future, but for now, we need to look towards the second option and update an existing Agent with the guardrailConfiguration details. We can use an AWS CDK Custom Resource to associate an existing Guardrail with an existing Agent by calling the UpdateAgent API directly, as illustrated by the code snippet. Interestingly, the UpdateAgent action overwrites all fields. Hence, all settings should be re-submitted in the UpdateAgent request body, including updated and preserved fields.
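Because UpdateAgent overwrites all fields, it helps to assemble the request body from the current Agent settings rather than by hand. The following is a minimal sketch of that idea outside of AWS CDK. It assumes the current settings are available as a dictionary shaped like the agent object returned by the GetAgent API; the helper name is hypothetical.

```python
# Fields that should be carried over unchanged when they are present
PRESERVED_FIELDS = [
    "agentId",
    "agentName",
    "agentResourceRoleArn",
    "foundationModel",
    "instruction",
    "description",
    "idleSessionTTLInSeconds",
]


def build_update_agent_params(current: dict, guardrail_id: str, guardrail_version: str) -> dict:
    """Copy the preserved fields and add the guardrail configuration."""
    params = {key: current[key] for key in PRESERVED_FIELDS if key in current}
    params["guardrailConfiguration"] = {
        "guardrailIdentifier": guardrail_id,
        "guardrailVersion": guardrail_version,
    }
    return params
```

The resulting dictionary has the same shape as the request body that the custom resource submits, with the updated field added and the existing ones re-submitted.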

Sidebar: The AWS CDK Custom Resource module is very convenient and provides the AwsSdkCall class for “use cases or resource logic” that involve making a single API call. The class implements a singleton AWS Lambda function as the Provider and follows the best practice for signalling results back to AWS CloudFormation.

2. Vector databases with Amazon OpenSearch Serverless

At this point, we have defined the Agent and Guardrails. The next step is to implement the vector database that will receive the embeddings produced by the embeddings model.

The vector database needs to be created and prepared following the chosen Knowledge base configuration: A mismatch between the vector database configuration and the Knowledge base settings would result in errors. Details that must align include the database engine, the number of vector dimensions, the index name, and specific index field names.
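A small pre-deployment check can catch such mismatches early. The sketch below compares an OpenSearch index body with the Knowledge base settings; the kb_config dictionary and its keys are hypothetical names chosen for this illustration, not a format used by the service.

```python
def check_alignment(index_body: dict, kb_config: dict) -> list[str]:
    """Return a list of mismatches between an index body and Knowledge base settings."""
    problems = []
    properties = index_body.get("mappings", {}).get("properties", {})

    # Every field the Knowledge base refers to must exist in the index mapping
    for role in ("vector_field", "text_field", "metadata_field"):
        if kb_config[role] not in properties:
            problems.append(f"{role} '{kb_config[role]}' is missing from the index mapping")

    # The vector field must be a k-NN vector with the embedding model's dimensions
    vector = properties.get(kb_config["vector_field"])
    if vector is not None:
        if vector.get("type") != "knn_vector":
            problems.append("vector field is not of type knn_vector")
        if vector.get("dimension") != kb_config["dimensions"]:
            problems.append(
                f"index has {vector.get('dimension')} dimensions, "
                f"embedding model produces {kb_config['dimensions']}"
            )
    return problems
```

An empty list means the two definitions agree on field names and dimensions; anything else is worth fixing before deploying.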

Knowledge bases support various database engines, including Amazon OpenSearch Serverless, which can function as a vector store powered by the k-nearest neighbour (k-NN) plugin. The k-NN OpenSearch API operations of Amazon OpenSearch support full-text search, filtering, aggregations, geospatial queries, and nested queries with accelerated retrieval and similarity search.

The first code block shows the definition of the AOSS database of type="VECTORSEARCH" and the AOSS security model configurations, including the data encryption and network access policies.

The second block creates an IAM Role for the Knowledge base. Various other Roles will also need access to AOSS, including the Agent for Amazon Bedrock and an AWS CDK Custom Resource that will be used to create an AOSS index and a set of index fields.

Sidebar: The AOSS IAM permissions are essential and can appear convoluted to those new to the service. Interacting with the AOSS database itself, such as loading or retrieving data and creating indexes, for example, requires a set of AOSS IAM permissions designated as “aoss:*”, which is different from the “es:*” designation used for the Amazon OpenSearch service. Any such Roles must also be added to an AOSS data access policy. Both must be in place for the access to take effect. The code snippet below illustrates this with the bedrock_role, the bedrock_kb_role, and the create_index_lambda_role, which are all added to the AOSS data access policy.
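The data access policy is passed as a JSON string, which is easy to get wrong when it is escaped by hand. As an alternative sketch, the policy can be built as a Python structure and serialised with json.dumps. The helper name and its collection parameter are assumptions for this illustration; the rules mirror the ones used in the Stack.

```python
import json


def build_data_access_policy(principals: list[str], collection: str = "*") -> str:
    """Serialise an AOSS data access policy granting the principals full access."""
    policy = [{
        "Description": "Access for bedrock",
        "Rules": [
            {
                "ResourceType": "index",
                "Resource": [f"index/{collection}/*"],
                "Permission": ["aoss:*"],
            },
            {
                "ResourceType": "collection",
                "Resource": [f"collection/{collection}"],
                "Permission": ["aoss:*"],
            },
        ],
        # Every IAM role that needs to reach the collection must be listed here
        "Principal": principals,
    }]
    return json.dumps(policy)
```

In the Stack, the principals would be the role ARNs of the Agent, the Knowledge base, and the index-creating Lambda function. Those ARNs are unresolved tokens at synthesis time but are still plain strings, so I would expect them to pass through json.dumps just as they pass through an f-string.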

Equally, notice how the AWS Lambda function resource uses an AWS Lambda Layer that contains Python libraries that are not included in the default AWS Lambda Python runtimes but which the Function code depends on. It also takes in a set of environment variables needed to connect to the AOSS collection. This is also where the IAM permissions come in again, as the AWS Lambda function needs to have the required permissions to access the vector database.

The third block creates an AWS CDK Custom Resource that performs AOSS database operations to create the index and a set of index fields. This custom resource, however, cannot use the AwsSdkCall class to call AOSS directly, as the AWS SDKs do not support AOSS database operations. Instead, AwsSdkCall invokes our own Lambda Function that we fully control, as defined by the Lambda-backed custom resource pattern.

Sidebar: The invoke action Lambda Function parameters use the “InvocationType”: “RequestResponse”, which invokes the function synchronously and keeps the custom resource running until the invoked function runs through to completion or times out. This helps ensure that AWS CloudFormation waits until the custom resource Lambda function is fully complete and correctly respects the add_dependency settings.

from aws_cdk import (
Duration,
Stack,
CfnOutput,
RemovalPolicy,
aws_iam as iam,
aws_lambda as _lambda,
aws_opensearchserverless as opensearchserverless,
Fn as Fn,
custom_resources as cr,
)
from cdk_nag import (
NagPackSuppression,
NagSuppressions
)
from constructs import Construct
import hashlib
import uuid

class AossStack(Stack):

def __init__(self, scope: Construct, id: str, dict1, agent_arn, **kwargs) -> None:
super().__init__(scope, id, **kwargs)

# Create a unique string to create unique resource names
hash_base_string = (self.account + self.region)
hash_base_string = hash_base_string.encode("utf8")

### 1. Create an opensearch serverless collection

# Creating an opensearch serverless collection requires a security policy of type encryption. The policy must be a string and the resource contains the collections it is applied to.
opensearch_serverless_encryption_policy = opensearchserverless.CfnSecurityPolicy(self, "OpenSearchServerlessEncryptionPolicy",
name="encryption-policy",
policy="{\"Rules\":[{\"ResourceType\":\"collection\",\"Resource\":[\"collection/*\"]}],\"AWSOwnedKey\":true}",
type="encryption",
description="the encryption policy for the opensearch serverless collection"
)

# We also need a security policy of type network so that the collection becomes accessible. The policy must be a string and the resource contains the collections it is applied to.
opensearch_serverless_network_policy = opensearchserverless.CfnSecurityPolicy(self, "OpenSearchServerlessNetworkPolicy",
name="network-policy",
policy="[{\"Description\":\"Public access for collection\",\"Rules\":[{\"ResourceType\":\"dashboard\",\"Resource\":[\"collection/*\"]},{\"ResourceType\":\"collection\",\"Resource\":[\"collection/*\"]}],\"AllowFromPublic\":true}]",
type="network",
description="the network policy for the opensearch serverless collection"
)

# Creating an opensearch serverless collection
opensearch_serverless_collection = opensearchserverless.CfnCollection(self, "OpenSearchServerless",
name="bedrock-kb",
description="An opensearch serverless vector database for the bedrock knowledgebase",
standby_replicas="DISABLED",
type="VECTORSEARCH"
)

opensearch_serverless_collection.add_dependency(opensearch_serverless_encryption_policy)
opensearch_serverless_collection.add_dependency(opensearch_serverless_network_policy)

CfnOutput(self, "OpenSearchCollectionArn",
value=opensearch_serverless_collection.attr_arn,
export_name="OpenSearchCollectionArn"
)

CfnOutput(self, "OpenSearchCollectionEndpoint",
value=opensearch_serverless_collection.attr_collection_endpoint,
export_name="OpenSearchCollectionEndpoint"
)

### 2. Creating an IAM role and permissions that we will need later on

bedrock_role_arn = Fn.import_value("BedrockAgentRoleArn")

# Create a bedrock knowledgebase role. Creating it here so we can reference it in the access policy for the opensearch serverless collection
bedrock_kb_role = iam.Role(self, 'bedrock-kb-role',
role_name=("bedrock-kb-role-" + str(hashlib.sha384(hash_base_string).hexdigest())[:15]).lower(),
assumed_by=iam.ServicePrincipal('bedrock.amazonaws.com'),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name('AmazonBedrockFullAccess'),
iam.ManagedPolicy.from_aws_managed_policy_name('AmazonOpenSearchServiceFullAccess'),
iam.ManagedPolicy.from_aws_managed_policy_name('AmazonS3FullAccess'),
iam.ManagedPolicy.from_aws_managed_policy_name('CloudWatchLogsFullAccess'),
],
)

# Add inline permissions to the bedrock knowledgebase execution role
bedrock_kb_role.add_to_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["aoss:APIAccessAll"],
resources=["*"],
)
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/AossStack/bedrock-kb-role/Resource',
[NagPackSuppression(id="AwsSolutions-IAM4", reason="Permissive permissions required as per aoss documentation."), NagPackSuppression(id="AwsSolutions-IAM5", reason="Permissive permissions required as per aoss documentation.")],
True
)

bedrock_kb_role_arn = bedrock_kb_role.role_arn

CfnOutput(self, "BedrockKbRoleArn",
value=bedrock_kb_role_arn,
export_name="BedrockKbRoleArn"
)

### 3. Create a custom resource that creates a new index in the opensearch serverless collection

# Define the index name
index_name = "kb-docs"

# Define the Lambda function that creates a new index in the opensearch serverless collection
create_index_lambda = _lambda.Function(
self, "Index",
runtime=_lambda.Runtime.PYTHON_3_12,
handler='create_oss_index.handler',
code=_lambda.Code.from_asset("lambda"),
timeout=Duration.seconds(60),
environment={
"COLLECTION_ENDPOINT": opensearch_serverless_collection.attr_collection_endpoint,
"INDEX_NAME": index_name,
"REGION": dict1['region'],
}
)

# Define IAM permission policy for the Lambda function. This function calls the OpenSearch Serverless API to create a new index in the collection and must have the "aoss" permissions.
create_index_lambda.role.add_to_principal_policy(iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"es:ESHttpPut",
"es:*",
"iam:CreateServiceLinkedRole",
"iam:PassRole",
"iam:ListUsers",
"iam:ListRoles",
"aoss:APIAccessAll",
"aoss:*"
],
resources=["*"],
))

# Create a Lambda layer that contains the requests library, which we use to call the OpenSearch Serverless API
layer = _lambda.LayerVersion(
self, 'py-lib-layer-for-index',
code=_lambda.Code.from_asset('assets/lambda_layer_with_py_deps.zip'),
compatible_runtimes=[_lambda.Runtime.PYTHON_3_12],
)

# Add the layer to the index-creation lambda function
create_index_lambda.add_layers(layer)

# Finally we can create a complete data access policy for the collection that also includes the lambda function that will create the index. The policy must be a string and the resource contains the collections it is applied to.
opensearch_serverless_access_policy = opensearchserverless.CfnAccessPolicy(self, "OpenSearchServerlessAccessPolicy",
name=f"data-policy-" + str(uuid.uuid4())[-6:],
policy=f"[{{\"Description\":\"Access for bedrock\",\"Rules\":[{{\"ResourceType\":\"index\",\"Resource\":[\"index/*/*\"],\"Permission\":[\"aoss:*\"]}},{{\"ResourceType\":\"collection\",\"Resource\":[\"collection/*\"],\"Permission\":[\"aoss:*\"]}}],\"Principal\":[\"{bedrock_role_arn}\",\"{bedrock_kb_role_arn}\",\"{create_index_lambda.role.role_arn}\"]}}]",
type="data",
description="the data access policy for the opensearch serverless collection"
)

opensearch_serverless_access_policy.add_dependency(opensearch_serverless_collection)

# Define the request body for the lambda invoke api call that the custom resource will use
aossLambdaParams = {
"FunctionName": create_index_lambda.function_name,
"InvocationType": "RequestResponse"
}

# On creation of the stack, trigger the Lambda function we just defined
trigger_lambda_cr = cr.AwsCustomResource(self, "IndexCreateCustomResource",
on_create=cr.AwsSdkCall(
service="Lambda",
action="invoke",
parameters=aossLambdaParams,
physical_resource_id=cr.PhysicalResourceId.of("Parameter.ARN")
),
policy=cr.AwsCustomResourcePolicy.from_sdk_calls(
resources=cr.AwsCustomResourcePolicy.ANY_RESOURCE
),
removal_policy = RemovalPolicy.DESTROY,
timeout=Duration.seconds(120)
)

# Define IAM permission policy for the custom resource
trigger_lambda_cr.grant_principal.add_to_principal_policy(iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["lambda:*", "iam:CreateServiceLinkedRole", "iam:PassRole"],
resources=["*"],
)
)

# Only trigger the custom resource after the opensearch access policy has been applied to the collection
trigger_lambda_cr.node.add_dependency(opensearch_serverless_access_policy)
trigger_lambda_cr.node.add_dependency(opensearch_serverless_collection)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/AossStack/IndexCreateCustomResource/CustomResourcePolicy/Resource',
[NagPackSuppression(id="AwsSolutions-IAM4", reason="Policies are set by Custom Resource."), NagPackSuppression(id="AwsSolutions-IAM5", reason="Policies are set by Custom Resource.")],
True
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/AossStack/AWS679f53fac002430cb0da5b7982bd2287/ServiceRole',
[NagPackSuppression(id="AwsSolutions-IAM4", reason="Policies are set by Custom Resource."), NagPackSuppression(id="AwsSolutions-IAM5", reason="Policies are set by Custom Resource.")],
True
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/AossStack/Index/ServiceRole',
[NagPackSuppression(id="AwsSolutions-IAM4", reason="Policies are set by Custom Resource."), NagPackSuppression(id="AwsSolutions-IAM5", reason="Policies are set by Custom Resource.")],
True
)

OpenSearch is an API-driven database and provides REST APIs for most operations; therefore, “database definition” actions and “data manipulation queries” are submitted as OpenSearch API requests. AWS IAM protects all APIs that AWS provides, so all requests to those APIs must be signed with AWS Signature Version 4 (SigV4); the OpenSearch database APIs are no exception.

The AWS Lambda function code uses the Python requests library and AWS SDK for Python (Boto3) helper functions to craft a signed AOSS API call that creates the index and fields.

The first part of the code snippet shows the definition of the payload request body. The request body or payload of the AOSS API call contains the OpenSearch k-NN create index and field settings that align with the Knowledge base requirements. Notice how the request JSON contains the index settings and the mappings for the index with the three mandatory fields: the vector field, the metadata field, and the text field. The chosen embedding model determines the number of dimensions of the vector field.

from requests import request
import json
import os
import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.exceptions import BotoCoreError, ClientError
from time import sleep

def handler(event, context):
    # 1. Defining the request body for the index and field creation
    host = os.environ["COLLECTION_ENDPOINT"]
    print("Collection Endpoint: " + host)
    index_name = os.environ["INDEX_NAME"]
    print("Index name: " + index_name)
    url = host + "/" + index_name
    print("URL: " + url)
    headers = {
        'content-type': 'application/json',
        'accept': 'application/json',
    }
    payload = {
        "settings": {
            "index": {
                "knn": "true"
            }
        },
        "mappings": {
            "properties": {
                "vectorField": {
                    "type": "knn_vector",
                    "dimension": 1536,
                    "method": {
                        "name": "hnsw",
                        "engine": "faiss",
                        "space_type": "l2",
                        "parameters": {
                            "ef_construction": 1536,
                            "m": 16,
                            "ef_search": 1536
                        }
                    }
                },
                "metadataField": {
                    "type": "text"
                },
                "textField": {
                    "type": "text"
                }
            }
        }
    }

    # 2. Obtaining AWS credentials and signing the AWS API request
    region = os.environ["REGION"]
    service = 'aoss'
    credentials = boto3.Session().get_credentials()

    params = None
    payload_json = json.dumps(payload)

    signer = SigV4Auth(credentials, service, region)
    # Retry until the index is created; the Lambda timeout bounds the loop
    while True:
        try:
            req = AWSRequest(method='PUT', url=url, data=payload_json, params=params, headers=headers)
            req.headers['X-Amz-Content-SHA256'] = signer.payload(req) # Add the payload hash to the headers as aoss requires it !
            signer.add_auth(req)
            req = req.prepare()

            response = request(
                method=req.method,
                url=req.url,
                headers=req.headers,
                data=req.body
            )

            if response.status_code != 200:
                raise Exception(f"Failed to create AOSS index - status: {response.status_code}")

        except Exception as e:
            print(f'Retrying to create aoss index... ({e})')
            sleep(5)
            continue

        print(f"Index create SUCCESS - status: {response.text}")
        break

The second part of the code snippet applies the SigV4 signatures over the headers and payload and makes the request.

The initial step is to request AWS credentials, which can be obtained through a Boto3 session with boto3.Session().get_credentials(). Then, a “signer” object is created from the botocore SigV4Auth class, which can be used later to compute SigV4 hashes and signatures over any blob we may need.

Sidebar: Botocore and Boto3 are both Python libraries for interacting with AWS services, but they serve different purposes and are used in different contexts. Botocore is a low-level, core Python library for AWS. It provides the foundational services and capabilities used to interact with AWS using Python. Boto3, on the other hand, is built on top of botocore and provides a high-level object-oriented API. Boto3 makes integrating Python scripts with AWS services easier and provides “pythonic” interfaces to AWS services such as Amazon S3, Amazon EC2, and others.

Next, the botocore AWSRequest class is used to conveniently assemble an “AWS formatted” request by simply passing in arguments for the method, the URL, the payload_json request body, the params, and the request headers. Then, we add the 'X-Amz-Content-SHA256' header, whose value is the SHA-256 hash that the “signer” computed across the payload blob.

Sidebar: The AOSS API insists on receiving the 'X-Amz-Content-SHA256' request header containing the SHA-256 hash of the payload. It won’t accept requests without the “signed payload option” being used.
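For intuition, the value carried in the 'X-Amz-Content-SHA256' header is the hex-encoded SHA-256 digest of the request body. The sketch below computes it with the standard library only; the helper name is hypothetical, and in the Lambda function the botocore signer takes care of this step.

```python
import hashlib


def payload_sha256(body) -> str:
    """Return the hex-encoded SHA-256 digest of a request body (str or bytes)."""
    if isinstance(body, str):
        body = body.encode("utf-8")
    return hashlib.sha256(body).hexdigest()
```

For example, payload_sha256(json.dumps(payload)) would produce the header value for the index creation request, provided the exact same serialised string is then sent as the body.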

Now, the signer (SigV4Auth) is used again to add the SigV4 authentication headers to the request. Then, the AWSRequest.prepare method produces the final “AWS formatted” request object, whose method, URL, headers, and body can be passed to the requests library to make the API call towards AOSS and create the required index and fields.

3. Defining the Knowledge base for Amazon Bedrock

Now that we have the vector database foundations in place, we can define the Knowledge base for Amazon Bedrock, which ties together the various resources created by the preceding Stacks.

The first block creates an Amazon S3 bucket and uploads the corpus of documents with the aws_s3_deployment construct library.

The second block defines the actual Knowledge base resource (with the details about the choices we made around the vector database) and the DataSource, which contains information on the storage type and location of the corpus data. The DataSource object also includes a text splitter that splits long documents into smaller chunks that can fit into the model’s context window. The settings for the DataSource include the chunking strategy, maximum token sizes, and an overlap percentage to preserve the relation between adjacent chunks. These parameters affect cost-management considerations and the verbosity of the returned search results. The AOSS index and field details must match those of the AOSS database.
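To illustrate how the maximum token size and the overlap percentage interact, here is a toy fixed-size chunker. It is a sketch under simplifying assumptions: tokens are just list items (for example, words), whereas the service uses its own tokenisation, and the function name is hypothetical.

```python
def chunk_tokens(tokens: list[str], max_tokens: int, overlap_pct: int) -> list[list[str]]:
    """Split tokens into chunks of at most max_tokens, sharing overlap_pct percent with the previous chunk."""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    if not 0 <= overlap_pct < 100:
        raise ValueError("overlap_pct must be between 0 and 99")

    overlap = max_tokens * overlap_pct // 100
    step = max_tokens - overlap  # how far the window advances each time

    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + max_tokens])
        if start + max_tokens >= len(tokens):
            break  # the window already reached the end of the document
    return chunks
```

With ten tokens, max_tokens=4 and overlap_pct=50, the window advances two tokens at a time, so each chunk repeats the last two tokens of its predecessor. A larger overlap preserves more context between adjacent chunks but produces more chunks to embed and store.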

The third block uses the convenient custom_resources.AwsSdkCall to trigger an ingestion job for the DataSource, through which the source data is run through the embedding model and the results are written into AOSS. At the time of writing, AWS CDK and AWS CloudFormation do not yet support triggering the “data source sync” process. Hence, we use the AwsSdkCall to trigger the startIngestionJob action on the DataSource. Once triggered, the service handles the entire “ETL data pipeline and ingestion” workflow: the ingestion job splits each document file into chunks as defined by the chunking strategy and max token settings, creates their embeddings, and writes them into the AOSS vector index, including references to the original documents. This “syncing process” can be triggered whenever new documents arrive on Amazon S3.
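
Re-running the ingestion when new documents arrive could, for example, be handled by a small AWS Lambda function that is subscribed to the bucket's object-created notifications. The following is a sketch under stated assumptions rather than part of the Stacks in this article: the handler wiring and the KNOWLEDGE_BASE_ID and DATA_SOURCE_ID environment variables are hypothetical, while start_ingestion_job is the boto3 counterpart of the startIngestionJob action used above.

```python
import os


def default_client():
    """Create the real client lazily so the helper can be exercised without AWS access."""
    import boto3
    return boto3.client("bedrock-agent")


def start_sync(client, knowledge_base_id: str, data_source_id: str) -> str:
    """Start an ingestion job for the data source and return the job ID."""
    response = client.start_ingestion_job(
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=data_source_id,
    )
    return response["ingestionJob"]["ingestionJobId"]


def handler(event, context, client=None):
    """Hypothetical Lambda entry point for S3 object-created notifications."""
    client = client or default_client()
    job_id = start_sync(
        client,
        os.environ["KNOWLEDGE_BASE_ID"],  # assumed environment variable
        os.environ["DATA_SOURCE_ID"],     # assumed environment variable
    )
    return {"ingestionJobId": job_id}
```

Passing the client in as an optional argument keeps the function easy to test with a stub, while the Lambda runtime simply calls handler(event, context).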

The fourth block creates an association between the Knowledge base and the Agent. The Agent orchestrates incoming user prompts and manages the retrieval and generation part of the runtime workflow, including vectorisation of user prompts and performing the “semantic similarity searches” that identify the most relevant chunks. The “Agents for Amazon Bedrock API” provides the AssociateAgentKnowledgeBase action to accomplish the association, but this call is not yet supported by AWS CloudFormation, either. This may change, but for now, we can use the AwsSdkCall pattern again.

from aws_cdk import (
Duration,
Size,
Stack,
CfnOutput,
RemovalPolicy,
aws_bedrock as bedrock,
aws_s3 as s3,
aws_s3_deployment as s3d,
Fn as Fn,
custom_resources as cr,
aws_iam as iam,
)
from cdk_nag import (
NagPackSuppression,
NagSuppressions
)
from constructs import Construct
import hashlib

class KnowledgeBaseStack(Stack):

def __init__(self, scope: Construct, id: str, dict1, agent_arn, **kwargs) -> None:
super().__init__(scope, id, **kwargs)

# Create a unique string to create unique resource names
hash_base_string = (self.account + self.region)
hash_base_string = hash_base_string.encode("utf8")

bedrock_role_arn = Fn.import_value("BedrockAgentRoleArn")

### 1. Create bucket and upload corpus of documents

# Create S3 bucket for the knowledgebase assets
kb_bucket = s3.Bucket(self, "Knowledgebase",
bucket_name=("knowledgebase-bucket-" + str(hashlib.sha384(hash_base_string).hexdigest())[:15]).lower(),
auto_delete_objects=True,
versioned=True,
removal_policy=RemovalPolicy.DESTROY,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
enforce_ssl=True,
encryption=s3.BucketEncryption.S3_MANAGED,
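# Note: logs_bucket is not defined in this snippet; it is assumed to be an existing s3.Bucket that receives the server access logs
server_access_logs_bucket=logs_bucket,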
server_access_logs_prefix="knowledgebase-access-logs/",
intelligent_tiering_configurations=[
s3.IntelligentTieringConfiguration(
name="my_s3_tiering",
archive_access_tier_time=Duration.days(90),
deep_archive_access_tier_time=Duration.days(180),
prefix="prefix",
tags=[s3.Tag(
key="key",
value="value"
)]
)],
lifecycle_rules=[
s3.LifecycleRule(
noncurrent_version_expiration=Duration.days(7)
)
],
)

kb_bucket.grant_read_write(iam.ServicePrincipal("bedrock.amazonaws.com"))

# Upload doc assets to S3 bucket. may contain large files so adjust the ephemeral storage size and increase timeout
upload_docs = s3d.BucketDeployment(self, "KnowledgebaseDocs",
sources=[s3d.Source.asset("assets/kb-docs/")],
destination_bucket=kb_bucket,
destination_key_prefix="docs/",
ephemeral_storage_size=Size.gibibytes(1),
memory_limit=1024,
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/DataStack/Custom::CDKBucketDeployment8693BB64968944B69AAFB0CC9EB8756C1024MiB1024MiB/ServiceRole',
[NagPackSuppression(id="AwsSolutions-IAM4", reason="Policies are set by the Construct."), NagPackSuppression(id="AwsSolutions-IAM5", reason="Policies are set by the Construct.")],
True
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/DataStack/Custom::CDKBucketDeployment8693BB64968944B69AAFB0CC9EB8756C1024MiB1024MiB/Resource',
[NagPackSuppression(id="AwsSolutions-L1", reason="Lambda is owned by AWS construct")],
True
)

### 2. Create bedrock knowledgebase for the agent

kb_bucket_name = Fn.import_value("KnowledgebaseBucketName")
index_name = "kb-docs"

# Create the bedrock knowledgebase with the role arn that is referenced in the opensearch data access policy
bedrock_knowledge_base = bedrock.CfnKnowledgeBase(self, "KnowledgeBaseDocs",
name="bedrock-kb-docs",
description="Bedrock knowledge base that contains a corpus of documents",
role_arn=bedrock_role_arn,
knowledge_base_configuration=bedrock.CfnKnowledgeBase.KnowledgeBaseConfigurationProperty(
type="VECTOR",
vector_knowledge_base_configuration=bedrock.CfnKnowledgeBase.VectorKnowledgeBaseConfigurationProperty(
embedding_model_arn=f"arn:aws:bedrock:{dict1['region']}::foundation-model/amazon.titan-embed-text-v1"
),
),
storage_configuration=bedrock.CfnKnowledgeBase.StorageConfigurationProperty(
type="OPENSEARCH_SERVERLESS",
opensearch_serverless_configuration=bedrock.CfnKnowledgeBase.OpenSearchServerlessConfigurationProperty(
collection_arn=Fn.import_value("OpenSearchCollectionArn"),
vector_index_name=index_name,
field_mapping = bedrock.CfnKnowledgeBase.OpenSearchServerlessFieldMappingProperty(
metadata_field="metadataField",
text_field="textField",
vector_field="vectorField"
)
),
),
)

CfnOutput(self, "BedrockKbName",
value=bedrock_knowledge_base.name,
export_name="BedrockKbName"
)

# Create the data source for the bedrock knowledge base. Chunking max tokens of 300 is bedrock's sensible default.
kb_data_source = bedrock.CfnDataSource(self, "KbDataSource",
name="KbDataSource",
knowledge_base_id=bedrock_knowledge_base.ref,
description="The S3 data source definition for the bedrock knowledge base",
data_source_configuration=bedrock.CfnDataSource.DataSourceConfigurationProperty(
s3_configuration=bedrock.CfnDataSource.S3DataSourceConfigurationProperty(
bucket_arn=Fn.import_value("KnowledgebaseBucketArn"),
inclusion_prefixes=["docs"],
),
type="S3"
),
vector_ingestion_configuration=bedrock.CfnDataSource.VectorIngestionConfigurationProperty(
chunking_configuration=bedrock.CfnDataSource.ChunkingConfigurationProperty(
chunking_strategy="FIXED_SIZE",
fixed_size_chunking_configuration=bedrock.CfnDataSource.FixedSizeChunkingConfigurationProperty(
max_tokens=300,
overlap_percentage=20
)
)
)
)

CfnOutput(self, "BedrockKbDataSourceName",
value=kb_data_source.name,
export_name="BedrockKbDataSourceName"
)

# Only trigger the custom resource when the kb is completed
kb_data_source.node.add_dependency(bedrock_knowledge_base)

## 3. Start ingestion job for the knowledge base data source

# Could be triggered outside of AWS CDK as it is a long running job, which can cause issues when stack rollbacks occur

# Custom resource to start the data source synch job, aka the data ingestion job
# Define the parameters for the ingestion job. the boto3 client will create the correct PUT request to the bedrock-agent API
# This is an example of passing the params as a dictionary, although the direct API call uses a PUT to pass the params
dataSourceIngestionParams = {
"dataSourceId": kb_data_source.attr_data_source_id,
"knowledgeBaseId": bedrock_knowledge_base.attr_knowledge_base_id,
}

# Define a custom resource to make an AwsSdk startIngestionJob call
ingestion_job_cr = cr.AwsCustomResource(self, "IngestionCustomResource",
on_create=cr.AwsSdkCall(
service="bedrock-agent",
action="startIngestionJob",
parameters=dataSourceIngestionParams,
physical_resource_id=cr.PhysicalResourceId.of("Parameter.ARN")
),
policy=cr.AwsCustomResourcePolicy.from_sdk_calls(
resources=cr.AwsCustomResourcePolicy.ANY_RESOURCE
)
)

# Define IAM permission policy for the custom resource
ingestion_job_cr.grant_principal.add_to_principal_policy(iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["bedrock:*", "iam:CreateServiceLinkedRole", "iam:PassRole"],
resources=["*"],
)
)

# Only trigger the custom resource when the kb data source is created
ingestion_job_cr.node.add_dependency(kb_data_source)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/KnowledgebaseStack/AWS679f53fac002430cb0da5b7982bd2287/ServiceRole',
[NagPackSuppression(id="AwsSolutions-IAM4", reason="Policies are set by Custom Resource."), NagPackSuppression(id="AwsSolutions-IAM5", reason="Policies are set by Custom Resource.")],
True
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/KnowledgebaseStack/IngestionCustomResource/CustomResourcePolicy/Resource',
[NagPackSuppression(id="AwsSolutions-IAM4", reason="Policies are set by Custom Resource."), NagPackSuppression(id="AwsSolutions-IAM5", reason="Policies are set by Custom Resource.")],
True
)

### 4. Associate the knowledge base with the agent

# A bug prevents the use of the association "Agent DRAFT version doesn't exist", which is forced due to pattern matching in the bedrock-agent API

agent_id = Fn.import_value("BedrockAgentID")

# Custom resource to associate the knowledge base with the agent
# Define the parameters. the boto3 client will create the correct PUT request to the bedrock-agent API
# This is an example of passing the params as a dictionary, although the direct API call uses a PUT to pass the params
# The agent version must always be DRAFT as its being pattern matched in the bedrock-agent API
agentKbAssociationParams = {
"agentId": agent_id,
"agentVersion": "DRAFT",
"description": "This knowledge base contains product information. You can use it to answer questions about various products, including product x, y, z and more.",
"knowledgeBaseId": bedrock_knowledge_base.attr_knowledge_base_id,
"knowledgeBaseState": "ENABLED",
}

# Define a custom resource to make an AwsSdk call to associate the knowledge base with the agent
agent_kb_association_cr = cr.AwsCustomResource(self, "AgentKbCustomResource",
on_create=cr.AwsSdkCall(
service="bedrock-agent",
action="AssociateAgentKnowledgeBase",
parameters=agentKbAssociationParams,
physical_resource_id=cr.PhysicalResourceId.of("Parameter.ARN")
),
policy=cr.AwsCustomResourcePolicy.from_sdk_calls(
resources=cr.AwsCustomResourcePolicy.ANY_RESOURCE
)
)

# Define IAM permission policy for the custom resource
agent_kb_association_cr.grant_principal.add_to_principal_policy(iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"bedrock:AssociateAgentKnowledgeBase",
"iam:CreateServiceLinkedRole",
"iam:PassRole",
"lambda:invoke",
],
resources=["*"],
)
)

NagSuppressions.add_resource_suppressions(
agent_kb_association_cr,
[NagPackSuppression(id="AwsSolutions-IAM5", reason="We support the use of wildcards.")],
True
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/KnowledgebaseStack/AWS679f53fac002430cb0da5b7982bd2287/ServiceRole',
[NagPackSuppression(id="AwsSolutions-IAM4", reason="Policies are set by Custom Resource."), NagPackSuppression(id="AwsSolutions-IAM5", reason="Policies are set by Custom Resource.")],
True
)

# Only trigger the custom resource when the kb is completed
agent_kb_association_cr.node.add_dependency(kb_data_source)

### 5. Create an agent alias to deploy agent

# Start by preparing the draft agent version
prepareAgentParams = {
"agentId": agent_id,
}

# Define a custom resource to make an AwsSdk call to prepare the agent
prepare_agent_cr = cr.AwsCustomResource(self, "PrepareAgent",
on_create=cr.AwsSdkCall(
service="bedrock-agent",
action="prepareAgent",
parameters=prepareAgentParams,
physical_resource_id=cr.PhysicalResourceId.of("Parameter.ARN")
),
policy=cr.AwsCustomResourcePolicy.from_sdk_calls(
resources=cr.AwsCustomResourcePolicy.ANY_RESOURCE
)
)

# Define IAM permission policy for the custom resource
prepare_agent_cr.grant_principal.add_to_principal_policy(iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"bedrock:prepareAgent",
"iam:CreateServiceLinkedRole",
"iam:PassRole",
"lambda:*",
],
resources=["*"],
)
)

NagSuppressions.add_resource_suppressions(
prepare_agent_cr,
[NagPackSuppression(id="AwsSolutions-IAM5", reason="We support the use of wildcards. But a backlog item should be added to restrict the resources to the specific resources that the custom resource needs to access")],
True
)

prepare_agent_cr.node.add_dependency(agent_kb_association_cr)

# Then create an alias to deploy the agent

# Create an alias for the bedrock agent
cfn_agent_alias = bedrock.CfnAgentAlias(self, "MyCfnAgentAlias",
agent_alias_name="bedrock-agent-alias",
agent_id=agent_id,
description="bedrock agent alias to simplify agent invocation",
# note: when initially creating the agent alias, the agent version is defined automatically
# routing_configuration=[bedrock.CfnAgentAlias.AgentAliasRoutingConfigurationListItemProperty(
# agent_version="1",
# )],
tags={
"owner": "saas"
}
)

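# The Ref of the alias only resolves at deploy time, so Python string methods such as split() have no effect on it at synth time; use the alias ID attribute instead
agent_alias = cfn_agent_alias.attr_agent_alias_id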

CfnOutput(self, "BedrockAgentAlias",
value=agent_alias,
export_name="BedrockAgentAlias"
)

cfn_agent_alias.node.add_dependency(prepare_agent_cr)

The fifth code block finally deploys the Agent by creating an Agent Alias. All Agent configuration updates, thus far, have been applied to the current “DRAFT” version. Agent deployment is handled through Agent Aliases and “immutable snapshot” versions. Versions are runtime resources and cannot be updated: Only an Agent’s current working draft version can be edited and used to create new versions. Therefore, we have a current working draft of the Agent configuration that contains both the Guardrail and the Knowledge base associations. A new deployment with a new alias and version is required before the Agent can be accessed and invoked.
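
For readers who prefer to run these last steps outside of AWS CDK, the sequence of associating the Knowledge base, preparing the draft, and creating an alias can be sketched with boto3. This is an illustration under stated assumptions, not the deployment path used in this article: the deploy_agent and wait_until_prepared helpers and their polling defaults are hypothetical, and the client is passed in so that the flow can be tested with a stub.

```python
import time


def default_client():
    """Create the real client lazily so the helpers can be exercised without AWS access."""
    import boto3
    return boto3.client("bedrock-agent")


def wait_until_prepared(client, agent_id, attempts=30, delay_seconds=2.0, sleep=time.sleep):
    """Poll the agent until the working draft reports the PREPARED status."""
    for _ in range(attempts):
        status = client.get_agent(agentId=agent_id)["agent"]["agentStatus"]
        if status == "PREPARED":
            return
        if status == "FAILED":
            raise RuntimeError(f"Agent {agent_id} failed to prepare")
        sleep(delay_seconds)
    raise TimeoutError(f"Agent {agent_id} was not prepared in time")


def deploy_agent(client, agent_id, knowledge_base_id, alias_name, description, sleep=time.sleep):
    """Associate the knowledge base, prepare the draft, and create an alias."""
    # 1. Attach the knowledge base to the working draft
    client.associate_agent_knowledge_base(
        agentId=agent_id,
        agentVersion="DRAFT",
        knowledgeBaseId=knowledge_base_id,
        description=description,
        knowledgeBaseState="ENABLED",
    )
    # 2. Prepare the draft and wait for the preparation to finish
    client.prepare_agent(agentId=agent_id)
    wait_until_prepared(client, agent_id, sleep=sleep)
    # 3. Create the alias that consumers will invoke
    response = client.create_agent_alias(agentId=agent_id, agentAliasName=alias_name)
    return response["agentAlias"]["agentAliasId"]
```

The explicit wait mirrors the dependency ordering in the Stack, where the alias depends on the custom resource that prepares the Agent.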

4. Creating a RestAPI for consuming applications

The last Stack defines the user- or application-facing RestAPI and its backing AWS Lambda function. Fronting an Agent for Amazon Bedrock and its RAG implementation through a standard RestAPI can provide a “clean, simple, and secure” integration point for front-end applications. Many popular front-end data application frameworks for Python, such as Streamlit and Dash, can easily integrate in that way, which avoids requiring every application to implement its own way of integrating directly with AWS service APIs.

Abstracting the details away from the consumer can also open up opportunities to provide a well-known, stable, standard interface and offer additional features such as flexible authorisers, application firewalls, request validation, caching options, custom domain names, and canary release deployments, to name a few.

The first code block defines an AWS Lambda function containing the handler code to invoke the Agent for Amazon Bedrock: The Agent is the entry point that runs the orchestration and Knowledge base workflow. The AWS Lambda function implements the invocation, processes the streamed and chunked Agent invocation response, and then returns a response body containing the answer to the AWS API Gateway RestAPI.

Sidebar: The Agents for Amazon Bedrock runtime also provides lower-level APIs for directly interacting with knowledge bases, such as the RetrieveAndGenerate API and the Retrieve API. However, using the InvokeAgent API provides the highest degree of abstraction.
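
For comparison, a direct call to the lower-level API without an Agent might look like the following sketch. It is not used anywhere in the Stacks of this article; the ask_knowledge_base helper is hypothetical, and the knowledge base ID and model ARN are values the caller would need to supply.

```python
def default_client():
    """Create the real client lazily so the helper can be exercised without AWS access."""
    import boto3
    return boto3.client("bedrock-agent-runtime")


def ask_knowledge_base(client, question: str, knowledge_base_id: str, model_arn: str) -> str:
    """Query a knowledge base directly and return the generated answer text."""
    response = client.retrieve_and_generate(
        input={"text": question},
        retrieveAndGenerateConfiguration={
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": knowledge_base_id,
                "modelArn": model_arn,
            },
        },
    )
    return response["output"]["text"]
```

Here the caller picks the generation model per request, whereas with InvokeAgent that choice is part of the Agent configuration.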

The second code block begins by defining an Amazon CloudWatch Logs log group and then creates the Amazon API Gateway RestAPI endpoint with the LambdaRestApi Level 2 (L2) construct from the AWS CDK library. This construct exposes convenient parameters for the API deployment and integration options and applies sensible defaults for the remainder, such as setting the integration type to AWS_PROXY, aka “Lambda proxy integration”. Enabling proxy mode simplifies the integration with the backend Lambda function, as the API Gateway would create a catch-all “ANY” method and pass all incoming requests and methods from the client as input to the backing Lambda function. Setting the proxy parameter to false and adding a single specific POST method with the desired configuration, however, can be a good option, too, and it is the approach taken in the code snippet.
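
With the Lambda proxy integration, the function receives the request body as a string inside the event and must return a dictionary whose body is a string as well. The following sketch shows that contract in isolation; the parse_proxy_body and proxy_response helpers are hypothetical names and are not part of the handler shown later.

```python
import base64
import json


def parse_proxy_body(event: dict) -> dict:
    """Decode the JSON request body from a Lambda proxy integration event."""
    raw = event.get("body") or "{}"
    if event.get("isBase64Encoded"):
        raw = base64.b64decode(raw).decode("utf-8")
    return json.loads(raw)


def proxy_response(status_code: int, payload) -> dict:
    """Build a proxy integration response with a JSON string body."""
    return {
        "statusCode": status_code,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(payload),
    }
```

Returning a dictionary or list as the body, instead of a serialised string, is a common cause of malformed proxy responses.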

Amazon API Gateway has additional supporting concepts, including usage plans and API keys. The AWS CDK construct library provides the UsagePlan L2 construct, and we use it to define throttling limits, which set the target point at which request throttling should start, and a quota limit, which sets the target maximum number of requests that a given API key can make in a specified time interval. Usage plans take effect when associated with an API Gateway stage. An API stage is a “named reference to a deployment, which is a runtime snapshot of the API” and contains the endpoint URL on which the API is reachable. The usage plan parameters allow for that association.

However, the API keys are created separately with the L2 ApiKey construct. As shown in the code snippet, the keys can then be associated with a usage plan by applying the add_api_key() method.
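
API Gateway describes its throttling in terms of a token bucket, in which the rate limit is the steady refill rate and the burst limit is the bucket size. The following is a deliberately simplified model of that idea and not the service's implementation; the TokenBucket class is hypothetical, and time is passed in explicitly to keep the example deterministic.

```python
class TokenBucket:
    """Simplified token bucket: refills at rate_limit per second up to burst_limit."""

    def __init__(self, rate_limit: float, burst_limit: int):
        self.rate = float(rate_limit)
        self.capacity = float(burst_limit)
        self.tokens = self.capacity  # start with a full bucket
        self.last = 0.0

    def allow(self, now: float) -> bool:
        """Return True if a request arriving at time `now` (seconds) is admitted."""
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# Same figures as the usage plan in this article: 100 requests per second, burst of 50
bucket = TokenBucket(rate_limit=100, burst_limit=50)
burst = [bucket.allow(now=0.0) for _ in range(60)]
```

In this model, 60 requests arriving at the same instant see 50 admitted and 10 throttled, and the bucket is full again half a second later.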

from aws_cdk import (
Stack,
aws_iam as iam,
aws_logs as logs,
aws_apigateway as apigw,
aws_lambda as _lambda,
Duration as Duration,
Fn as Fn,
RemovalPolicy,
CfnOutput,
)
from cdk_nag import (
NagPackSuppression,
NagSuppressions
)
from constructs import Construct
import hashlib

class ApiGwStack(Stack):

def __init__(self, scope: Construct, construct_id: str, dict1, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)


### 1. Create a lambda function that invokes the agent

# Create the lambda function that will be invoked by the API Gateway
agent_invocation_lambda = _lambda.Function(
self, 'agent-invocation-lambda',
runtime=_lambda.Runtime.PYTHON_3_12,
code=_lambda.Code.from_asset('lambda'),
handler='agent_invocation.handler',
timeout=Duration.seconds(60),
memory_size=1024,
environment={
"BEDROCK_AGENT_ID": Fn.import_value("BedrockAgentID"),
"BEDROCK_AGENT_ALIAS": Fn.import_value("BedrockAgentAlias"),
"REGION": dict1['region'],
},
current_version_options=_lambda.VersionOptions(
removal_policy=RemovalPolicy.RETAIN,
provisioned_concurrent_executions=2
),
)

# Define bedrock permission for the Lambda function. This function calls the Bedrock API to invoke the agent and must have the "bedrock" permissions.
agent_invocation_lambda.role.add_to_principal_policy(iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"bedrock:InvokeAgent"
],
resources=[
"*"
],
)
)

# Export the lambda arn
CfnOutput(self, "LambdaAgentInvocationHandlerArn",
value=agent_invocation_lambda.function_arn,
export_name="LambdaAgentInvocationHandlerArn"
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/ApiGwStack/agent-invocation-lambda/ServiceRole',
[NagPackSuppression(id="AwsSolutions-IAM4", reason="Policies are set by the Construct."), NagPackSuppression(id="AwsSolutions-IAM5", reason="The agent does need to invoke changing agents and the wildcard is needed to avoid unreasonable toil.")],
True
)

### 2. Creating an api gateway that web applications can access to invoke the agent

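# Create a unique string to create unique resource names
hash_base_string = (self.account + self.region)
hash_base_string = hash_base_string.encode("utf8")

# Create an access log group for the API Gateway access logs
apigw_access_loggroup = logs.LogGroup(self, "apigw-log-group",
log_group_name=("apigw-access-log-group-" + str(hashlib.sha384(hash_base_string).hexdigest())[:15]).lower(),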
log_group_class=logs.LogGroupClass.STANDARD,
retention=logs.RetentionDays.ONE_MONTH,
removal_policy=RemovalPolicy.DESTROY
)

# Create a new API Gateway, as a public endpoint. Defines an API Gateway REST API with AWS Lambda proxy integration.
agent_apigw_endpoint = apigw.LambdaRestApi(self, "agent_apigw_endpoint",
rest_api_name="agent_apigw_endpoint",
handler=agent_invocation_lambda,
description="This is the API Gateway endpoint that takes in a user prompt and returns an agent response.",
cloud_watch_role=True,
proxy=False,
deploy=True,
endpoint_types=[apigw.EndpointType.EDGE],
deploy_options=apigw.StageOptions(
stage_name="question",
logging_level=apigw.MethodLoggingLevel.INFO,
access_log_destination=apigw.LogGroupLogDestination(apigw_access_loggroup),
data_trace_enabled=True,
caching_enabled=False,
metrics_enabled=True,
),
integration_options=apigw.LambdaIntegrationOptions(
timeout=Duration.seconds(29),
)
)

plan = apigw.UsagePlan(self, "BedrockUsagePlan",
name="BedrockUsagePlan",
description="This is the usage plan for the Bedrock API Gateway endpoint.",
quota=apigw.QuotaSettings(
limit=1000,
period=apigw.Period.DAY,
offset=0
),
throttle=apigw.ThrottleSettings(
rate_limit=100,
burst_limit=50
),
api_stages=[apigw.UsagePlanPerApiStage(
api=agent_apigw_endpoint,
stage=agent_apigw_endpoint.deployment_stage
)]
)

# Create an API key for the API Gateway endpoint. Requester must add the x-api-key header with the key to gain access.
bedrock_api_key = apigw.ApiKey(self, "BedrockAPIKey",
api_key_name="BedrockAPIKey",
enabled=True,
description="This is the API key for the Bedrock API Gateway endpoint.",
)
plan.add_api_key(bedrock_api_key)

# Create request validator for the API Gateway endpoint
request_model = agent_apigw_endpoint.add_model("BrRequestValidatorModel",
content_type="application/json",
model_name="BrRequestValidatorModel",
description="This is the request validator model for the Bedrock API Gateway endpoint.",
schema=apigw.JsonSchema(
schema=apigw.JsonSchemaVersion.DRAFT4,
title="postRequestValidatorModel",
type=apigw.JsonSchemaType.OBJECT,
# The field names must match the keys that the Lambda handler reads from the request body
required=["sessionId", "userPrompt"],
properties={
"sessionId": apigw.JsonSchema(type=apigw.JsonSchemaType.STRING, min_length=2, max_length=32),
"userPrompt": apigw.JsonSchema(type=apigw.JsonSchemaType.STRING, min_length=1, max_length=500),
}
)
)

# Add the POST method that references the request validator and sets the API key as required
agent_apigw_endpoint.root.add_method(
http_method='POST',
api_key_required=True,
request_validator_options=apigw.RequestValidatorOptions(
request_validator_name="PostRequestValidator",
validate_request_body=True,
validate_request_parameters=False,
),
request_models={"application/json": request_model},
)

# Adding documentation to the API Gateway endpoint
properties_json = '{"info":"This is the API Gateway endpoint that takes in a user prompt and returns an agent response."}'

cfn_documentation_part = apigw.CfnDocumentationPart(self, "MyCfnDocumentationPart",
location=apigw.CfnDocumentationPart.LocationProperty(
type="API"
),
properties=properties_json,
rest_api_id=agent_apigw_endpoint.rest_api_id
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/ApiGwStack/agent_apigw_endpoint/Default',
[NagPackSuppression(id="AwsSolutions-APIG4", reason="This will need an authorizer."), NagPackSuppression(id="AwsSolutions-COG4", reason="We may or not use a Cognito user pool authorizer.")],
True
)

NagSuppressions.add_resource_suppressions_by_path(
self,
'/ApiGwStack/agent_apigw_endpoint/CloudWatchRole/Resource',
[NagPackSuppression(id="AwsSolutions-IAM4", reason="The service role permission for cloudwatch logs are handled by the Construct.")],
True
)

Notice the definition of the request validator. Request validators support basic checks for query string parameters and headers but can also perform more complex request body validation against a defined data model.

Sidebar: REST API request validators process incoming API requests before proceeding with the integration request; in that way, API Gateway rejects requests that don’t meet the defined payload model schema with a 400 response, without invoking the backend AWS Lambda function.
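
The checks that such a model expresses can be mirrored in a few lines of plain Python, which can be handy for testing clients locally. This sketch is only an approximation of JSON Schema validation and is not what API Gateway runs; the validate_body helper is hypothetical, the field names follow the Lambda handler and the curl example, and the length limits are taken from the model definition.

```python
# Field name -> (minimum length, maximum length)
FIELD_RULES = {
    "sessionId": (2, 32),
    "userPrompt": (1, 500),
}


def validate_body(body: dict) -> list:
    """Return a list of validation errors; an empty list means the body is valid."""
    errors = []
    for field, (min_len, max_len) in FIELD_RULES.items():
        if field not in body:
            errors.append(f"{field} is required")
            continue
        value = body[field]
        if not isinstance(value, str):
            errors.append(f"{field} must be a string")
            continue
        if not min_len <= len(value) <= max_len:
            errors.append(f"{field} length must be between {min_len} and {max_len}")
    return errors
```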

The following snippet illustrates the AWS Lambda function code for invoking the Agent and returning the response to the AWS API Gateway.

Notice how the function code expects to see specific key-value pairs in the incoming request body. API users should know the request requirements, what methods are available, what fields are required, and so on.

import boto3
from botocore.exceptions import ClientError
import os
import logging
import json

agentId = os.environ["BEDROCK_AGENT_ID"]
agentAliasIdString = os.environ["BEDROCK_AGENT_ALIAS"]
region = os.environ["AWS_REGION"]

# The exported alias value may have the form "<agentId>|<aliasId>"; keep only the alias ID
agentAliasId = agentAliasIdString.split("|")[-1]


def handler(event, context):

    try:
        # expected fields from the incoming request
        body = json.loads(event['body'])
        question = body['userPrompt']
        sessionId = body["sessionId"]

        print(f"Session: {sessionId} asked question: {question}")

        response = askQuestion(question, sessionId)
        return {
            "statusCode": 200,
            "body": json.dumps(response),
            "headers": {
                "Content-Type": "application/json"
            }
        }

    except Exception as e:
        return {
            "statusCode": 400,
            # the proxy integration expects the body to be a string, so serialise the error
            "body": json.dumps({"error": str(e)}),
            "headers": {
                "Content-Type": "application/json"
            }
        }


def askQuestion(question, sessionId):

    try:
        client = boto3.client('bedrock-agent-runtime', region_name=region)
        logging.info(f"Invoking agent with question: {question}")
        response = client.invoke_agent(
            agentId=agentId,
            agentAliasId=agentAliasId,
            sessionId=sessionId,
            inputText=question,
        )

        completion = ""

        # the response is an event stream; concatenate the decoded bytes of each chunk event
        for event in response.get("completion"):
            # skip events that do not carry a chunk, such as trace events
            if "chunk" in event:
                completion = completion + event["chunk"]["bytes"].decode()

    except ClientError as e:
        logging.error(f"Couldn't invoke agent. {e}")
        raise

    print(completion)

    return completion

Now, all the pieces are in place to ask questions about the documents in the Knowledge base … and issue a POST request to the API endpoint.

curl -X POST -H "Content-Type: application/json" -H "x-api-key: $API_KEY" -d "{\"userPrompt\":\"$MY_QUESTION\", \"sessionId\":\"$MY_SESSION_ID\"}" "$MY_API_GW_INVOCATION_URL"
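
The same request can be issued from Python with the standard library only. This is a minimal sketch: the build_question_request and ask helpers are hypothetical, and the URL and API key are values taken from your own deployment.

```python
import json
import urllib.request


def build_question_request(url: str, api_key: str, question: str, session_id: str) -> urllib.request.Request:
    """Build the POST request with the JSON body and the x-api-key header."""
    data = json.dumps({"userPrompt": question, "sessionId": session_id}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        method="POST",
        headers={"Content-Type": "application/json", "x-api-key": api_key},
    )


def ask(url: str, api_key: str, question: str, session_id: str, timeout: float = 30.0):
    """Send the question to the API endpoint and return the decoded JSON response."""
    request = build_question_request(url, api_key, question, session_id)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Serialising the body with json.dumps also takes care of escaping, so questions that contain quotes or line breaks arrive intact.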

Conclusions

Knowledge bases for Amazon Bedrock is a managed service that implements an orchestrated data ingestion and querying workflow for RAG applications. Adopting Agents for Amazon Bedrock with Knowledge bases can provide a highly abstracted surface on which cloud developers can rapidly create scalable and secure RAG architectures.

Avoiding undifferentiated heavy lifting during explorations and prototyping can help attain accelerated results; application builders can iterate quickly with Knowledge bases for Amazon Bedrock by leveraging AWS CDK construct patterns and calibrating the architecture and available configuration options.

Amazon Bedrock and its supporting services are noticeably “young”. Building with AWS CDK and AWS CloudFormation on Amazon Bedrock highlights current “IaC” gaps and provides opportunities for the Amazon Bedrock service team to improve the developer experience further. AWS Custom Resources can help with practical workarounds, but increasing AWS CloudFormation coverage for Amazon Bedrock would accelerate the developer community.

Adopting a RestAPI pattern and providing a standard endpoint to users and application builders can help create clear demarcation points and avoid exposing unnecessary implementation details.

Dirk Michel

SVP SaaS and Digital Technology | AWS Ambassador. Talks Cloud Engineering, Platform Engineering, Release Engineering, and Reliability Engineering.