Integrating Lambda to send and receive messages from SQS and SNS: Part 1

Aalok Trivedi
9 min read · Feb 27, 2023

Intro

A major part of AWS and cloud engineering is being able to get different services and resources to interact with each other to process data. Lambda is a serverless, event-driven service that runs code without the need to provision or manage servers. You only pay for the time it takes to run your function.

To demonstrate the power of Lambda functions, we’re going to connect several services together to send and receive data.

I’ve broken up this article into two parts. In this article, we’ll focus on the mechanism that sends messages to an SQS queue. In part two, we’ll focus on receiving and storing the messages.

What we’ll build

Part 1:

  1. An HTTP API to send a request to get the current datetime or a randomized UUID.
  2. A Lambda function that will process the API request and return either the current datetime or random UUID.
  3. An SQS queue that will receive the datetime or UUID message.

Part 2:

  1. A Lambda function to receive messages from the SQS queue
  2. An SNS topic that will receive messages from Lambda and send an email notification.
  3. A DynamoDB table that our Lambda function will write the message data to.

Note: In reality, this isn't a particularly efficient way to process data, but it's a good demonstration of how these services can integrate with each other.

Prerequisites

  1. An AWS account with IAM user access.
  2. Familiarity with Python basics and Boto3.
  3. An IDE, such as VS Code or Cloud9.

Groundwork

Before we start, let’s lay the groundwork and create the Lambda function and SQS queue.

Create the Lambda function

In the AWS Management Console, create a new Lambda function and name it sendDatetime_UUID. Choose a runtime of Python 3.9 and let it create a default execution role for us (We’ll edit this later).

By default, the Lambda’s timeout is set to 3 seconds, so let’s increase that to 30 seconds to be on the safe side. These settings can be found in Configuration > General configuration.
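If you’d rather make this change from code instead of the console, here’s a minimal boto3 sketch (assuming the function name above and the us-east-1 region):

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

#bump the timeout from the default 3 seconds to 30 seconds
lambda_client.update_function_configuration(
    FunctionName='sendDatetime_UUID',
    Timeout=30
)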

Create the SQS queue

Now let’s create the SQS queue with a Python script. We’ll make a new Python file called createSQS.py.

import boto3

sqs = boto3.resource('sqs', region_name='us-east-1')

def createSQS(queueName):

    queue = sqs.create_queue(QueueName=queueName)
    queueURL = queue.url

    print(queueURL)

#call the function
createSQS('Datetime_UUID_sqs')

With the SQS resource, we’ll create a new standard queue using the create_queue() method, which takes a required QueueName argument.

A queue URL will be created, which we’ll need to access the queue from our Lambda, so let’s store that URL and print it out.

Run the code to create the queue and copy the URL that is printed in the console. Save it somewhere handy since we’ll need it later.

We’ll also need the queue’s ARN, so in the Console, navigate to the queue and store it somewhere handy as well.
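If you’d rather grab the ARN with a script instead of the console, the queue’s attributes expose it. A quick sketch using the same boto3 resource:

import boto3

sqs = boto3.resource('sqs', region_name='us-east-1')
queue = sqs.get_queue_by_name(QueueName='Datetime_UUID_sqs')

#the ARN is stored in the queue's attributes
print(queue.attributes['QueueArn'])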

Add permissions

As always, we need to give our Lambda function the proper permissions. It already has the necessary CloudWatch permissions from the default execution role, but we need to add access to SQS.

Remember, we should follow the ‘principle of least privilege’ and only give our Lambda the necessary permissions and access to resources to perform its task.

We need the IAM policy to Allow the sqs:SendMessage Action on the Resource (the ARN of the queue we created).

In our sendDatetime_UUID Lambda, navigate to Configuration > Permissions and click on the Role name. It will take us to the IAM policy page. Click on the policy link and edit the policy.

In the JSON, add an Action of “sqs:SendMessage” and paste the ARN from the Datetime_UUID_sqs queue under Resource. Review and save the policy.
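For reference, the added statement should look roughly like this (the Resource ARN below is a placeholder; use the one you copied from your Datetime_UUID_sqs queue):

{
    "Effect": "Allow",
    "Action": "sqs:SendMessage",
    "Resource": "arn:aws:sqs:us-east-1:123456789012:Datetime_UUID_sqs"
}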

We now have permissions to send messages to our queue! Our groundwork is done, so let’s continue with creating our message service.

Create the HTTP API

The API we’re creating is going to act as the main trigger for our little micro-process. We’ll create two routes that our API will use as endpoints to send requests (one for /datetime and one for /random-uuid). Depending on which endpoint the client requests, the Lambda will return either the current datetime or a random UUID.

In the Management Console, navigate to API Gateway and create a new HTTP API. Add a Lambda Integration, select our lambda function from the dropdown menu, and give the API a name.

We’re going to create two routes with an HTTP method of ‘GET’: one for datetime, with a resource path of ‘/datetime’, and one for uuid, with a resource path of ‘/random-uuid’. Select our Lambda function as the integration target.

The API will produce a URL that our code will interface with. If the user routes to /datetime (https://api-url.com/datetime), our Lambda knows to process and return the current datetime. If the user routes to /random-uuid (https://api-url.com/random-uuid), our Lambda knows to process and return a random UUID.

You can find the invoke URL on the API’s main page.

If we go back to our Lambda function and navigate to Configuration > Triggers, we should see both API routes configured.

Write the Lambda function

It’s time for the good stuff! We’ve created our API and SQS queue. Now it’s time to write the logic that will connect the two.

Let’s start by creating some environment variables for easy access to some key information. Navigate to Configuration > Environment variables and create the following:

  • GET_DATETIME: /datetime
  • GET_UUID: /random-uuid
  • QUEUE_URL: the SQS queue URL

Let’s start by importing all the necessary Python libraries and environment variables in our code.

import json
from datetime import datetime #access to various date and time methods
import uuid #generates uuids
import os #access our environment variables
import boto3 #access AWS resources and methods

#set env variables
GET_DATETIME = os.environ['GET_DATETIME']
GET_UUID = os.environ['GET_UUID']
QUEUE_URL = os.environ['QUEUE_URL']

When our Lambda is triggered (by the API), it creates an event. The event data is where we’ll extract our API request and determine if we need to process a datetime or UUID.

Inside our lambda_handler function, let’s print out the event in JSON format to see the structure.

def lambda_handler(event, context):
    print(json.dumps(event))

(Make sure you hit the ‘Deploy’ button every time to save your code).

To trigger the Lambda, we can paste the API URL with the endpoint into a web browser to send a request.

(https://your-api-url.com/random-uuid)
(https://your-api-url.com/datetime)

Note: I ended up making a super simple website with buttons that link to both API URLs to make it easier to trigger the API, but it’s by no means necessary.
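Alternatively, a few lines of Python can hit both endpoints. A rough sketch, swapping in your own invoke URL:

from urllib.request import urlopen

#placeholder invoke URL; replace with the one from your API's main page
API_URL = "https://your-api-url.com"

for path in ("/datetime", "/random-uuid"):
    with urlopen(API_URL + path) as resp:
        print(path, resp.read().decode())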

We get a response of ‘null,’ but it should have triggered our Lambda. If we go to the CloudWatch logs, we’ll see the event.

Test Events

Got it! We can see that the "rawPath" key gives us the proper request. We can use this JSON to set up easy testing, so we’re not invoking our function over and over again just to test something. Let’s copy that entire JSON and create a test!

Configure a new test event from the Test button and name the test “http-api-request.” Paste in the JSON and save it. Now if we ‘Test’ it again, we’ll be shown the execution results and the event data in the Function logs.
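If you don’t want to paste the entire event, a trimmed-down test event like this is enough for our purposes, since the handler only reads rawPath (most fields omitted, and your actual event will contain many more):

{
    "version": "2.0",
    "routeKey": "GET /datetime",
    "rawPath": "/datetime",
    "rawQueryString": ""
}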

Get API request

Alright, now we can really get moving! Remember, we need to create logic for the Lambda to either get the current time OR create a random UUID based on the API request, so let’s store the rawPath from the event in a variable.

def lambda_handler(event, context):
    apiEvent = event['rawPath']

Now we can build a conditional to “do stuff” based on that rawPath.

def lambda_handler(event, context):
    apiEvent = event['rawPath']

    try:
        if apiEvent == GET_DATETIME:
            pass #do datetime stuff

        elif apiEvent == GET_UUID:
            pass #do uuid stuff

        else:
            print(f"Error: '{apiEvent}' does not exist.")

    except Exception as e:
        print(e)

Our basic logic architecture is set up, so let’s start with the datetime. We’ll use the datetime library to get the current datetime (UTC) and the .strftime() method to convert the time into a more readable string format. Store it in a message variable.

    try:
        if apiEvent == GET_DATETIME:
            message = datetime.strftime(datetime.now(), "%m-%d-%Y, %I:%M:%S %p")

        elif apiEvent == GET_UUID:
            pass #do uuid stuff

        else:
            print(f"Error: '{apiEvent}' does not exist.")

    except Exception as e:
        print(e)

For more on the datetime library, visit the documentation.

Next, if the request is /random-uuid, we’ll create… well, a random UUID. We can easily do this with the built-in Python uuid library.

We’ll also print the message to the logs and return a JSON version of the message.

    try:
        if apiEvent == GET_DATETIME:
            message = datetime.strftime(datetime.now(), "%m-%d-%Y, %I:%M:%S %p")

        elif apiEvent == GET_UUID:
            message = str(uuid.uuid4())

        else:
            print(f"Error: '{apiEvent}' does not exist.")
            return

        print(message) #prints to logs
        return json.dumps(message)

    except Exception as e:
        print(e)

For more on the UUID library, visit the documentation.

Use the custom test event we created and change the rawPath to /datetime or /random-uuid to make sure both requests work.
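If you’d like to sanity-check the handler outside the Lambda console, here’s a minimal local sketch. It assumes the handler code is saved as lambda_function.py and mirrors our environment variables with placeholder values:

import os

#mirror the Lambda environment variables locally (QUEUE_URL is a placeholder)
os.environ.setdefault('GET_DATETIME', '/datetime')
os.environ.setdefault('GET_UUID', '/random-uuid')
os.environ.setdefault('QUEUE_URL', 'https://sqs.us-east-1.amazonaws.com/123456789012/Datetime_UUID_sqs')

from lambda_function import lambda_handler

#fake events that mimic the rawPath our HTTP API sends
print(lambda_handler({'rawPath': '/datetime'}, None))
print(lambda_handler({'rawPath': '/random-uuid'}, None))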

And if we trigger the API through the web browser, we should now see the returned message!

Send the message to SQS

We have our datetime and UUID working, so now it’s time to take the output (the message) and send it to our SQS queue to be processed by another worker.

First, we’ll need to create an SQS resource from Boto3 and get our queue (Datetime_UUID_sqs) via the ‘queue URL’ (the one we stored as an environment variable).

Note: We could also use the SQS client instead.
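For reference, a rough sketch of the client-based equivalent (we’ll stick with the resource approach below):

sqs_client = boto3.client('sqs')

#the client takes the queue URL directly and returns a dict that includes the MessageId
response = sqs_client.send_message(QueueUrl=QUEUE_URL, MessageBody=message)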

def lambda_handler(event, context):
    apiEvent = event['rawPath']

    sqs = boto3.resource('sqs')
    queue = sqs.Queue(QUEUE_URL) #get the sqs queue

We can send our message to the queue with the .send_message() method, passing our message as the value of the MessageBody argument. We’ll also print a confirmation to the logs that includes the MessageId, which is provided in the response.

    try:
        if apiEvent == GET_DATETIME:
            message = datetime.strftime(datetime.now(), "%m-%d-%Y, %I:%M:%S %p")

        elif apiEvent == GET_UUID:
            message = str(uuid.uuid4())

        else:
            print(f"Error: '{apiEvent}' does not exist.")
            return

        print(message) #prints to logs
        response = queue.send_message(MessageBody=message) #send message to queue
        print(f"MessageID: '{response['MessageId']}' sent to queue.") #prints to logs

        return json.dumps(message)

    except Exception as e:
        print(e)

Deploy the code and let’s see if it worked! We can either use the test event or trigger it via the API URL. Let’s trigger the event three times, sending three messages to the queue.

To confirm, let’s go to our SQS console and see if there are any messages available.
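We can also check programmatically; a quick sketch that reads the queue’s approximate message count (the count is approximate and can lag a little):

import boto3

sqs = boto3.resource('sqs', region_name='us-east-1')

#use the queue URL we saved earlier (a placeholder is shown here)
queue = sqs.Queue('https://sqs.us-east-1.amazonaws.com/123456789012/Datetime_UUID_sqs')

print(queue.attributes['ApproximateNumberOfMessages'])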

Let’s also check our logs.

Success!

Fantastic! We’ve successfully created a Lambda function that is triggered by an API request and sends the request message to an SQS queue!

In Part 2, we’ll create another Lambda function to read the messages from the queue, send an SNS email notification, and store the messages in a DynamoDB table. Stay tuned!

Here is the full code:

import json
from datetime import datetime
import uuid
import os
import boto3

#set env variables
GET_DATETIME = os.environ['GET_DATETIME']
GET_UUID = os.environ['GET_UUID']
QUEUE_URL = os.environ['QUEUE_URL']

def lambda_handler(event, context):
    apiEvent = event['rawPath']
    sqs = boto3.resource('sqs')
    queue = sqs.Queue(QUEUE_URL) #get the sqs queue

    try:
        if apiEvent == GET_DATETIME:
            message = datetime.strftime(datetime.now(), "%m-%d-%Y, %I:%M:%S %p (UTC)")

        elif apiEvent == GET_UUID:
            message = str(uuid.uuid4())

        else:
            print(f"Error: '{apiEvent}' does not exist.")
            return

        print(message) #prints to logs

        response = queue.send_message(MessageBody=message) #send message to queue

        print(f"MessageID: '{response['MessageId']}' sent to queue.") #prints to logs

        return json.dumps(message)

    except Exception as e:
        print(e)

Thank you

Thank you for following me on my cloud computing journey. I hope this article was helpful and informative. Please give me a like & follow as I continue my journey, and I will share more articles like this!
