On AWS CDK and Agents for Amazon Bedrock

Dirk Michel
26 min readJun 3, 2024

--

Many organisations plan or already have active Generative AI (GenAI) exploration streams that aim to discover and unlock valuable new use cases. A promising approach to accelerated GenAI prototyping and development is to leverage managed services that host and provide API-level access to large language models (LLMs) and other foundation models (FMs). Adopting development frameworks and software development kits (SDKs) that integrate with the GenAI ecosystem is an equally important ingredient that can accelerate iterations and help explore the problem space.

Enter Amazon Bedrock …, a fully managed service that exposes FMs from leading AI model providers through a unified API that removes the need for provisioning, scaling and securing servers, deploying models, and configuring inference endpoints.

Developers can reduce “undifferentiated heavy lifting” with Amazon Bedrock and accelerate their explorations towards building GenAI applications with security, privacy, responsible AI principles, and leverage the organisation’s data. The AWS Cloud provides supplementary services and primitives for building and accessing supporting data foundations, such as distributed data processing engines, pipelines and data stores.

The AWS Cloud Development Kit (AWS CDK) — an open-source software development framework for defining cloud application resources as code — can further accelerate prototyping with AWS Bedrock and other supporting AWS Cloud services.

For those on a tight time budget: The TL;DR of the following sections is to show an example of how Agents for Amazon Bedrock and the AWS CDK can be used to rapidly prototype a helpful natural language companion that can answer questions by executing actions and accessing private tabular data sets. The application is implemented as a containerised Streamlit application that runs on Amazon ECS and invokes Agents for Amazon Bedrock, equipped with function calls towards Amazon Athena. The incoming data is regularly refreshed and updated with AWS Glue, which ingests, cleanses, processes, catalogues, and delivers the data into a data lake based on Amazon S3.

This is illustrated in the reference diagram below.

High-level components and services of the Text2SQL Agentic flow with Amazon Bedrock.

Let’s do it.

The AWS CDK snippets provide working code to illustrate the creation of the various application layers from the ground up. AWS CDK lets us use supported programming languages to write compact code that generates AWS CloudFormation. You can install Anaconda as your Python virtual environment, Streamlit, a recent version of the AWS CDK construct library that includes support for Amazon Bedrock, and Docker or your favourite container management tools to follow along.

Let’s begin by describing the AWS CDK project layout and how the application is organised. AWS CDK Python projects start with a top-level app.py module referencing multiple Stacks: The Stacks define the AWS resources that can be described using a range of constructs from the AWS CDK Construct Library. These constructs are one of the principle value propositions of AWS CDK, and they are categorised into L1, L2, and L3 construct levels. Our AWS CDK project draws from all construct levels.

AWS CDK initialises the app.py module when executing cdk deploy from the command line. The following snippet shows the module and how the Stacks are organised and sequenced.

#!/usr/bin/env python3
from cdk_nag import AwsSolutionsChecks

import aws_cdk as cdk
from stacks.kb_stack import DataFoundationStack
from stacks.lambda_stack import LambdaStack
from stacks.bedrock_stack import BedrockStack
from stacks.streamlit_stack import StreamlitStack

app = cdk.App()

dict1 = {
"region": os.environ["AWS_REGION"],
"account_id": os.environ["AWS_ACCOUNT_ID"]
}

stack1 = DataFoundationStack(app, "DataStack",
env=cdk.Environment(account=dict1['account_id'], region=dict1['region']),
description="Data lake and processing resources",
termination_protection=False,
tags={"project":"genai-prototype"},
)

stack2 = LambdaStack(app, "LambdaStack",
env=cdk.Environment(account=dict1['account_id'], region=dict1['region']),
description="Lambda resources for the bedrock action groups",
termination_protection=False,
tags={"project":"genai-prototype"},
dict1=dict1,
)

stack3 = BedrockStack(app, "BedrockAgentStack",
env=cdk.Environment(account=dict1['account_id'], region=dict1['region']),
description="Bedrock agent resources",
termination_protection=False,
tags={"project":"genai-prototype"},
dict1=dict1,
athena_lambda_arn=stack2.athena_lambda_arn,
search_lambda_arn=stack2.search_lambda_arn
)

stack4 = StreamlitStack(app, "StreamlitStack",
env=cdk.Environment(account=dict1['account_id'], region=dict1['region']),
description="Streamlit app resources",
termination_protection=False,
tags={"project":"genai-prototype"},
dict1=dict1
)

stack2.add_dependency(stack1)
stack3.add_dependency(stack2)
stack4.add_dependency(stack3)

cdk.Tags.of(stack1).add(key="owner",value="ccoe")
cdk.Tags.of(stack2).add(key="owner",value="ccoe")
cdk.Tags.of(stack3).add(key="owner",value="ccoe")
cdk.Tags.of(stack4).add(key="owner",value="ccoe")

cdk.Aspects.of(app).add(AwsSolutionsChecks(verbose=False))
app.synth()

Notice how the code snippet's numbered Stack references include a dictionary, a convenient method for passing variables into Stacks. We can also pass resource objects between Stacks: The AWS Lambda functions are defined in stack2, and their AWS Lambda references are passed into stack3. AWS CloudFormation Outputs can also effectively “export” and “import” objects directly between Stacks, as illustrated in stack2. Adding dependencies between Stacks ensures an ordered execution within AWS CloudFormation.

Another generally helpful approach with AWS CDK projects is to adopt cdk-nag, inspired by cfn_nag, which contains rule sets, aka NagPacks, that can detect and help remediate AWS CDK code projects based on best practices. The rules encode best practices and help identify patterns in AWS CDK projects that may indicate insecure configuration, for example, IAM rules and security groups that are too permissive (wildcards), access logs that aren’t enabled, encryption that isn’t enabled, and password literals. Applying rules is configurable, and we can, for example, suppress the rules that are not required in a given evaluation context. Stacks can make use of the cdk-nag library and running cdk synth --all generates a list of all the findings.

AWS CDK context variables are also helpful for further structuring and activating code blocks based on user inputs. Some resources, for example, may only be needed for rapid development and could be optionally enabled through cdk deploy --context variables. Stack4 uses this technique to activate optional resources such as AWS Certificate Manager (ACM) TLS certificates and Amazon Elastic File System (EFS) shared storage.

The following sections illustrate the four Stacks of the AWS CDK application.

1. Building out the Data Foundation

Data foundations are often critical to building useful GenAI applications, as we can provide additional context derived from private or enterprise data sets that “generic” pre-trained LLMs would not be aware of. It is precisely these data foundations that help build differentiated experiences.

Unlike model fine-tuning or training custom LLMs from scratch, data foundations can provide additional situational and semantic context to agentic workflows. For example, situational context can be added by accessing conversational histories, data from purpose-built data stores and APIs. Semantic context is added from vector data stores and similarity search. This is how the term context engineering is coined. Our AWS CDK project defines a data architecture based on Amazon S3, AWS Glue, and Amazon Athena.

At the beginning of the DataFoundationStack, we define a unique hash string useful when defining global resource names such as Amazon S3 buckets. This suffix can then be appended to generate unique resource names. For example, attempting to provision an Amazon S3 bucket with an existing bucket name would fail. The Amazon S3 bucket can also be “seeded” with your initial private tabular data set through the aws_s3_deployment construct. This Amazon S3 bucket data-set location can also continuously receive new incoming data from a data delivery source, which we will not depend on here.

The second code block defines the AWS Glue resources, including the AWS Glue Data Catalogue database, a processing Job, and a Job schedule. These resources are all declared with L1 constructs generated from the AWS CloudFormation resource specification. In the future, L2 constructs for AWS Glue resources that simplify their parameterisation may become available.

The AWS Glue L1 constructs, however, do give us a lot of control as we can access all parameters. We can use the AWS Glue “FLEX” Job execution_class to help with cost management for prototypes, pre-production, and non-urgent data integration workloads. Notice how we inject environment variables into the CfnJob AWS Glue Job definition, which can be a valuable method to parameterise the Job’s PySpark script: These variables can be accessed through the os python module and the os.environ statement.

Sidebar: The environment variables for AWS Glue Jobs need to follow a particular naming convention. They must be prefixed with “CUSTOMER_”; otherwise, the PySpark Job will not find them.

The PySpark script, which is uploaded as a separate asset through the CfnJob command script_location parameter, contains the details for processing and updating the data catalogue resources every time it runs and detects new data: Enabling the “--job-bookmark-option” makes sure that already processed data is not processed again.

from aws_cdk import (
Duration,
RemovalPolicy,
Stack,
CfnOutput,
aws_iam as iam,
aws_s3 as s3,
aws_s3_deployment as s3d,
aws_glue as glue,
aws_athena as athena
)
from cdk_nag import (
NagPackSuppression,
NagSuppressions
)
from constructs import Construct
import hashlib

class DataFoundationStack(Stack):

def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)

# Create a unique string to create unique resource names
hash_base_string = (self.account + self.region)
hash_base_string = hash_base_string.encode("utf8")

### 1. Create data-set resources

# Create S3 bucket for the data set
data_bucket = s3.Bucket(self, "DataLake",
bucket_name=("data-bucket-" + str(hashlib.sha384(hash_base_string).hexdigest())[:15]).lower(),
auto_delete_objects=True,
versioned=True,
removal_policy=RemovalPolicy.DESTROY,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
enforce_ssl=True,
encryption=s3.BucketEncryption.S3_MANAGED,
intelligent_tiering_configurations=[
s3.IntelligentTieringConfiguration(
name="my_s3_tiering",
archive_access_tier_time=Duration.days(90),
deep_archive_access_tier_time=Duration.days(180),
prefix="prefix",
tags=[s3.Tag(
key="key",
value="value"
)]
)],
lifecycle_rules=[
s3.LifecycleRule(
noncurrent_version_expiration=Duration.days(7)
)
],
)

# Create S3 bucket policy for bedrock permissions
add_s3_policy = data_bucket.add_to_resource_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["s3:GetObject","s3:PutObject","s3:AbortMultipartUpload"],
resources=[data_bucket.arn_for_objects("*")],
principals=[iam.ServicePrincipal("bedrock.amazonaws.com")],
)
)

NagSuppressions.add_resource_suppressions(
data_bucket,
[NagPackSuppression(id="AwsSolutions-S1", reason="The bucket is not for production and should not require debug.")],
True
)

# Upload a sample data-sest from asset to S3 bucket - with the prefix for incoming "raw" data
s3d.BucketDeployment(self, "DataDeployment",
sources=[s3d.Source.asset("assets/data-set/")],
destination_bucket=data_bucket,
destination_key_prefix="data-set/"
)

# Export the data set bucket name
CfnOutput(self, "DataSetBucketName",
value=data_bucket.bucket_name,
export_name="DataSetBucketName"
)

### 2. Create glue resources

# Create Glue Service Role
glue_service_role = iam.Role(self, "GlueServiceRole",
assumed_by=iam.ServicePrincipal("glue.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AWSGlueServiceRole")
]
)

# Create Glue Database
glue_database_name = "data_set_db"

glue.CfnDatabase(self, "DataSetDatabase", catalog_id=self.account, database_input=glue.CfnDatabase.DatabaseInputProperty(name=glue_database_name))

# Create Glue role for the etl job
glue_job_role = iam.Role(self, "GlueEtlJobRole",
assumed_by=iam.ServicePrincipal("glue.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AWSGlueServiceRole"),
iam.ManagedPolicy.from_aws_managed_policy_name("AmazonS3FullAccess")
]
)

# Grant read and write access to the data bucket
data_bucket.grant_read_write(glue_job_role)

# Upload glue etl script from asset to S3 bucket. The script will be used by the Glue etl job, creates compressed parquet files, creates a schema, and creates a glue db table partitions
s3d.BucketDeployment(self, "GlueJobScript",
sources=[s3d.Source.asset("assets/glue/")],
destination_bucket=data_bucket,
destination_key_prefix="scripts/"
)

# Create a Glue etl job that processes the data set
etl_job = glue.CfnJob(self, "DataSetETLJob",
role=glue_job_role.role_arn,
execution_class="FLEX",
command=glue.CfnJob.JobCommandProperty(
name="glueetl",
script_location="s3://{}/scripts/etl.py".format(data_bucket.bucket_name),
python_version="3",
),
default_arguments={
"--job-bookmark-option": "job-bookmark-enable",
"--enable-metrics": "true",
"--enable-observability-metrics": "true",
"--enable-continuous-cloudwatch-log": "true",
"--customer-driver-env-vars": f"CUSTOMER_BUCKET_NAME={data_bucket.bucket_name}",
"--customer-executor-env-vars": f"CUSTOMER_BUCKET_NAME={data_bucket.bucket_name}"
},
glue_version="4.0",
max_retries=0,
number_of_workers=2,
worker_type="G.1X"
)

# Create a Glue schedule for the etl job that processes the data set with the bookmark option enabled
glue_schedule = glue.CfnTrigger(self, "DataSetETLSchedule",
name="DataSetETLSchedule",
description="Schedule for the DataSetETLJob to discover and process incoming data",
type="SCHEDULED",
start_on_creation=True,
actions=[glue.CfnTrigger.ActionProperty(
job_name=etl_job.ref,
arguments={
"--job-bookmark-option": "job-bookmark-enable",
"--enable-metrics": "true",
"--enable-observability-metrics": "true",
"--enable-continuous-cloudwatch-log": "true",
"--customer-driver-env-vars": f"CUSTOMER_BUCKET_NAME={data_bucket.bucket_name}",
"--customer-executor-env-vars": f"CUSTOMER_BUCKET_NAME={data_bucket.bucket_name}"
}
)],
schedule="cron(0 1 * * ? *)" # Run once a day at 1am
)

### 3. Create Athena resources

# Create S3 athena destination bucket
athena_bucket = s3.Bucket(self, "AthenaDestination",
bucket_name=("athena-destination-" + str(hashlib.sha384(hash_base_string).hexdigest())[:15]).lower(),
auto_delete_objects=True,
versioned=True,
removal_policy=RemovalPolicy.DESTROY,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
enforce_ssl=True,
encryption=s3.BucketEncryption.S3_MANAGED,
intelligent_tiering_configurations=[
s3.IntelligentTieringConfiguration(
name="my_s3_tiering",
archive_access_tier_time=Duration.days(90),
deep_archive_access_tier_time=Duration.days(180),
prefix="prefix",
tags=[s3.Tag(
key="key",
value="value"
)]
)],
lifecycle_rules=[
s3.LifecycleRule(
noncurrent_version_expiration=Duration.days(7)
)
],
)

