How to use AWS: SNS and SQS

AlexV
Analytics Vidhya
Published in
7 min readJul 3, 2020

--

Ello ello ello my fellow engineers!

Recently I had to set up communication between two services we built, which was an interesting task to do. There are many message service options, but we ended up using AWS: SNS and SQS.

To begin, I’ll give you a quick intro into the background of why you use a messaging service. Then I’ll show you an example in Python about how to set up a simple SNS Topic, publish a message, set up an SQS Queue, subscribe to the SNS Topic and read a message consumed by the SQS Queue…so not much…

Have no fear! It sounds complicated but using Boto3 and Moto makes it super easy.

If you are more of an auditory learner then you can follow along with this YouTube video: https://youtu.be/E2-nbtdKZ-4

Background

Firstly, let’s look at why you would use a messaging service:

Microservices using a messaging service to communicate

When you have a microservice infrastructure some of those services may need to communicate with each other. You might have some data that is produced in one service and needs further processing in another; but you need to maintain tracking of the data and make sure it stays safe until it has been processed.

This is where a messaging service comes to help.

Communication

Using a messaging service provides a consistent structure for communication between your services. This structure can be adopted across your infrastructure for easy, manageable messaging.

Queueing

For some services you may produce several messages that need to be consumed by another service. In this instance you might want to set up a queue.

Parallel Processing

For large data streams, messaging services can be scaled. As long as your data is not coupled, it can be processed in parallel.

Redundancy

Like with all good systems, we might want some redundancy. You can set your messaging service up with retries, dead letter queues or commit offsets (if you are using Kafka). All the while keeping the message safe from being lost until it has been consumed…you wouldn’t want a new user signing up to your website to fail because a message was not processed.

AWS: SNS and SQS

So what is the difference?

SNS

SNS structure

SNS stands for Simple Notification Service. It is your typical Publisher/Subscriber one-to-many service. Messages are published to a Topic and then consumers subscribed to those Topics receive the same message. Think of it like a newspaper being delivered to several different households.

SQS

SQS structure

SQS stands for Simple Queue Service. You set up a queue and add messages to it. Then one of your services reads from the queue and processes the next message. Messages are only read and processed by one source. Think of it like receiving a personal letter in the post.

Setup SQS queue to subscribe to SNS Topic

SQS queue subscription to SNS Topic

You can set up queues to subscribe to SNS Topics too and in the example below we’ll go through how to do that.

Time for the fun stuff!

Dependencies:

  • Boto3
  • Moto
  • Pytest

Create a function that receives a message and publishes it to SNS:

"""aws_service_handler.py"""import os

import boto3


def publish_message_to_sns(message: str):
topic_arn = os.environ["sns_topic_arn"]

sns_client = boto3.client(
"sns",
region_name="eu-west-1",
)

message_id = sns_client.publish(
TopicArn=topic_arn,
Message=message,
)

return message_id

In order to publish to a SNS Topic you need the Amazon Resource Name (ARN) of the Topic. Above, we get that from an environment variable called sns_topic_arn then set up the sns_client using Boto3 and publish the message from that client. AWS will return a MessageId that you can return from the function.

TEST TEST TEST

Now let’s write our test! I’m a strong believer in blogs not just showing you “Show Code” as Software Engineers we need to get into the mentality that we will sometimes be spending more time on testing than we will on writing the implementation.

So let’s test the code above:

"""test_aws_service_handler.py"""import os

import boto3
from moto import mock_sns
from aws_service_handler import publish_message_to_sns@mock_sns
def test_publish_message_to_sns():
sns_resource = boto3.resource(
"sns",
region_name="eu-west-1"
)
topic = sns_resource.create_topic(
Name="test-topic"
)

os.environ["sns_topic_arn"] = topic.arn
test_message = "test_message"
message_id = publish_message_to_sns(test_message)

assert message_id["ResponseMetadata"]["HTTPStatusCode"] == 200

The way to implement Moto is to use decorators that will wrap your test and build AWS infrastructure in memory, which Boto3 will use. In the above test we use mock_sns to make a SNS resource and create a Topic to publish our message, using our function built earlier.

The MessageId response will look something like:

{
"MessageId": "7c446cbb-fb6a-4c03-bc0b-ded3641d5579",
"ResponseMetadata": {
"RequestId": "f187a3c1-376f-11df-8963-01868b7c937a",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"server": "amazon.com"
},
"RetryAttempts": 0
}
}

Therefore we can assert that we get a 200 status code (Happy Path :) ) response.

More fun stuff!

Setup a function that will read a message from an SQS queue:

"""aws_service_handler.py"""import os

import boto3
def read_from_sqs_queue():
queue_url = os.environ["sqs_queue_url"]
sqs_client = boto3.client(
"sqs",
region_name="eu-west-1",
)

messages = sqs_client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
)

return messages

Similar to SNS, in this instance we will need a QueueUrl that points to the queue our sqs_client will receive messages from. We only want to read one message at a time so we set MaxNumberOfMessages to 1, and return our messages.

TEST TEST TEST…again!

This test is going to be slightly more complex.

Like the last test we are going to create a SNS Topic but we are also going to create a SQS Queue that subscribes to that Topic. So we need both mock_sns and mock_sqs decorators from Moto.

"""test_aws_service_handler.py"""import json
import os

import boto3
from moto import mock_sns, mock_sqs
from aws_service_handler import read_from_sqs_queue
@mock_sns
@mock_sqs
def test_read_from_sqs_queue():
sns_resource = boto3.resource(
"sns",
region_name="eu-west-1"
)
topic = sns_resource.create_topic(
Name="test-topic"
)

sqs_resource = boto3.resource(
"sqs",
region_name="eu-west-1",
)

queue = sqs_resource.create_queue(
QueueName="test-queue",
)
os.environ["sqs_queue_url"] = queue.url
os.environ["sns_topic_arn"] = topic.arn

topic.subscribe(
Protocol="sqs",
Endpoint=queue.attributes["QueueArn"],
)

test_message = "test_message"
message_id = publish_message_to_sns(test_message)

messages = read_from_sqs_queue()
message_body = json.loads(messages["Messages"][0]["Body"])

assert message_body["MessageId"] == message_id["MessageId"]
assert message_body["Message"] == test_message

After creating a SNS Resource and SNS Topic, we make a SQS resource and SQS Queue. We then set the environment variables that will be needed in the functions and have SQS subscribe to the SNS Topic, using “sqs” Protocol and setting the Endpoint to the QueueArn. Then like the first test, we create a test_message and publish the message to SNS.

Then we read our messages from the queue using the function we created.

The Messages object returned from AWS will look (really messy!) similar to:

{
'Messages': [
{
'MessageId': 'b5f1b55e-00b1-3430-fbd8-1c5873369f9d',
'ReceiptHandle': 'wlmpcuuuusmottnwqvbanonfztvgntpsulcrjmyyjxeinlmrkrsaxxnvdhenqfhcbmymmuiepcexwiwfgwbthqhgrkktankjacedfydhkhtekyqmgfclbvecdcuqazvclefxynbnbaockuoksbdmfugqnpwwzjwqqzvofjkavpjtdtydbvlujgkmz',
'MD5OfBody': '4bf2e25ee48ea07fe629c59c056501eb',
'Body': '{\n "Message": "test_message",\n "MessageId": "2ec54c59-e7ed-450b-9de8-1b94980e9d71",\n "Signature": "EXAMPLElDMXvB8r9R83tGoNn0ecwd5UjllzsvSvbItzfaMpN2nk5HVSw7XnOn/49IkxDKz8YrlH2qJXj2iZB0Zo2O71c4qQk1fMUDi3LGpij7RCW7AW9vYYsSqIKRnFS94ilu7NFhUzLiieYr4BKHpdTmdD6c0esKEYBpabxDSc=",\n "SignatureVersion": "1",\n "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",\n "Subject": "my subject",\n "Timestamp": "2020-06-28T01:07:17.218Z",\n "TopicArn": "arn:aws:sns:eu-west-1:123456789012:test-topic",\n "Type": "Notification",\n "UnsubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:some-topic:2bcfbf39-05c3-41de-beaa-fcfcc21c8f55"\n}',
'Attributes': {
'SenderId': 'AIDAIT2UOQQY3AUEKVGXU',
'SentTimestamp': '1593306437219',
'ApproximateReceiveCount': '1',
'ApproximateFirstReceiveTimestamp': '1593306437233'
}
}
],
'ResponseMetadata': {
'RequestId': 'CXIXFV0VWIW73I4K5FDTO64LOOWDQNHSI5BNHZQSLWKT8IG2J4YK',
'HTTPStatusCode': 200,
'HTTPHeaders': {
'server': 'amazon.com',
'x-amzn-requestid': 'CXIXFV0VWIW73I4K5FDTO64LOOWDQNHSI5BNHZQSLWKT8IG2J4YK',
'x-amz-crc32': '3042017029'
},
'RetryAttempts': 0
}
}

There are two things we really want to test here:

  1. The MessageId is the same ID that we received from SNS for publishing the message.
  2. The Message in the Body itself is the same message we published to SNS.

In the above Messages object you can see in the Body that our test_message has been read! So the only thing left to do is load the JSON string and assert against the two criteria!

Simple as that!

We have successfully created an SNS Topic, created an SQS Queue, subscribed to the Topic, published a message to the Topic and read the message from our SQS Queue.

Lovely Jubbly!

I hope this has been of some use. If you have any questions please let me know. See you on the next one!

Documentation:

--

--

AlexV
Analytics Vidhya

while(1): pour coffee | Knowledge (Sharing) is Power