Real-Time Communication with API Gateway WebSockets and Cognito

Eric Bach
AMA Technology Blog
6 min read · Apr 22, 2024

In modern web development, real-time communication has become increasingly essential. WebSockets enable bidirectional, full-duplex communication over a single, long-lived TCP connection, which makes them ideal for applications that exchange data in real time, such as chat applications, including those built for generative AI use cases.

Amazon API Gateway provides WebSocket APIs that let developers build scalable, real-time communication applications. Paired with Amazon Cognito User Pools, which handle the authentication and authorization of users, they give us a way to build WebSocket APIs securely.

Photo by Pawel Czerwinski on Unsplash

The limitation with WebSocket APIs built with Amazon API Gateway is that a built-in Cognito Authorizer is not available. This requires creating a custom Lambda authorizer to allow for seamless authentication and authorization of WebSocket connections.

In this post, we’ll take a look at how to create an Amazon API Gateway WebSocket API using CDK and secure it with Amazon Cognito User Pools by using a custom Lambda authorizer. We will use DynamoDB as a NoSQL table to manage active connection IDs. Because the code involved is quite lengthy, I will just highlight the important parts in this post. Feel free to browse the full source code in my repo here.

Fig 1. API Gateway WebSocket with Lambda Cognito Authorizer

WebSocket API

To build an API Gateway WebSocket API, we need to create several routes that handle the WebSocket communication.

We will first create the WebSocket API and a stage for the deployment. The WebSocket API defines the $connect and $disconnect routes, each integrated with one of the Lambda handlers that we will create next. We also attach the Lambda authorizer to the $connect route so that the Cognito token is validated to authenticate and authorize the user. The authorizer accepts the Cognito token as a query parameter with the idToken key.

const webSocketApi = new WebSocketApi(this, 'WebsocketApi', {
  apiName: `${props.appName}-websocket-api-${props.envName}`,
  connectRouteOptions: {
    authorizer: new WebSocketLambdaAuthorizer('Authorizer', authWebsocket, {
      identitySource: ['route.request.querystring.idToken'],
    }),
    integration: new WebSocketLambdaIntegration(
      'ConnectHandlerIntegration',
      connectWebsocket
    ),
  },
  disconnectRouteOptions: {
    integration: new WebSocketLambdaIntegration(
      'DisconnectHandlerIntegration',
      disconnectWebsocket
    ),
  },
  routeSelectionExpression: '$request.body.action',
});

const apiStage = new WebSocketStage(this, 'WebsocketStage', {
  webSocketApi,
  stageName: 'prod',
  autoDeploy: true,
});
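To make the routeSelectionExpression setting concrete: for each message sent over an open connection, API Gateway evaluates $request.body.action against the JSON body and dispatches to the route whose key matches the result, falling back to $default when that route is defined. The Python sketch below only imitates that lookup for illustration; select_route and the ROUTES set are hypothetical names, not part of any AWS SDK.

```python
import json

# Hypothetical set of custom route keys configured on the API (illustration only).
ROUTES = {"SendMessage"}


def select_route(raw_body, routes=ROUTES, has_default=False):
    """Imitate how '$request.body.action' picks a route for one message.

    Returns the matching route key, '$default' when nothing matches and a
    $default route exists, or None when the message cannot be routed.
    """
    fallback = "$default" if has_default else None
    try:
        body = json.loads(raw_body)
    except (TypeError, ValueError):
        # Non-JSON messages cannot be evaluated against the expression.
        return fallback
    action = body.get("action") if isinstance(body, dict) else None
    if isinstance(action, str) and action in routes:
        return action
    return fallback
```

A client message such as {"action": "SendMessage", "message": "hi"} would therefore reach the SendMessage route, while $connect and $disconnect are triggered by the connection lifecycle rather than by this expression.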

$connect

The $connect route is executed when a client opens a WebSocket connection. The Cognito token is also authenticated and authorized on this route, by the Lambda authorizer, before the connection is established.

The Lambda handler for the $connect route simply saves the WebSocket connection ID in the DynamoDB table so that we can manage the active connections.

import json
import os
import boto3
from aws_lambda_powertools import Logger

TABLE_NAME = os.environ["TABLE_NAME"]
ddb = boto3.resource("dynamodb")
table = ddb.Table(TABLE_NAME)
logger = Logger()

@logger.inject_lambda_context(log_event=True)
def handler(event, context):
    connection_id = event["requestContext"]["connectionId"]
    table.put_item(Item={"connectionId": connection_id})
    return {}

The Cognito authorizer validates the incoming authorization token against the Cognito User Pool. The token arrives in the idToken query parameter and is decoded so that its claims (expiry, client ID) can be verified. Any additional authorization scopes can be verified at this point as well. The authorizer then returns an IAM policy that either allows or denies the connection to the WebSocket API.

import os
import json
import time
import urllib.request
from jose import jwk, jwt
from jose.utils import base64url_decode
from aws_lambda_powertools import Logger

# Some code removed for brevity

@logger.inject_lambda_context(log_event=True)
def handler(event, context):
    token = event["queryStringParameters"]["idToken"]

    try:
        claims = validate_token(token)
        print("Token is valid. Payload:", claims)
        return get_allow_policy(event["methodArn"], claims)
    except Exception as e:
        # Any validation failure falls through to the deny policy below
        print("Token is not valid:", e)

    return get_deny_policy()

def get_allow_policy(methodArn, claims):
    return {
        "principalId": claims['sub'],
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "execute-api:Invoke",
                    "Effect": "Allow",
                    "Resource": methodArn
                }
            ]
        }
    }

def get_deny_policy():
    return {
        "principalId": "*",
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "*",
                    "Effect": "Deny",
                    "Resource": "*"
                }
            ]
        }
    }

def validate_token(token):
    # Code removed for brevity

    claims = jwt.get_unverified_claims(token)

    # Raise real exceptions; raising a plain string is a TypeError in Python 3
    if time.time() > claims['exp']:
        raise ValueError('Token is expired')
    if claims['aud'] != APP_CLIENT_ID:
        raise ValueError('Token was not issued for this audience')

    return claims
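The claim checks can be extended beyond expiry and audience. The sketch below is a minimal, standard-library-only illustration of checks commonly applied to a Cognito ID token after its signature has been verified; it does not verify the signature itself. check_claims, its parameters, and the issuer format note are assumptions introduced for this example rather than code from the repo.

```python
import time


def check_claims(claims, app_client_id, issuer, now=None):
    """Validate already-decoded ID token claims; raise ValueError on failure.

    Assumes the token signature was verified beforehand. `issuer` is expected
    to look like https://cognito-idp.<region>.amazonaws.com/<userPoolId>.
    """
    now = time.time() if now is None else now
    exp = claims.get("exp")
    if exp is None or now > exp:
        raise ValueError("Token is expired")
    if claims.get("aud") != app_client_id:
        raise ValueError("Token was not issued for this audience")
    if claims.get("iss") != issuer:
        raise ValueError("Token was not issued by this user pool")
    if claims.get("token_use") != "id":
        raise ValueError("Token is not an ID token")
    return claims
```

Passing `now` explicitly keeps the function deterministic in unit tests; in the Lambda authorizer it can simply be omitted.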

$disconnect

The $disconnect route is executed after the connection is closed. This is useful for cleaning up the connection IDs in the DynamoDB table, and the handler code is very similar to the $connect handler.

import json
import os
import boto3
from aws_lambda_powertools import Logger

TABLE_NAME = os.environ["TABLE_NAME"]
ddb = boto3.resource("dynamodb")
table = ddb.Table(TABLE_NAME)
logger = Logger()

@logger.inject_lambda_context(log_event=True)
def handler(event, context):
    connection_id = event["requestContext"]["connectionId"]
    table.delete_item(Key={"connectionId": connection_id})
    return {}
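Because both handlers only touch the table through put_item and delete_item, their behaviour is easy to exercise locally. The sketch below swaps the DynamoDB table for a hypothetical in-memory stand-in (FakeTable) and passes it in explicitly; on_connect, on_disconnect, make_event, and the sample connection ID are assumptions made for illustration.

```python
class FakeTable:
    """In-memory stand-in exposing the two boto3 Table methods used above."""

    def __init__(self):
        self.items = {}

    def put_item(self, Item):
        self.items[Item["connectionId"]] = Item

    def delete_item(self, Key):
        # Deleting a missing key is a no-op, as it is in DynamoDB.
        self.items.pop(Key["connectionId"], None)


def on_connect(event, table):
    """Record the connection ID when a client connects."""
    table.put_item(Item={"connectionId": event["requestContext"]["connectionId"]})
    return {}


def on_disconnect(event, table):
    """Remove the connection ID once the client has disconnected."""
    table.delete_item(Key={"connectionId": event["requestContext"]["connectionId"]})
    return {}


def make_event(connection_id):
    """Build the minimal event shape both handlers read."""
    return {"requestContext": {"connectionId": connection_id}}
```

Calling on_connect and then on_disconnect with the same event should leave the table empty, which mirrors the lifecycle of a single WebSocket connection.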

SendMessage

Finally, the SendMessage route is used to send messages back to connected clients. The Lambda function makes a POST request containing the message to each connection ID using the post_to_connection() method from the API Gateway Management API. As a result, any connected clients will receive the message from the WebSocket API.

import os
import boto3
import json
from aws_lambda_powertools import Logger

TABLE_NAME = os.environ["TABLE_NAME"]
ddb_client = boto3.client("dynamodb")
ddb = boto3.resource("dynamodb")
table = ddb.Table(TABLE_NAME)
logger = Logger()

@logger.inject_lambda_context(log_event=True)
def handler(event, context):
    event_body = json.loads(event["body"])

    paginator = ddb_client.get_paginator("scan")
    connectionIds = []

    api_gateway_management_api = boto3.client(
        "apigatewaymanagementapi",
        endpoint_url="https://"
        + event["requestContext"]["domainName"]
        + "/"
        + event["requestContext"]["stage"],
    )

    # Collect every stored connection, one scan page at a time
    for page in paginator.paginate(TableName=TABLE_NAME):
        connectionIds.extend(page["Items"])

    # Post the message to each connected client
    for connectionId in connectionIds:
        try:
            api_gateway_management_api.post_to_connection(
                ConnectionId=connectionId["connectionId"]["S"],
                Data=json.dumps({"message": event_body["message"]})
            )
        except Exception as e:
            logger.error(
                f"Error sending message to connectionId {connectionId}: {e}"
            )

    return {
        "statusCode": 200,
        "headers": {
            "Content-Type": "application/json",
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "*",
        },
        "body": json.dumps({"message": event_body["message"]}),
    }
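One refinement worth considering: post_to_connection raises GoneException when a connection ID no longer exists, for example when a client dropped without $disconnect being processed. The sketch below removes such stale IDs while broadcasting. The broadcast helper and its parameters are hypothetical; it assumes an apigatewaymanagementapi client and a DynamoDB Table resource like the ones created above, and it takes plain connection ID strings rather than raw scan items.

```python
import json


def broadcast(api_client, table, connection_ids, message):
    """Send `message` to every connection and prune the ones that are gone.

    Returns the list of connection IDs that were removed from the table.
    """
    data = json.dumps({"message": message}).encode("utf-8")
    stale = []
    for connection_id in connection_ids:
        try:
            api_client.post_to_connection(ConnectionId=connection_id, Data=data)
        except api_client.exceptions.GoneException:
            # The client is no longer connected, so drop its ID.
            table.delete_item(Key={"connectionId": connection_id})
            stale.append(connection_id)
    return stale
```

Catching only GoneException keeps other failures (throttling, oversized payloads) visible instead of silently deleting connections that may still be alive.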

We will also need to add the route to the CDK stack containing the WebSocket API, along with the permissions that allow the SendMessage Lambda function to POST to the WebSocket API. These permissions are granted through the execute-api:ManageConnections action.

webSocketApi.addRoute('SendMessage', {
  integration: new WebSocketLambdaIntegration('SendMessageIntegration', sendMessage),
});

sendMessage.addToRolePolicy(
  new PolicyStatement({
    actions: ['execute-api:ManageConnections'],
    resources: [
      this.formatArn({
        service: 'execute-api',
        resourceName: `${apiStage.stageName}/POST/*`,
        resource: webSocketApi.apiId,
      }),
    ],
  })
);
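For reference, formatArn with these arguments should yield an ARN of the form arn:<partition>:execute-api:<region>:<account>:<apiId>/<stage>/POST/*. The Python sketch below just rebuilds that string so the pattern is easy to inspect; manage_connections_arn and the sample region, account ID, and API ID are made-up values for illustration.

```python
def manage_connections_arn(region, account_id, api_id, stage, partition="aws"):
    """Build the execute-api ARN pattern that allows POSTs to any connection."""
    return (
        f"arn:{partition}:execute-api:{region}:{account_id}:"
        f"{api_id}/{stage}/POST/*"
    )


# Example with placeholder values (not real identifiers)
example_arn = manage_connections_arn("us-east-1", "123456789012", "abc123", "prod")
```

The POST/* wildcard covers the @connections/<connectionId> path that post_to_connection calls behind the scenes.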

Frontend

On the React frontend, we create a WebSocket object that manages the connection events for us. We get the Cognito ID token from the Amplify session and pass it as the idToken query parameter when initializing the WebSocket.

The submitMessage() function makes a request to the SendMessage route of the WebSocket API to post the message. This invokes the SendMessage Lambda handler we created previously that publishes the message to all connected clients.

When the client receives this message, it runs the onmessage event handler, which updates the React state with the message received from the WebSocket API.

const [client, setClient] = useState<WebSocket>();

const initializeClient = async () => {
  const idToken = (await Auth.currentSession()).getIdToken().getJwtToken();
  const client = new WebSocket(
    `${import.meta.env.VITE_API_WEBSOCKET_ENDPOINT}?idToken=${idToken}`
  );

  client.onopen = (e) => {
    console.log('WebSocket connection established.');
  };

  client.onerror = (e: any) => {
    console.error(e);

    // Try to reconnect after an error
    setTimeout(async () => {
      await initializeClient();
    });
  };

  client.onclose = () => {
    if (!closed) {
      // Reconnect unless the connection was closed on purpose
      setTimeout(async () => {
        await initializeClient();
      });
    } else {
      console.log('WebSocket connection closed.');
    }
  };

  client.onmessage = async (message: any) => {
    const event = JSON.parse(message.data);

    setMessage('');
    setConversation((conversation) => [...(conversation || []), event.message]);
    setLoadingMessage(false);
  };

  setClient(client);
};

// Some code omitted for brevity

const submitMessage = async (event: any) => {
  // Only submit on Enter, so other key presses do not leave the loading state set
  if (event.key !== 'Enter') {
    return;
  }

  setLoadingMessage(true);

  client?.send(
    JSON.stringify({
      action: 'SendMessage',
      message,
      token: (await Auth.currentSession()).getIdToken().getJwtToken()
    })
  );
};

Testing It Out

We can start our React frontend, sign up and log in as a new user, and send our first message to the chat. The backend currently just echoes the same message back to the user, but this behaviour can be changed for any real-world application. Every other connected client will also receive this message through the WebSocket API!

Conclusion

In this blog post, we explored how to build a WebSocket API using Amazon API Gateway and secure the WebSocket connections using a custom Lambda authorizer for Cognito authentication. By leveraging API Gateway WebSockets and Cognito’s user management capabilities, developers can implement robust authentication and authorization mechanisms for real-time communication applications. This setup ensures that only authenticated users can access the WebSocket API, providing a secure and seamless user experience.

Eric Bach is a Senior Software Developer @ Alberta Motor Association. He is passionate about building highly scalable and performant cloud-native solutions. In his spare time, Eric enjoys writing about serverless technologies and contributes to several technical blog publications.