athena_bucket.grant_read_write(iam.ServicePrincipal("athena.amazonaws.com"))

# Export the athena destination bucket name
CfnOutput(self, "AthenaDestBucketName",
value=athena_bucket.bucket_name,
export_name="AthenaDestinationBucketName"
)

# Set the query result location for Athena
athena_bucket_uri = f"s3://{athena_bucket.bucket_name}/query-results/"

athena_bucket.add_to_resource_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["s3:GetBucketLocation"],
resources=[athena_bucket.bucket_arn],
principals=[iam.ServicePrincipal("athena.amazonaws.com")],
)
)
athena_bucket.add_to_resource_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["s3:ListBucket"],
resources=[athena_bucket.bucket_arn],
principals=[iam.ServicePrincipal("athena.amazonaws.com")],
conditions={"StringEquals": {"s3:prefix": ["query-results/"]}},
)
)
athena_bucket.add_to_resource_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["s3:PutObject", "s3:GetObject"],
resources=[f"{athena_bucket.bucket_arn}/query-results/*"],
principals=[iam.ServicePrincipal("athena.amazonaws.com")],
)
)
athena_bucket.add_to_resource_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["s3:PutObject"],
resources=[f"{athena_bucket.bucket_arn}/query-results/*"],
principals=[iam.ServicePrincipal("athena.amazonaws.com")],
conditions={"StringEquals": {"s3:x-amz-acl": "bucket-owner-full-control"}},
)
)

NagSuppressions.add_resource_suppressions(
athena_bucket,
[NagPackSuppression(id="AwsSolutions-S1", reason="AwsSolutions-S1: The bucket is not for production and should not require access logs.")],
True
)

# Configure Athena Query Editor and set the athena destination bucket
athena_workgroup = athena.CfnWorkGroup(self, "AthenaWorkGroup",
name="bedrock-workgroup",
recursive_delete_option=True,
work_group_configuration=athena.CfnWorkGroup.WorkGroupConfigurationProperty(
enforce_work_group_configuration=True,
result_configuration=athena.CfnWorkGroup.ResultConfigurationProperty(
output_location=athena_bucket_uri,
encryption_configuration=athena.CfnWorkGroup.EncryptionConfigurationProperty(
encryption_option="SSE_S3"
)
)
)
)

# Export the athena workgroup name
CfnOutput(self, "AthenaWorkGroupName",
value=athena_workgroup.name,
export_name="AthenaWorkGroupName"
)

The complexity of the PySpark script depends on the amount of processing required for the incoming data. AWS Glue Studio and its visual ETL editor can be an excellent low-code entry point that provides many managed transforms. Alternatively, AWS Glue Studio Notebooks can help interactively author PySpark scripts in a notebook interface based on Jupyter Notebooks: The following snippet shows a coded example that takes incoming tabular CSV files, applies some data cleaning, delivers the output as partitioned and compressed Apache Parquet files to Amazon S3, and updates the AWS Glue Catalogue.

import sys
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Initiate the spark session context
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Assign the bucket name
bucket_name = os.environ['CUSTOMER_BUCKET_NAME']

# Load the data set from Amazon S3 into a dynamic data frame
load_data = glueContext.create_dynamic_frame.from_options(format_options={"quoteChar": "\"", "withHeader": True, "separator": ","}, connection_type="s3", format="csv", connection_options={"paths": [f"s3://{bucket_name}"]}, transformation_ctx="load_data")

# Convert the dynamic frame to a data frame
df = load_data.toDF()

# Create a "user defined function" and define a python lambda for data cleaning. Example: Removing unwanted characters such as '%' and ' ' white spaces
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
chop_f = udf(lambda x: x.replace('%', '').replace(' ', ''), StringType())
clean_df = df.withColumn("COLUMN 1", chop_f(df["COLUMN 1"])).withColumn("COLUMN 2", chop_f(df["COLUMN 2"]))

# Convert the data frame back to a dynamic frame
from awsglue.dynamicframe import DynamicFrame
df_tmp = DynamicFrame.fromDF(clean_df, glueContext)

# Now we can update the schema of the dynamic frame. Example: Change column names and typecast columns
schema_update = ApplyMapping.apply(frame=df_tmp, mappings=[("COLUMN 1", "string", "COLUMN_1", "string"), ("COLUMN 2", "string", "COLUMN_2", "float")], transformation_ctx="schema_update")

# Write the data back to Amazon S3 in parquet format
write_data = glueContext.getSink(path=f"s3://{bucket_name}/data-proc/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["COLUMN_1"], enableUpdateCatalog=True, transformation_ctx="write_data")
write_data.setCatalogInfo(catalogDatabase="data_set_db",catalogTableName="data_proc")
write_data.setFormat("glueparquet", compression="snappy")
write_data.writeFrame(schema_update)
job.commit()

Notice how script accesses the CUSTOMER_BUCKET_NAME environment variable we injected with the DataFoundationStack. This repository contains additional code examples and illustrates more complex use cases for PySpark.

The third block defines the Amazon Athena query result location and the Athena Workgroup. Both are important as they implement various options, including workload management for incoming queries and query result reuse. Interestingly, the query reuse parameters are submitted with each query and are not part of the workgroup settings. For example, the AWS SDK for Python athena_client.start_query_execution API call contains the ResultReuseByAgeConfiguration and the MaxAgeInMinutes parameter.

2. Preparing the AWS Lambda Functions for Agent Action Groups

This AWS CDK Stack defines the AWS Lambda Functions that Agents for Amazon Bedrock will reference to access specific “tool” capabilities. These Agent “tools” are configured as Agent action groups. The prototype we are building here utilises two action groups, one for accessing structured tabular datasets through Athena and one for performing web searches.

Each action group has two sides: On the one hand, we have the Agent, which uses a definable OpenAPI schema to post a message to the referenced AWS Lambda Function. The other side is the receiving AWS Lambda Function, which parses the incoming request object paths to correctly access the message body and payload. In other words, when an Agent action group invokes a Lambda function, Amazon Bedrock emits a Lambda input event: The receiving Lambda Function can use any input event field when implementing the business logic. This repository shows a sample AWS Lambda Function that implements the Athena use case.

The first block of the LambdaStack code snippet below defines the AWS Lambda Function for Amazon Athena, and the second block does the same for the web search function. Each _lambda.Function definition has a range of required parameters, including references to the function code path location and the program file name. Using lambda.Code.from_asset is a convenient way to reference the relative path or folder/directory location (in our case, the directory is called lambda), and the file name of the Python script is passed in with the handler parameter in the format of ‘<file_name.hander>’.

Notice that we use the AWS CDK Fn class to import the AthenaDestinationBucketName that we exported via CfnOutput in the preceding DataFoundationStack where the bucket was created. The bucket name is then defined as an AWS Lambda Function environment variable, which the function code can access and use as part of the AWS SDK call to the Amazon Athena API. This approach avoids hardcoding bucket names and uses variables to make the AWS CDK App portable and reusable.

from aws_cdk import (
Duration,
Stack,
CfnOutput,
aws_lambda as _lambda,
aws_iam as iam,
Fn as Fn,
)
from cdk_nag import (
NagPackSuppression,
NagSuppressions
)
from constructs import Construct

class LambdaStack(Stack):

def __init__(self, scope: Construct, id: str, dict1, **kwargs) -> None:
super().__init__(scope, id, **kwargs)

### 1. Define a Lambda function for the agent to interact with Athena

# Defines an AWS Lambda function
athena_lambda = _lambda.Function(
self, 'athena-lambda-action',
runtime=_lambda.Runtime.PYTHON_3_12,
code=_lambda.Code.from_asset('lambda'),
handler='lambda_athena.handler',
timeout=Duration.seconds(60),
memory_size=4048,
)

# Export the lambda arn
CfnOutput(self, "LambdaAthenaForBedrockAgent",
value=athena_lambda.function_arn,
export_name="LambdaAthenaForBedrockAgent"
)

self.athena_lambda_arn = athena_lambda.function_arn

# Adding Lambda execution role permissions for the services lambda will interact with.
add_execution_policy = athena_lambda.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["bedrock:*", "athena:*", "s3:*"],
resources=["*"],
)
)

