How to use AWS: SNS and SQS
Ello ello ello my fellow engineers!
Recently I had to set up communication between two services we built, which was an interesting task to do. There are many message service options, but we ended up using AWS: SNS and SQS.
To begin, I’ll give you a quick intro to why you would use a messaging service. Then I’ll show you an example in Python of how to set up a simple SNS Topic, publish a message, set up an SQS Queue, subscribe to the SNS Topic and read a message consumed by the SQS Queue…so not much…
Have no fear! It sounds complicated but using Boto3 and Moto makes it super easy.
If you are more of an auditory learner then you can follow along with this YouTube video: https://youtu.be/E2-nbtdKZ-4
Background
Firstly, let’s look at why you would use a messaging service:
When you have a microservice infrastructure, some of those services may need to communicate with each other. You might have some data that is produced in one service and needs further processing in another, but you need to keep track of the data and make sure it stays safe until it has been processed.
This is where a messaging service comes to help.
Communication
Using a messaging service provides a consistent structure for communication between your services. This structure can be adopted across your infrastructure for easy, manageable messaging.
Queueing
For some services you may produce several messages that need to be consumed by another service. In this instance you might want to set up a queue.
Parallel Processing
For large data streams, messaging services can be scaled. As long as your data is not coupled, it can be processed in parallel.
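As a rough illustration of that idea (not tied to SNS or SQS), here is a minimal sketch using Python's standard library. The process_message handler is a hypothetical stand-in for whatever work your consumer does; the point is only that independent messages can be handed to several workers at once.

```python
from concurrent.futures import ThreadPoolExecutor


def process_message(message: str) -> str:
    """Hypothetical handler: stands in for the real work a consumer would do."""
    return message.upper()


def process_in_parallel(messages, workers=4):
    # Each message is independent of the others, so the work can be
    # spread across several threads.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map() returns results in the same order as the inputs
        return list(pool.map(process_message, messages))


results = process_in_parallel(["signup", "order", "refund"])
```

If the messages depended on each other (for example, they had to be applied in strict order), this approach would not be safe without extra coordination.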
Redundancy
As with all good systems, we might want some redundancy. You can set your messaging service up with retries, dead letter queues or commit offsets (if you are using Kafka), all while keeping the message safe from being lost until it has been consumed…you wouldn’t want a new user signing up to your website to fail because a message was not processed.
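To make retries and dead letter queues a little more concrete, here is a toy in-memory sketch. It is not how SQS implements them; consume_with_retries, flaky_handler and max_attempts are all made-up names for illustration. A message that fails is put back for another attempt, and once it runs out of attempts it is parked on a dead letter list instead of being lost.

```python
from collections import deque


def consume_with_retries(messages, handler, max_attempts=3):
    """Process messages; park any that keep failing on a dead letter list."""
    queue = deque((message, 1) for message in messages)
    processed, dead_letters = [], []
    while queue:
        message, attempt = queue.popleft()
        try:
            processed.append(handler(message))
        except Exception:
            if attempt < max_attempts:
                # Put the message back so it can be retried later
                queue.append((message, attempt + 1))
            else:
                # Give up, but keep the message so nothing is lost
                dead_letters.append(message)
    return processed, dead_letters


def flaky_handler(message):
    # Hypothetical handler that always rejects one particular message
    if message == "bad":
        raise ValueError("cannot process")
    return f"handled {message}"


processed, dead_letters = consume_with_retries(["a", "bad", "b"], flaky_handler)
```

Here the "bad" message is tried three times and then ends up in dead_letters, while the other two are processed normally.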
AWS: SNS and SQS
So what is the difference?
SNS
SNS stands for Simple Notification Service. It is your typical Publisher/Subscriber one-to-many service. Messages are published to a Topic and then every consumer subscribed to that Topic receives the same message. Think of it like a newspaper being delivered to several different households.
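The newspaper analogy can be sketched in a few lines of plain Python. This is only a toy model of one-to-many delivery, not the SNS API; the Topic class and the household lists are invented for the example.

```python
class Topic:
    """Toy in-memory topic: every subscriber receives every message."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, inbox: list) -> None:
        self.subscribers.append(inbox)

    def publish(self, message: str) -> None:
        # Fan out: each subscriber gets its own copy of the message
        for inbox in self.subscribers:
            inbox.append(message)


household_a, household_b = [], []
newspaper = Topic()
newspaper.subscribe(household_a)
newspaper.subscribe(household_b)
newspaper.publish("Monday edition")
```

After the publish call, both households hold the same "Monday edition".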
SQS
SQS stands for Simple Queue Service. You set up a queue and add messages to it. Then one of your services reads from the queue and processes the next message. Each message is only read and processed by one consumer. Think of it like receiving a personal letter in the post.
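By contrast, the personal letter analogy looks like this as a toy in-memory queue. Again, this is not the SQS API, and read_next is a made-up helper. Real SQS is a bit more involved (a received message is hidden for a visibility timeout and only goes away once the consumer deletes it), but the basic idea of one reader per message is the same.

```python
from collections import deque

letters = deque()
letters.append("letter 1")
letters.append("letter 2")


def read_next(queue):
    """Return the next message, or None if the queue is empty.

    Once a message has been read it is gone, so no other reader sees it.
    """
    return queue.popleft() if queue else None


first = read_next(letters)
second = read_next(letters)
third = read_next(letters)  # nothing left to read
```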
Set up an SQS Queue to subscribe to an SNS Topic
You can set up queues to subscribe to SNS Topics too and in the example below we’ll go through how to do that.
Time for the fun stuff!
Dependencies:
- Boto3
- Moto
- Pytest
Create a function that receives a message and publishes it to SNS:
"""aws_service_handler.py"""
import os

import boto3


def publish_message_to_sns(message: str):
    # The Topic ARN is read from an environment variable
    topic_arn = os.environ["sns_topic_arn"]

    sns_client = boto3.client(
        "sns",
        region_name="eu-west-1",
    )

    # publish() returns the full response dict, which includes the MessageId
    message_id = sns_client.publish(
        TopicArn=topic_arn,
        Message=message,
    )

    return message_id
In order to publish to an SNS Topic you need the Amazon Resource Name (ARN) of the Topic. Above, we get that from an environment variable called sns_topic_arn, then set up the sns_client using Boto3 and publish the message from that client. AWS returns a response containing a MessageId, and the function returns that response.
TEST TEST TEST
Now let’s write our test! I’m a strong believer that blogs should not just “show code”. As Software Engineers we need to get into the mentality that we will sometimes spend more time on testing than on writing the implementation.
So let’s test the code above:
"""test_aws_service_handler.py"""
import os

import boto3
from moto import mock_sns

from aws_service_handler import publish_message_to_sns


@mock_sns
def test_publish_message_to_sns():
    # Create an in-memory SNS Topic for the function to publish to
    sns_resource = boto3.resource(
        "sns",
        region_name="eu-west-1"
    )
    topic = sns_resource.create_topic(
        Name="test-topic"
    )
    os.environ["sns_topic_arn"] = topic.arn

    test_message = "test_message"
    message_id = publish_message_to_sns(test_message)

    assert message_id["ResponseMetadata"]["HTTPStatusCode"] == 200
The way to implement Moto is to use decorators that wrap your test and build AWS infrastructure in memory, which Boto3 will use. In the above test we use mock_sns to make an SNS resource and create a Topic to publish our message to, using our function built earlier. (In Moto 5 and later, the per-service decorators such as mock_sns were replaced by a single mock_aws decorator.)
The response returned by publish will look something like:
{
    "MessageId": "7c446cbb-fb6a-4c03-bc0b-ded3641d5579",
    "ResponseMetadata": {
        "RequestId": "f187a3c1-376f-11df-8963-01868b7c937a",
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
            "server": "amazon.com"
        },
        "RetryAttempts": 0
    }
}
Therefore we can assert that we get a 200 status code (Happy Path :) ) response.
More fun stuff!
Set up a function that will read a message from an SQS queue:
"""aws_service_handler.py"""
import os

import boto3


def read_from_sqs_queue():
    # The Queue URL is read from an environment variable
    queue_url = os.environ["sqs_queue_url"]

    sqs_client = boto3.client(
        "sqs",
        region_name="eu-west-1",
    )

    # Ask for at most one message per call
    messages = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,
    )

    return messages
Similar to SNS, in this instance we will need a QueueUrl that points to the queue our sqs_client will receive messages from. We only want to read one message at a time so we set MaxNumberOfMessages to 1, and return our messages.
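One detail worth handling in real code: when the queue has nothing to give, the receive_message response does not contain a "Messages" key at all, so indexing it directly raises a KeyError. Below is a small sketch of a helper that guards against that. extract_messages is a hypothetical name, not part of Boto3, and the two sample responses are trimmed-down examples written by hand.

```python
def extract_messages(response: dict) -> list:
    """Return the messages in a receive_message response, or [] if there are none."""
    # "Messages" is absent (not an empty list) when nothing was received
    return response.get("Messages", [])


# Hand-written, trimmed-down examples of the two response shapes
empty_response = {"ResponseMetadata": {"HTTPStatusCode": 200}}
full_response = {
    "Messages": [{"MessageId": "abc", "Body": "hello"}],
    "ResponseMetadata": {"HTTPStatusCode": 200},
}

no_messages = extract_messages(empty_response)
one_message = extract_messages(full_response)
```

You could call it on the return value of read_from_sqs_queue() before looping over the messages.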
TEST TEST TEST…again!
This test is going to be slightly more complex.
Like the last test we are going to create an SNS Topic, but we are also going to create an SQS Queue that subscribes to that Topic. So we need both the mock_sns and mock_sqs decorators from Moto.
"""test_aws_service_handler.py"""
import json
import os

import boto3
from moto import mock_sns, mock_sqs

from aws_service_handler import publish_message_to_sns, read_from_sqs_queue


@mock_sns
@mock_sqs
def test_read_from_sqs_queue():
    # Create an in-memory SNS Topic
    sns_resource = boto3.resource(
        "sns",
        region_name="eu-west-1"
    )
    topic = sns_resource.create_topic(
        Name="test-topic"
    )

    # Create an in-memory SQS Queue
    sqs_resource = boto3.resource(
        "sqs",
        region_name="eu-west-1",
    )
    queue = sqs_resource.create_queue(
        QueueName="test-queue",
    )

    os.environ["sqs_queue_url"] = queue.url
    os.environ["sns_topic_arn"] = topic.arn

    # Subscribe the Queue to the Topic
    topic.subscribe(
        Protocol="sqs",
        Endpoint=queue.attributes["QueueArn"],
    )

    test_message = "test_message"
    message_id = publish_message_to_sns(test_message)

    messages = read_from_sqs_queue()
    # The SQS Body is a JSON string wrapping the SNS notification
    message_body = json.loads(messages["Messages"][0]["Body"])

    assert message_body["MessageId"] == message_id["MessageId"]
    assert message_body["Message"] == test_message
After creating an SNS Resource and SNS Topic, we make an SQS resource and SQS Queue. We then set the environment variables that will be needed in the functions and have SQS subscribe to the SNS Topic, using the “sqs” Protocol and setting the Endpoint to the QueueArn. Then, like the first test, we create a test_message and publish the message to SNS.
Then we read our messages from the queue using the function we created.
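One thing the Moto test will not tell you: in a real AWS account the queue also needs an access policy that allows the SNS Topic to send messages to it, otherwise the subscription exists but nothing arrives. Here is a hedged sketch of building such a policy document. build_sns_to_sqs_policy is a made-up helper name and the ARNs are placeholders, so adapt both to your own setup.

```python
import json


def build_sns_to_sqs_policy(queue_arn: str, topic_arn: str) -> str:
    """Build a queue policy (as a JSON string) letting one SNS Topic send to one Queue."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "sns.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                # Only accept messages that come from this specific Topic
                "Condition": {"ArnEquals": {"aws:SourceArn": topic_arn}},
            }
        ],
    }
    return json.dumps(policy)


# Placeholder ARNs for illustration only
policy_json = build_sns_to_sqs_policy(
    "arn:aws:sqs:eu-west-1:123456789012:test-queue",
    "arn:aws:sns:eu-west-1:123456789012:test-topic",
)
```

With the Boto3 resource used in the test, the policy could then be attached with something like queue.set_attributes(Attributes={"Policy": policy_json}).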
The Messages object returned from AWS will look (really messy!) similar to:
{
    'Messages': [
        {
            'MessageId': 'b5f1b55e-00b1-3430-fbd8-1c5873369f9d',
            'ReceiptHandle': 'wlmpcuuuusmottnwqvbanonfztvgntpsulcrjmyyjxeinlmrkrsaxxnvdhenqfhcbmymmuiepcexwiwfgwbthqhgrkktankjacedfydhkhtekyqmgfclbvecdcuqazvclefxynbnbaockuoksbdmfugqnpwwzjwqqzvofjkavpjtdtydbvlujgkmz',
            'MD5OfBody': '4bf2e25ee48ea07fe629c59c056501eb',
            'Body': '{\n "Message": "test_message",\n "MessageId": "2ec54c59-e7ed-450b-9de8-1b94980e9d71",\n "Signature": "EXAMPLElDMXvB8r9R83tGoNn0ecwd5UjllzsvSvbItzfaMpN2nk5HVSw7XnOn/49IkxDKz8YrlH2qJXj2iZB0Zo2O71c4qQk1fMUDi3LGpij7RCW7AW9vYYsSqIKRnFS94ilu7NFhUzLiieYr4BKHpdTmdD6c0esKEYBpabxDSc=",\n "SignatureVersion": "1",\n "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",\n "Subject": "my subject",\n "Timestamp": "2020-06-28T01:07:17.218Z",\n "TopicArn": "arn:aws:sns:eu-west-1:123456789012:test-topic",\n "Type": "Notification",\n "UnsubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:some-topic:2bcfbf39-05c3-41de-beaa-fcfcc21c8f55"\n}',
            'Attributes': {
                'SenderId': 'AIDAIT2UOQQY3AUEKVGXU',
                'SentTimestamp': '1593306437219',
                'ApproximateReceiveCount': '1',
                'ApproximateFirstReceiveTimestamp': '1593306437233'
            }
        }
    ],
    'ResponseMetadata': {
        'RequestId': 'CXIXFV0VWIW73I4K5FDTO64LOOWDQNHSI5BNHZQSLWKT8IG2J4YK',
        'HTTPStatusCode': 200,
        'HTTPHeaders': {
            'server': 'amazon.com',
            'x-amzn-requestid': 'CXIXFV0VWIW73I4K5FDTO64LOOWDQNHSI5BNHZQSLWKT8IG2J4YK',
            'x-amz-crc32': '3042017029'
        },
        'RetryAttempts': 0
    }
}
There are two things we really want to test here:
- The MessageId is the same ID that we received from SNS for publishing the message.
- The Message in the Body itself is the same message we published to SNS.
In the above Messages object you can see in the Body that our test_message has been read! So the only thing left to do is load the JSON string and assert against the two criteria!
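If you find yourself doing that unwrapping in more than one place, it can be pulled out into a small helper. The sketch below is one possible shape, not part of Boto3: unwrap_sns_messages is a hypothetical name, and sample_response is a hand-written, trimmed-down version of the object above. It assumes the default subscription settings, where SNS wraps the message in a JSON envelope (with raw message delivery enabled, the Body would be the plain message instead).

```python
import json


def unwrap_sns_messages(response: dict) -> list:
    """Return (MessageId, Message) pairs for SNS notifications delivered via SQS."""
    unwrapped = []
    for sqs_message in response.get("Messages", []):
        # The SQS Body is a JSON string holding the SNS notification envelope
        envelope = json.loads(sqs_message["Body"])
        unwrapped.append((envelope["MessageId"], envelope["Message"]))
    return unwrapped


# Hand-written, trimmed-down version of a receive_message response
sample_response = {
    "Messages": [
        {
            "MessageId": "b5f1b55e-00b1-3430-fbd8-1c5873369f9d",
            "Body": json.dumps(
                {
                    "Type": "Notification",
                    "MessageId": "2ec54c59-e7ed-450b-9de8-1b94980e9d71",
                    "Message": "test_message",
                }
            ),
        }
    ]
}

pairs = unwrap_sns_messages(sample_response)
```

Note that the outer MessageId belongs to SQS, while the one inside the Body is the SNS MessageId that publish returned.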
Simple as that!
We have successfully created an SNS Topic, created an SQS Queue, subscribed to the Topic, published a message to the Topic and read the message from our SQS Queue.
Lovely Jubbly!
I hope this has been of some use. If you have any questions please let me know. See you on the next one!
Documentation: