Integrating Lambda to send and receive messages from SQS and SNS: Part 2

Aalok Trivedi
7 min read · Feb 27, 2023

Intro

In Part 1, we built a process that sends an API request to get the current datetime or a random UUID and sends that message to an SQS queue for further processing. Now it’s time to process the messages from the queue, send an SNS notification, and store the data in a DynamoDB table.

If you need a refresher, here is the link to Part 1.

What we’ll build

  1. An SNS topic with an email subscription.
  2. A DynamoDB table.
  3. A Lambda function that will retrieve messages from the SQS queue, send them to the SNS topic, and write the data to the DynamoDB table.

Let's get started!

Groundwork

Similar to Part 1, we have to start with a little groundwork and make sure everything is in place before we start connecting resources together.

Create the SNS topic

Let’s create the SNS topic that will receive our message from the Lambda and send an email. In a new Python file named createSNS.py, import boto3, create an SNS resource, and define a new function.

import boto3

sns = boto3.resource('sns', region_name='us-east-1')

def createSNS(topicName):
    try:
        topic = sns.create_topic(Name=topicName)
        print(topic.arn)

    except Exception as e:
        print(e)

#call the function
createSNS('datetime-uuid-topic')

We can create a new topic with the .create_topic() method and pass in a Name. We’ll need the topic’s ARN to publish the message from the Lambda function later on, so let’s print it out and keep it handy.

Next, we’ll need to create a subscription — a destination for our topic to send the data. In this case, we’re going to use an email protocol. In the SNS Console, navigate to the topic and add a subscription.

Add the email address we want the topic to send the message to. We’ll need to confirm the subscription from that email address (check the spam folder).
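If you’d rather script the subscription instead of using the Console, here’s a minimal boto3 sketch (the topic ARN and email address are placeholders; the confirmation email still has to be clicked):

import boto3

sns = boto3.resource('sns', region_name='us-east-1')

#placeholder ARN: use the one printed by createSNS.py
topic = sns.Topic('arn:aws:sns:us-east-1:054301730155:datetime-uuid-topic')

#SNS emails a confirmation link; delivery starts only after it's clicked
topic.subscribe(
    Protocol='email',
    Endpoint='your-email@example.com'  #replace with your address
)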

Create the Lambda

Like last time, create a new Lambda function, name it retrieveDatetime_UUID, and let it create a new default execution role. We should also increase the function’s timeout to 30 seconds.
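If you prefer to make that change in code, a quick boto3 sketch (assuming the function has already been created with the name above):

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

#raise the timeout from the 3-second default to 30 seconds
lambda_client.update_function_configuration(
    FunctionName='retrieveDatetime_UUID',
    Timeout=30
)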

Create the DynamoDB table

Next, we’ll create the table where we’ll store our message data. In the Console, create a new table named ‘messages.’ We’ll give it a partition key of message-id and a sort key of timestamp. We’ll keep the defaults for the rest of the settings.
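For reference, here’s a boto3 sketch of the same table (string types for both keys are an assumption, and on-demand billing stands in for whatever the Console defaults gave you):

import boto3

dynamodb = boto3.client('dynamodb', region_name='us-east-1')

dynamodb.create_table(
    TableName='messages',
    KeySchema=[
        {'AttributeName': 'message-id', 'KeyType': 'HASH'},  #partition key
        {'AttributeName': 'timestamp', 'KeyType': 'RANGE'}   #sort key
    ],
    AttributeDefinitions=[
        {'AttributeName': 'message-id', 'AttributeType': 'S'},
        {'AttributeName': 'timestamp', 'AttributeType': 'S'}
    ],
    BillingMode='PAY_PER_REQUEST'
)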

Add permissions

Finally, the Lambda will need permissions to interact with the SQS queue, SNS topic, and DynamoDB table. Just as in Part 1, let's navigate to the Lambda’s execution role and edit the policy to include the following actions:

  • SQS: “sqs:DeleteMessage”, “sqs:Get*”, “sqs:ReceiveMessage”
  • SNS: “sns:Publish”
  • DynamoDB: “dynamodb:BatchWriteItem”, “dynamodb:PutItem”

Make sure you include the ARN of each resource.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:Get*",
                "sqs:ReceiveMessage",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "sns:Publish",
                "dynamodb:BatchWriteItem",
                "dynamodb:PutItem"
            ],
            "Resource": [
                "arn:aws:sns:us-east-1:054301730155:datetime-uuid-topic",
                "arn:aws:sqs:us-east-1:054301730155:DateTime_UUID_sqs",
                "arn:aws:logs:us-east-1:054301730155:log-group:/aws/lambda/retrieveDatetime_UUID_message:*",
                "arn:aws:dynamodb:us-east-1:054301730155:table/messages"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:us-east-1:054301730155:*"
        }
    ]
}

Write the Lambda function

Alright, back to the fun part!

Add an SQS trigger

Let’s start by adding our SQS trigger through the visualizer, choosing our SQS queue from Part 1.
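The same trigger can be wired up in code; here’s a sketch using the queue ARN from the policy above:

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

#connect the Part 1 queue to the new function
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:sqs:us-east-1:054301730155:DateTime_UUID_sqs',
    FunctionName='retrieveDatetime_UUID',
    BatchSize=10  #Lambda polls up to 10 messages per invocation
)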

Environment variables

Now let’s add a few environment variables (a scripted alternative follows the list).

  • QUEUE_NAME: the name of our SQS queue.
  • TOPIC_ARN: the ARN of our SNS topic.
  • TABLE_NAME: the name of our DynamoDB table.
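And the scripted version of that step (note that this call replaces any environment variables already on the function):

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

lambda_client.update_function_configuration(
    FunctionName='retrieveDatetime_UUID',
    Environment={
        'Variables': {
            'QUEUE_NAME': 'DateTime_UUID_sqs',
            'TOPIC_ARN': 'arn:aws:sns:us-east-1:054301730155:datetime-uuid-topic',
            'TABLE_NAME': 'messages'
        }
    }
)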

The code

As always, let’s import our libraries and establish our environment variables, SNS client, and DynamoDB resource.

import boto3
import os
import json
from datetime import datetime

#environment variables
QUEUE_NAME = os.environ["QUEUE_NAME"]
TOPIC_ARN = os.environ["TOPIC_ARN"]
TABLE_NAME = os.environ["TABLE_NAME"]

sns = boto3.client('sns')
dynamodb = boto3.resource('dynamodb')

Inside the lambda_handler, let’s capture the incoming SQS event.

def lambda_handler(event, context):

    messages = event['Records']

This is the structure of an SQS event, so we’ll need to iterate over ‘Records’ to extract the data. The two main keys we’ll need are “messageId” and “body.” The “body” contains the message string we sent from the sendDatetime_UUID Lambda.

{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082650636",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082650649"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        }
    ]
}

Create a new Test event with the above JSON, so we can test if the messages are coming through.

Let’s iterate over the Records to capture the data we need in a dictionary along with a timestamp.

Remember, the partition and sort keys we created for our table are message-id and timestamp.

    for message in messages:
        messageData = {
            'message-id': message['messageId'],
            'body': message['body'],
            'timestamp': datetime.now().isoformat()
        }

Publish to SNS topic

We have our message data stored, so let’s publish it to our topic. With the .publish() method, we need to pass in a TopicArn and a Message (both required), plus an optional Subject. Pass in the ARN we set as an environment variable and a JSON dump of our message data. We’ll also print the result to the console.

        try:
            # publish message to sns
            snsResponse = sns.publish(
                TopicArn=TOPIC_ARN,
                Message=json.dumps(messageData, indent=4),
                Subject='DATETIME_UUID_MESSAGE'
            )
            print(f"Message sent to SNS: {json.dumps(messageData, indent=4)}")

        except Exception as e:
            print(e)

Let’s test it and check our email. We can either use the Test event or trigger it from our API from Part 1.
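To trigger it from code rather than the console’s Test tab, here’s a sketch that invokes the function directly with the sample event (test_event.json is a hypothetical file holding the JSON shown earlier):

import boto3
import json

lambda_client = boto3.client('lambda', region_name='us-east-1')

#load the sample SQS event shown earlier
with open('test_event.json') as f:
    test_event = json.load(f)

response = lambda_client.invoke(
    FunctionName='retrieveDatetime_UUID',
    Payload=json.dumps(test_event)
)
print(response['Payload'].read().decode())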

[Screenshot: SNS email sent from the Test event]
[Screenshot: SNS email sent from the API/first Lambda]

Success! The SNS publisher is working!

Write to DynamoDB table

Great! The Lambda is successfully receiving messages from SQS and sending the data to an email via SNS. Now, it's time to store the data in a table.

Below the SNS publish code, create a Table sub-resource with our TABLE_NAME environment variable.

#write to dynamodb
table = dynamodb.Table(TABLE_NAME)

With the .put_item() method, we can pass our attributes in the Item argument. Item expects a dictionary of key-value pairs, so we can pass in messageData directly. We’ll also print a confirmation to the console and, after the loop has processed every record, return the response.

dynamodbResponse = table.put_item(Item=messageData)
print(f"Message sent to DynamoDB: {json.dumps(dynamodbResponse, indent=4)}")

#after the loop has processed every record
return json.dumps(dynamodbResponse, indent=4)

Let’s test it with our Test event and API.

[Screenshot: Test execution result]
[Screenshot: Log results]
[Screenshot: DynamoDB table]
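To double-check the writes from code instead of the Console, a quick scan sketch (fine for a small tutorial table, since scan reads every item):

import boto3
import json

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('messages')

#scan returns every item; acceptable here because the table is tiny
items = table.scan()['Items']
print(json.dumps(items, indent=4, default=str))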

Here is the full Lambda code:

import boto3
import os
import json
from datetime import datetime

#environment variables
QUEUE_NAME = os.environ["QUEUE_NAME"]
TOPIC_ARN = os.environ["TOPIC_ARN"]
TABLE_NAME = os.environ["TABLE_NAME"]

sns = boto3.client('sns')
dynamodb = boto3.resource('dynamodb')

def lambda_handler(event, context):

    messages = event['Records']

    for message in messages:
        messageData = {
            'message-id': message['messageId'],
            'body': message['body'],
            'timestamp': datetime.now().isoformat()
        }

        try:
            # publish message to sns
            snsResponse = sns.publish(
                TopicArn=TOPIC_ARN,
                Message=json.dumps(messageData, indent=4),
                Subject='DATETIME_UUID_MESSAGE'
            )
            print(f"Message sent to SNS: {json.dumps(messageData, indent=4)}")

            #write to dynamodb
            table = dynamodb.Table(TABLE_NAME)

            dynamodbResponse = table.put_item(Item=messageData)
            print(f"Message sent to DynamoDB: {json.dumps(dynamodbResponse, indent=4)}")

        except Exception as e:
            print(e)

    #return after every record has been processed, not inside the loop
    return json.dumps(dynamodbResponse, indent=4)
Success!

Congrats! We just created a data processing solution going from an API >> Lambda 1 >> SQS >> Lambda 2 >> SNS/DynamoDB!

Again, this isn’t the most efficient way to handle data processing, BUT it’s a great representation of how data can move and interact from one service to another. And we did it without managing any servers!

This is just the tip of the iceberg of Lambda’s capabilities.

Thank you

Thank you for following me on my cloud computing journey. I hope this article was helpful and informative. Please give me a like & follow as I continue my journey, and I will share more articles like this!