# Adding iam managed policies to the Lambda execution role
athena_lambda.role.add_managed_policy(
iam.ManagedPolicy.from_aws_managed_policy_name('AmazonS3FullAccess')
)
athena_lambda.role.add_managed_policy(
iam.ManagedPolicy.from_aws_managed_policy_name('AmazonAthenaFullAccess')
)
athena_lambda.role.add_managed_policy(
iam.ManagedPolicy.from_aws_managed_policy_name('AWSGlueConsoleFullAccess')
)

# Add permissions to the Lambda function resource policy. You use a resource-based policy to allow an AWS service to invoke your function.
add_lambda_resource_policy = athena_lambda.add_permission(
"AllowBedrock",
principal=iam.ServicePrincipal("bedrock.amazonaws.com"),
action="lambda:InvokeFunction",
source_arn=f"arn:aws:bedrock:{dict1['region']}:{dict1['account_id']}:agent/*"
)

# Create athena environment variables for the athena lambda function
athena_dest_bucket = Fn.import_value("AthenaDestinationBucketName")
athena_lambda.add_environment("ATHENA_DEST_BUCKET", athena_dest_bucket)

athena_workgroup = Fn.import_value("AthenaWorkGroupName")
athena_lambda.add_environment("ATHENA_WORKGROUP", athena_workgroup)

### 2. Define a Lambda function for the agent to search the web

# Create a Lambda layer
layer = _lambda.LayerVersion(
self, 'py-lib-layer',
code=_lambda.Code.from_asset('assets/lambda_layer_with_py_deps.zip'),
compatible_runtimes=[_lambda.Runtime.PYTHON_3_12],
)

# Defines an AWS Lambda function
search_lambda = _lambda.Function(
self, 'search-lambda-action',
runtime=_lambda.Runtime.PYTHON_3_12,
code=_lambda.Code.from_asset("lambda"),
handler='lambda_search.handler',
timeout=Duration.seconds(60),
memory_size=4048,
)

# Add the layer to the search lambda function
search_lambda.add_layers(layer)

# Export the lambda arn
CfnOutput(self, "LambdaSearchForBedrockAgent",
value=search_lambda.function_arn,
export_name="LambdaSearchForBedrockAgent"
)

self.search_lambda_arn = search_lambda.function_arn

# Adding Lambda execution role permissions for the services lambda will interact with.
add_execution_policy = search_lambda.add_to_role_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["bedrock:*", "athena:*", "s3:*"],
resources=["*"],
)
)

# Adding iam managed policies to the Lambda execution role
search_lambda.role.add_managed_policy(
iam.ManagedPolicy.from_aws_managed_policy_name('AmazonS3FullAccess')
)
search_lambda.role.add_managed_policy(
iam.ManagedPolicy.from_aws_managed_policy_name('AmazonAthenaFullAccess')
)
search_lambda.role.add_managed_policy(
iam.ManagedPolicy.from_aws_managed_policy_name('AWSGlueConsoleFullAccess')
)

# Add permissions to the Lambda function resource policy. You use a resource-based policy to allow an AWS service to invoke your function.
add_lambda_resource_policy = search_lambda.add_permission(
"AllowBedrock",
principal=iam.ServicePrincipal("bedrock.amazonaws.com"),
action="lambda:InvokeFunction",
source_arn=f"arn:aws:bedrock:{dict1['region']}:{dict1['account_id']}:agent/*"
)

The snippet also shows how the web search AWS Lambda Function uses a Lambda Layer, a .zip archive file usually containing library dependencies. In our case, the layer contains Python module dependencies for the web search function code. Lambda layers are an excellent method for sharing dependencies across multiple functions.

3. The Agents for Amazon Bedrock

Now, we are ready to define the Agent for Amazon Bedrock. Agents orchestrate interactions between FMs, enterprise data sources, software applications, and user conversations. Agents can access supplementary “tools” or “function calls” to take specific actions to answer the user prompt, such as calling Amazon Athena, for example, to obtain results from a data set stored on Amazon S3.

To define an Agent for Amazon Bedrock, we need various ingredients, many of which add segments to the context-engineered system prompts that we use to interact with a generic pre-trained LLM effectively:

Agent Instructions: Contain a description that tells the agent what to do and how to interact with users. This instruction text is then substituted into the orchestration prompt template with the $instructions$ variable.
Pre-Processing Template: Describe how the user prompts should be pre-processed, classified and filtered.
Orchestration Template: This template contains the variables for Agent Instructions and available function tool names. It also includes advanced prompts that enhance agent accuracy, such as labelled examples and few-shot prompts, improving model performance for specific tasks.

For “agentic workflows” and “tool function calling”, we must also define Agent action groups, each defined by its OpenAPI schema and AWS Lambda Function reference. The instructions, prompt templates, and OpenAPI schema files can all be conveniently described as part of the L1 constructs for Agents. The first code block below reads in the various text files and assigns them to variables used for the CfnAgent definition.

You'll need a degree of context for some of the L1 CfnAgent construct parameters. For example, the foundation_model parameter takes the model ID as a value, not the model name. Using the AWS CLI to list the foundation models (aws bedrock list-foundation-models | grep -i haiku) would provide the valid model ID. Another example would be the agent execution IAM role name, which is pattern-matched as defined by the AWS CloudFormation properties and must be formatted in a particular way.

Similarly, notice how the prompt_override_configuration contains an array of two PromptConfigurationProperty definitions, one for pre-processing template properties and another for orchestration template properties. These override files are LLM-specific, and Amazon Bedrock provides default templates and stop sequences that can be used as a starting point for additional advanced override configurations, depending on which model is selected.

Sidebar: Advanced orchestration template overrides can be very effective for the Athena query use case. Providing specific instructions and labelled examples for valid Athena SQL queries can help reduce mistakes and impacts of corner cases. For example, the LLM may use SQL functions as part of a query statement that may be valid for e.g. SQL Server but not for Amazon Athena.

Anthropic provides extensive model documentation that helps define customisations and provides model-specific advanced guidance on topics such as reducing latency and prompt engineering.

The CfnAgent definition also contains the action groups, which take an array of up to 20 action groups and AgentActionGroupProperty definitions, each requiring the AWS Lambda Function ARN and OpenAPI schema definition. The schema JSON files can be passed in with the payload parameter as shown in the code snippet, or alternatively, an Amazon S3 URI can be used that contains the schema files.

from aws_cdk import (
Duration,
Stack,
CfnOutput,
RemovalPolicy,
aws_iam as iam,
aws_bedrock as bedrock,
aws_s3_deployment as s3d,
aws_s3 as s3,
Fn as Fn,
)
from cdk_nag import (
NagPackSuppression,
NagSuppressions
)
from constructs import Construct

class BedrockStack(Stack):

def __init__(self, scope: Construct, id: str, dict1, athena_lambda_arn, search_lambda_arn, **kwargs) -> None:
super().__init__(scope, id, **kwargs)

### 1. Creating the agent for bedrock

# Create a bedrock agent execution role with permissions to interact with the services
bedrock_agent_role = iam.Role(self, 'bedrock-agent-role',
role_name='AmazonBedrockExecutionRoleForAgents_KIUEYHSVDR',
assumed_by=iam.ServicePrincipal('bedrock.amazonaws.com'),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name('AmazonBedrockFullAccess'),
iam.ManagedPolicy.from_aws_managed_policy_name('AWSLambda_FullAccess'),
iam.ManagedPolicy.from_aws_managed_policy_name('AmazonS3FullAccess'),
iam.ManagedPolicy.from_aws_managed_policy_name('CloudWatchLogsFullAccess'),
],
)

CfnOutput(self, "BedrockAgentRoleArn",
value=bedrock_agent_role.role_arn,
export_name="BedrockAgentRoleArn"
)

# Add iam resource to the bedrock agent
bedrock_agent_role.add_to_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["bedrock:InvokeModel", "bedrock:InvokeModelEndpoint", "bedrock:InvokeModelEndpointAsync"],
resources=["*"],
)
)

# Add instructions for the bedrock agent
with open('assets/agent_instructions.txt', 'r') as file:
agent_instruction = file.read()

# Add schema for the bedrock agent
with open('assets/schema/athena_ag_schema.json', 'r') as file:
athena_schema_def = file.read()

with open('assets/schema/search_ag_schema.json', 'r') as file:
search_schema_def = file.read()

# Define advanced prompt - orchestation template - override orchestration template defaults
with open('assets/agent_orchenstation_template.json', 'r') as file:
orc_temp_def = file.read()

# Define advanced prompt - pre-processing template - override pre-processing template defaults
with open('assets/agent_preprocessing_template.json', 'r') as file:
pre_temp_def = file.read()

# Create a bedrock agent
bedrock_agent = bedrock.CfnAgent(self, 'bedrock-agent',
agent_name='saas-acs-bedrock-agent',
description="This is a bedrock agent that can be invoked by calling the bedrock agent alias and agent id.",
auto_prepare=True,
foundation_model="anthropic.claude-3-haiku-20240307-v1:0",
instruction=agent_instruction,
agent_resource_role_arn=str(bedrock_agent_role.role_arn),
prompt_override_configuration=bedrock.CfnAgent.PromptOverrideConfigurationProperty(
prompt_configurations=[
bedrock.CfnAgent.PromptConfigurationProperty(
base_prompt_template=orc_temp_def,
prompt_type="ORCHESTRATION",
prompt_state="ENABLED",
prompt_creation_mode="OVERRIDDEN",
inference_configuration=bedrock.CfnAgent.InferenceConfigurationProperty(
maximum_length=2048,
stop_sequences=["</error>","</answer>","</invoke>"],
temperature=0,
top_k=250,
top_p=1,
)
),
bedrock.CfnAgent.PromptConfigurationProperty(
base_prompt_template=pre_temp_def,
prompt_type="PRE_PROCESSING",
prompt_state="ENABLED",
prompt_creation_mode="OVERRIDDEN",
inference_configuration=bedrock.CfnAgent.InferenceConfigurationProperty(
maximum_length=2048,
stop_sequences=["⏎⏎Human:"],
temperature=0,
top_k=250,
top_p=1,
)
)
]),
action_groups=[
bedrock.CfnAgent.AgentActionGroupProperty(
action_group_name="AthenaToolFunction",
description="A Function Tool that can access a network metrics dataset.",
action_group_executor=bedrock.CfnAgent.ActionGroupExecutorProperty(
lambda_=athena_lambda_arn,
),
api_schema=bedrock.CfnAgent.APISchemaProperty(
payload=athena_schema_def
),
),
bedrock.CfnAgent.AgentActionGroupProperty(
action_group_name="WebsearchToolFunction",
description="A Function Tool that can search the web.",
action_group_executor=bedrock.CfnAgent.ActionGroupExecutorProperty(
lambda_=search_lambda_arn,
),
api_schema=bedrock.CfnAgent.APISchemaProperty(
payload=search_schema_def,
),
)
],
)

CfnOutput(self, "BedrockAgentID",
value=bedrock_agent.ref,
export_name="BedrockAgentID"
)

CfnOutput(self, "BedrockAgentModelName",
value=bedrock_agent.foundation_model,
export_name="BedrockAgentModelName"
)

### 2. Create an alias for the bedrock agent

# Create an alias for the bedrock agent
cfn_agent_alias = bedrock.CfnAgentAlias(self, "MyCfnAgentAlias",
agent_alias_name="bedrock-agent-alias",
agent_id=bedrock_agent.ref,
description="bedrock agent alias to simplify agent invocation",
tags={
"owner": "saas"
}
)
cfn_agent_alias.add_dependency(bedrock_agent)

agent_alias_string = cfn_agent_alias.ref
agent_alias = agent_alias_string.split("|")[-1]

CfnOutput(self, "BedrockAgentAlias",
value=agent_alias,
export_name="BedrockAgentAlias"
)

### 3. Setting up model invocation logging for Amazon Bedrock

# Create a S3 bucket for model invocation logs
model_invocation_bucket = s3.Bucket(self, "model-invocation-bucket",
bucket_name=("model-invocation-bucket-" + str(hashlib.sha384(hash_base_string).hexdigest())[:15]).lower(),
auto_delete_objects=True,
versioned=True,
removal_policy=RemovalPolicy.DESTROY,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
enforce_ssl=True,
encryption=s3.BucketEncryption.S3_MANAGED,
lifecycle_rules=[
s3.LifecycleRule(
noncurrent_version_expiration=Duration.days(14)
)
],
)

# Create S3 bucket policy for bedrock permissions
add_s3_policy = model_invocation_bucket.add_to_resource_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["s3:PutObject"],
resources=[model_invocation_bucket.arn_for_objects("*")],
principals=[iam.ServicePrincipal("bedrock.amazonaws.com")],
)
)

NagSuppressions.add_resource_suppressions(
model_invocation_bucket,
[NagPackSuppression(id="AwsSolutions-S1", reason="The bucket is not for production and should not require debug.")],
True
)

# Create a Cloudwatch log group for model invocation logs
model_log_group = logs.LogGroup(self, "model-log-group",
log_group_name=("model-log-group-" + str(hashlib.sha384(hash_base_string).hexdigest())[:15]).lower(),
log_group_class=logs.LogGroupClass.STANDARD,
retention=logs.RetentionDays.ONE_MONTH,
removal_policy=RemovalPolicy.DESTROY
)

### Custom resource to enable model invocation logging, as cloudformation does not support this feature at this time

# Define the request body for the api call that the custom resource will use
modelLoggingParams = {
"loggingConfig": {
"cloudWatchConfig": {
"largeDataDeliveryS3Config": {
"bucketName": model_invocation_bucket.bucket_name,
"keyPrefix": "invocation-logs"
},
"logGroupName": model_log_group.log_group_name,
"roleArn": bedrock_agent_role.role_arn
},
"embeddingDataDeliveryEnabled": False,
"imageDataDeliveryEnabled": False,
"textDataDeliveryEnabled": True
}
}

# Define a custom resource to make an AwsSdk startCrawler call to the Glue API
model_logging_cr = cr.AwsCustomResource(self, "ModelLoggingCustomResource",
on_create=cr.AwsSdkCall(
service="Bedrock",
action="putModelInvocationLoggingConfiguration",
parameters=modelLoggingParams,
physical_resource_id=cr.PhysicalResourceId.of("Parameter.ARN")
),
policy=cr.AwsCustomResourcePolicy.from_sdk_calls(
resources=cr.AwsCustomResourcePolicy.ANY_RESOURCE
)
)

# Define IAM permission policy for the custom resource
model_logging_cr.grant_principal.add_to_principal_policy(iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["bedrock:*", "iam:CreateServiceLinkedRole", "iam:PassRole"],
resources=["*"],
)
)

The Agent Alias is defined in the second code block. In fact, Agent Aliases and Agent Versions are vital concepts to understand. To deploy an Agent and to make it accessible, we must create an alias for it. The Agent alias and the Agent ID are the key identifiers needed to invoke an Agent at runtime.

Sidebar: Creating an Agent alias simultaneously and automatically creates an Agent version, and the newly created alias points to its new version. An Agent version is akin to a snapshot, which preserves the Agent configuration details as they existed when its version was created. Aliases can be updated to point to older versions, which makes sense, as the alias is mapped into a “backing version” when invoked. Interestingly, Agent versions cannot be created independently; they are only created when an alias is created, which is not apparent when you start working with Agents for Amazon Bedrock.

Therefore, the CfnAgentAlias routing_configuration parameter is not used in the code snippet, as the initial creation of the Agent alias automatically creates the Agent “Version 1” and maps it to the alias.

Sidebar: Exposing an unchanged/constant Agent Alias can be useful when “shielding” applications from Agent version updates: Applications continue to invoke the same AliasID/AgentID “endpoint”. Pruning or deleting older Agent versions can be helpful to avoid “clutter” that can build up over time as new versions are published. Allow for a cool-down time period before deleting versions as active end-user conversations/sessions (based on the sessionId) would hold on to older agent versions and may result in “resourceNotFoundException” invocation errors.

Finally, the third code block enables model invocation logging, which can be an invaluable data source when prototyping and evaluating model performance and behaviour of the Agent. The model invocation logs contain the complete request data, response data, and metadata associated with each call.

At the time of writing, AWS CloudFormation does not support enabling model invocation logging; by extension, the AWS CDK does not support it either. In these situations, we can revert to AWS CDK custom resources that implement a method for calling AWS APIs directly. Model invocation logging can be enabled by directly calling the Amazon Bedrock API with the PutModelInvocationLoggingConfiguration action. The snippet shows how to enable model invocation log delivery into Amazon CloudWatch Logs, apply a filter for text data types, and create an Amazon S3 bucket location for large data deliveries that do not fit into a LogGroup. The logging configuration also supports simultaneous log delivery into Amazon CloudWatch Logs and Amazon S3.

4. The Streamlit Frontend Application

The final components of the AWS CDK project are defined in the StreamlitStack, where we use the first L3 construct. L3 constructs encapsulate so-called “patterns” and often build on and combine various L1 and L2 constructs to work together to accomplish a specific task or service. Adopting L3 constructs can substantially accelerate prototyping and other initiatives where the emphasis lies on development speed. These constructs are part of the AWS CDK Library. They are easily identifiable by their “_patterns” suffix, typically contain multiple construct options, and can be accessed and imported individually, as seen in the code snippet with aws_ecs_patterns.

The 1st code block in the below snippet uses the ecs_patterns.ApplicationLoadBalancedFargateService construct, which takes in a relatively small number of parameters and applies a relatively large number range of opinionated defaults to create an assembly containing a Virtual Private Network, an Application Load Balancer, and an Amazon ECS cluster, plus all the required “connective tissue” and AWS IAM permissions.

The ApplicationLoadBalancedFargateService construct exposes a set of parameters, including the ApplicationLoadBalancedTaskImageOption: Notice how we pass in the container image, which the construct then uses to create ECS task definitions and launch the container task on AWS Fargate serverless compute resources. The from_asset method is available for the ecs.ContainerImage class, references a path location to a Dockerfile, builds and tags the container image accordingly, creates and pushes it into an AWS ECR repository, and uses the container reference within the Amazon ECS task… This L3 construct abstracts away a lot of work and details.

The TaskImageOptions are also used to define environment variables for the container image. The code snippet shows this by passing in the Agent ID and Agent Alias variables: Both variable values are imported with the Fn.import_value method, as another Stack created and exported the Agent and alias. Container processes such as the Streamlit application can access them to make the Agent invocation API call with the boto3 AWS SDK.

from aws_cdk import (
Stack,
aws_certificatemanager as acm,
aws_cognito as cognito,
aws_efs as efs,
aws_ec2 as ec2,
aws_ecs as ecs,
aws_ecs_patterns as ecs_patterns,
aws_iam as iam,
Duration as Duration,
Fn as Fn,
RemovalPolicy,
CfnOutput,
)
from cdk_nag import (
NagPackSuppression,
NagSuppressions
)
from constructs import Construct
import hashlib

class StreamlitStack(Stack):

def __init__(self, scope: Construct, construct_id: str, dict1, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)

# Creating the VPC for the ECS service
vpc = ec2.Vpc(self, "CompanionVPC",
ip_addresses=ec2.IpAddresses.cidr("10.0.0.0/16"),
max_azs=2,
nat_gateway_subnets=None,
subnet_configuration=[ec2.SubnetConfiguration(name="public",subnet_type=ec2.SubnetType.PUBLIC,cidr_mask=24)]
)

# Create a unique string to create unique resource names
hash_base_string = (self.account + self.region)
hash_base_string = hash_base_string.encode("utf8")

# Was the certificate argument been added as part of the cdk deploy? If so then a certification will be created for the load_balanced_service
acm_certificate_arn = self.node.try_get_context('acm_certificate_arn')

### 1. Create the ECS service for the Streamlit application

# Use the ApplicationLoadBalancedFargateService L3 construct to create the application load balanced behind an ALB
load_balanced_service = ecs_patterns.ApplicationLoadBalancedFargateService(self, "CompanionService",
vpc=vpc,
cpu=1024,
memory_limit_mib=4096,
desired_count=1,
public_load_balancer=True,
assign_public_ip=True,
enable_execute_command=True,
certificate=(acm.Certificate.from_certificate_arn(self, "certificate", certificate_arn=acm_certificate_arn) if acm_certificate_arn else None),
redirect_http=(True if acm_certificate_arn else False),
load_balancer_name=("saas-companion-" + str(hashlib.sha384(hash_base_string).hexdigest())[:15]).lower(),
task_image_options=ecs_patterns.ApplicationLoadBalancedTaskImageOptions(
# Builds and imports the container image directly from the local directory (requires Docker to be installed on the local machine)
image=ecs.ContainerImage.from_asset("streamlit"),
environment={
"STREAMLIT_SERVER_RUN_ON_SAVE": "true",
"STREAMLIT_BROWSER_GATHER_USAGE_STATS": "false",
"STREAMLIT_THEME_BASE": "light",
"BEDROCK_AGENT_ID": Fn.import_value("BedrockAgentID"),
"BEDROCK_AGENT_ALIAS": Fn.import_value("BedrockAgentAlias"),
"AWS_REGION": self.region,
"AWS_ACCOUNT_ID": self.account,
}
)
)

# Export the ALB Name
CfnOutput(self, "ALBName",
value=load_balanced_service.load_balancer.load_balancer_name,
export_name="ALBName"
)

# Adding the necessary permissions to the ECS task role to interact with the services

load_balanced_service.task_definition.add_to_task_role_policy(
statement=iam.PolicyStatement(
actions=[
"athena:*"
],
resources=["*"]
)
)

load_balanced_service.task_definition.add_to_task_role_policy(
statement=iam.PolicyStatement(

actions=[
"glue:*"
],
resources=["*"]
)
)

load_balanced_service.task_definition.add_to_task_role_policy(
statement=iam.PolicyStatement(
actions=[
"bedrock:*"
],
resources=["*"]
)
)

load_balanced_service.task_definition.add_to_task_role_policy(
statement=iam.PolicyStatement(
actions=[
"cloudwatch:DescribeAlarmsForMetric",
"cloudwatch:GetMetricData",
"ec2:*",
"elasticfilesystem:*",
"kms:DescribeKey",
"kms:ListAliases"
],
resources=["*"]
)
)

load_balanced_service.task_definition.add_to_task_role_policy(
statement=iam.PolicyStatement(
actions=[
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject"
],
resources=["*"]
)
)

# Nag Suppressions in place to accommodate the items flagged. This must be addressed for a workload entering production.
NagSuppressions.add_resource_suppressions(
load_balanced_service.task_definition.task_role,
[NagPackSuppression(id="AwsSolutions-IAM5", reason="Role is controlled to services, and actions where limited service API calls required. Where wildcards are used, these are prefixed with resources partial or complete ARNs.")],
True
)

NagSuppressions.add_resource_suppressions(
load_balanced_service.task_definition.execution_role,
[NagPackSuppression(id="AwsSolutions-IAM5", reason="Role is default via the generating construct.")],
True
)

NagSuppressions.add_resource_suppressions(
load_balanced_service.load_balancer,
[NagPackSuppression(id="AwsSolutions-ELB2", reason="Load Balancer not mission criticial, access logs not needed for POC."),NagPackSuppression(id="AwsSolutions-EC23", reason="Expected public access for this POC. Is reinforced by Cognito.")],
True
)

NagSuppressions.add_resource_suppressions(
load_balanced_service.task_definition,
[NagPackSuppression(id="AwsSolutions-ECS2", reason="Data is non-sensitive. As this is a POC environment variables are OK.")],
True
)

NagSuppressions.add_resource_suppressions(
load_balanced_service.cluster,
[NagPackSuppression(id="AwsSolutions-ECS4", reason="Container insights is not required for POC environment.")],
True
)

### 2. Creating optional resources based on user context variables

# OPTION: Was the "dev" user variable passed in as part of the cdk deploy --context?

# If so then an EFS file system will be created and mounted into the task definition for easy access to the Streamlit application code.
# Creating an EFS file system to help during the development phase. The EFS file system is then mounted into the task definition for easy access to the Streamlit application code.
dev = self.node.try_get_context('dev')
if dev:

efs_file_system = efs.FileSystem(self, "FileSystem",
vpc=vpc,
allow_anonymous_access=True,
encrypted=True,
security_group=security_group,
removal_policy=RemovalPolicy.DESTROY
)

load_balanced_service.task_definition.add_volume(
name="my-efs-volume",
efs_volume_configuration=ecs.EfsVolumeConfiguration(
file_system_id=efs_file_system.file_system_id
)
)

# Mounting the EFS file system root volume into the container
# As the EFS file system is empty the container wont see the app files. Use the cloud9 instace to copy the app files into it for development purposes.
# For production, the EFS file system and the countainer mount wont be necessary as the container image will have the final app files already.
container_definition = load_balanced_service.task_definition.default_container
container_definition.add_mount_points(
ecs.MountPoint(
container_path="/usr/src",
source_volume="my-efs-volume",
read_only=False
)
)

# Creating a cloud9 environment for the development phase
cloud9 = ec2.Instance(self, "Cloud9",
instance_type=ec2.InstanceType.of(
ec2.InstanceClass.BURSTABLE2,
ec2.InstanceSize.MICRO
),
machine_image=ec2.MachineImage.latest_amazon_linux(),
vpc=vpc,
vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
security_group=security_group,
key_name="bedrock",
block_devices=[ec2.BlockDevice(device_name="/dev/xvda", volume=ec2.BlockDeviceVolume.ebs(8, encrypted=True))],
user_data=ec2.UserData.for_linux()
)

# Adding an ingress rule to the security group to allow inbound NFS traffic
security_group = load_balanced_service.service.connections.security_groups[0]
security_group.add_ingress_rule(
ec2.Peer.ipv4(vpc.vpc_cidr_block),
ec2.Port.tcp(2049),
description="Allow inbound NFS traffic"
)
security_group.add_ingress_rule(
ec2.Peer.ipv4("0.0.0.0/0"),
ec2.Port.tcp(2049),
description="Allow inbound NFS traffic from anywhere"
)


# OPTION: Was an email address argument passed in as part of the cdk deploy?
# If so then authentication will be applied to the ALB
email_address = self.node.try_get_context('email_address')

if email_address:
# Are we using a custom domain name?
custom_domain_name = self.node.try_get_context('domain_name')

# Declare variables here for logic that is reused
domain_name = (custom_domain_name if custom_domain_name else load_balanced_service.load_balancer.load_balancer_dns_name)
full_domain_name = ("https://" if acm_certificate_arn else "http://") + domain_name

# Create a Cognito User Pool for user authentication
user_pool = cognito.UserPool(self, "UserPool",
self_sign_up_enabled=False,
user_invitation=cognito.UserInvitationConfig(
email_subject="New Account",
email_body="""Hi there,

You've been granted permission to use the application:
""" + full_domain_name + """

Your username is '<b>{username}</b>' and your temporary password is <b>{####}</b>"""
),
auto_verify=cognito.AutoVerifiedAttrs(email=True),
password_policy=cognito.PasswordPolicy(min_length=8, require_digits=True, require_symbols=True, require_lowercase=True, require_uppercase=True),
advanced_security_mode=cognito.AdvancedSecurityMode.ENFORCED
)

NagSuppressions.add_resource_suppressions(
user_pool,
[NagPackSuppression(id="AwsSolutions-COG2", reason="MFA is not required as a POC.")],
True
)

# Create a Cognito User Pool Domain for the ALB to use for authentication
user_pool_domain = user_pool.add_domain("UserPoolDomain",
cognito_domain=cognito.CognitoDomainOptions(
domain_prefix="<YOUR DOMAIN PREFIX>" + str(hashlib.sha384(hash_base_string).hexdigest())[:15]
)
)

# Create a Cognito User Pool Client for the ALB to use for authentication. This is required for the ALB to use the Cognito User Pool.
user_pool_client = user_pool.add_client("UserPoolClient",
o_auth=cognito.OAuthSettings(
scopes=[
cognito.OAuthScope.OPENID
],
callback_urls=[
full_domain_name + "/oauth2/idpresponse"
],
flows=cognito.OAuthFlows(authorization_code_grant=True)
),
auth_flows=cognito.AuthFlow(user_password=True),
generate_secret=True
)

# Create a Cognito User Pool User based on the provided email address for the ALB to use for authentication. This is required for the ALB to use the Cognito User Pool. This user is the only user that will be able to access the ALB.
cognito.CfnUserPoolUser(self, "UserPoolUser",
desired_delivery_mediums=["EMAIL"],
user_attributes=[cognito.CfnUserPoolUser.AttributeTypeProperty(
name="email",
value=email_address
)],
username="app_user",
user_pool_id=user_pool.user_pool_id
)

# Apply the Cognito User Pool to the ALB. This will cause the ALB to use the Cognito User Pool for authentication. This is required for the ALB to use the Cognito User Pool. This user is the only user that will be able to access the ALB.
load_balanced_service.listener.node.default_child.default_actions = [
{
"order": 1,
"type": "authenticate-cognito",
"authenticateCognitoConfig": {
"userPoolArn": user_pool.user_pool_arn,
"userPoolClientId": user_pool_client.user_pool_client_id,
"userPoolDomain": user_pool_domain.domain_name,
}
},
{
"order": 2,
"type": "forward",
"targetGroupArn": load_balanced_service.target_group.target_group_arn
}
]

load_balanced_service.load_balancer.connections.allow_to_any_ipv4(ec2.Port.tcp(443))

The above code block, however, does not show the contents of the Dockerfile or the Streamlit application files and its model invocation handler: The following snippet illustrates how the Streamlit app performs a direct agent invocation with the boto3 AWS SDK. The entry point is the agent_handler function, which takes as input the “event” parameter, which, in our case, contains a dictionary with the user prompt and then calls the askQuestion(question, endSession) function to obtain the response from the agent. Notice how we iterate over the invoke_agent response body, as it returns a base64 encoded stream.

import boto3
from botocore.exceptions import ClientError
import os
import logging

agentId = os.environ["BEDROCK_AGENT_ID"]
agentAliasIdString = os.environ["BEDROCK_AGENT_ALIAS"]
agentAliasId = agentAliasIdString[-10:]
sessionId = "MYSESSION"

theRegion = os.environ["AWS_REGION"]
region = os.environ["AWS_REGION"]
llm_response = ""

def askQuestion(question, endSession=False):

"""
Sends a prompt for the agent to process and respond to.

:param agent_id: The unique identifier of the agent to use.
:param agent_alias_id: The alias of the agent to use.
:param session_id: The unique identifier of the session. Use the same value across requests to continue the same conversation.
:param prompt: The prompt that you want Claude to complete.
:return: Inference response from the model.
"""

try:
client = boto3.client('bedrock-agent-runtime', region_name=region)
logging.info(f"Invoking agent with question: {question}")
response = client.invoke_agent(
agentId=agentId,
agentAliasId=agentAliasId,
sessionId=sessionId,
inputText=question,
)

completion = ""

for event in response.get("completion"):
chunk = event["chunk"]
completion = completion + chunk["bytes"].decode()

except ClientError as e:
logging.error(f"Couldn't invoke agent. {e}")
raise

print(completion)

return completion

def agent_handler(event, context):

"""
Takes in an event body containing the user prompt (question) and returns the response from the agent.

:param event: A dict that contains the user prompt and session id.
:param context: The context of the prompt.
"""

sessionId = event["sessionId"]
question = event["question"]
endSession = False

print(f"Session: {sessionId} asked question: {question}")

try:
if (event["endSession"] == "true"):
endSession = True
except:
endSession = False

try:
response = askQuestion(question, endSession)
return response

except Exception as e:
return "An error occurred. Please adjust the question and re-submit... :sparkles:"

Sidebar: The agent invocation logic of the Streamlit web application could be simplified by having the app call an AWS API Gateway instead of invoking the Agent directly. In that way, the Steamlit web app and other potential web clients would call a standard RESTful API interface, while we would handle the agent invocation logic as an AWS Lambda function.

The 2nd code block defines optional resources based on user context variables, which are defined as part of the cdk deploy CLI command:

cdk deploy --all --context acm_certificate_arn=$CERT_ARN domain_name=$CUSTOM_DOMAIN_NAME email_address=$EMAIL_ADDRESS dev=true --require-approval never

The user variables can then be accessed with the try_get_context method and validated as conditionals in the Stack. If any context variables are not set, they are read as “None,” and the optional resources are not created. As a practical note, be sure to “cdk bootstrap” the account first, then run the cdk deploy command to initiate the application deployment.

cdk deploy --all deploys the four Stacks we defined in sequence, and the SteamlitStack’s AWS CloudFormation Output will contain the ALB URL on which the Streamlit app can be reached. Now we’re ready to evaluate the prototype stack and iterate.

Conclusions

Amazon Bedrock is a fully managed service that exposes various foundation models (FMs) from different model providers and provides a broad set of capabilities for building agentic generative AI applications. The service exposes a serverless interface without provisioning, scaling, or securing any infrastructure and accelerates application builders that use and customise foundation models with private data sets using AWS tools. Agents for Bedrock are integrated with AWS Lambda, a serverless compute option to run code without provisioning or managing servers that is familiar to many builders on AWS.

Like any other AWS service, Amazon Bedrock is accompanied by a full API specification and is supported by the AWS CLI, AWS SDKs, AWS CloudFormation, and AWS CDK. Developers and builders who want to explore, build and iterate quickly with Amazon Bedrock can create full application stacks by leveraging AWS CDK construct patterns and using frontend libraries such as the Streamlit Python module to create conversational web applications.

AWS application builders can also leverage various supporting services for building out supporting data foundations, which can also be defined with AWS toolkits.

--

--

Dirk Michel

SVP SaaS and Digital Technology | AWS Ambassador. Talks Cloud Engineering, Platform Engineering, Release Engineering, and Reliability Engineering.