AWS SQS Basic Implementation
The data ingestion team recently had a use case for a new queue-driven process, so we decided to give SQS a shot. Our system needed to process a file after it was added to S3, in this case converting it from PDF to JSON. We had previously built a similar process as a Lambda function with an S3 trigger, but this time around we wanted more flexibility, so we decided to build an endpoint that any client could hit with a source PDF. This would make our PDF-to-JSON converter universal, allowing us to call it from any app that needed a conversion performed. Initially we wanted the endpoint to return the resulting JSON directly to the client, but we quickly realized we couldn't do that because the processing time for larger files was too long. A queue-driven process was in order.

As with all of our infrastructure, we wanted to deploy the queue through code, and Terraform was the obvious choice to get started. Once the queue was in place we built Node.js apps to interact with it. The basic workflow for our use case was as follows:
- Create a FIFO SQS queue using Terraform.
- Create an endpoint that allowed users to place messages into a queue.
- Create a consumer application to process messages off of the queue.
Creating the queue using Terraform:
resource "aws_sqs_queue" "terraform_sample_queue" {
name = "mysamplequeue"
message_retention_seconds = 1209600
receive_wait_time_seconds = 10
fifo_queue = true
content_based_deduplication = true
visibility_timeout_seconds = 43200tags {
Environment = "development"
}
}
Okay, so what does this mean? Without getting into too much detail, this code is going to create a queue named "mysamplequeue.fifo" in SQS (FIFO queue names are required to end in .fifo). The message_retention_seconds setting tells the queue how long an item can sit in the queue before it is removed automatically. We're using 14 days in this example, which should give us plenty of time to address any problems that arise without letting the queue grow indefinitely. To reduce cost we're using long polling by setting receive_wait_time_seconds; you can read more about long polling in the SQS docs if you're interested. The next couple of lines are fairly straightforward: we're creating a FIFO queue that drops duplicate messages based on their content. visibility_timeout_seconds is set to 12 hours, meaning a message can be in flight for up to 12 hours before SQS treats it as lost and makes it visible on the queue again. Lastly, we set up any tags we want on our queue. All of these settings should be tweaked to fit your needs, and you can read about even more settings you may want to explore in the AWS documentation.
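To make the long polling setting a bit more concrete, here is a minimal sketch of a raw receive call using the aws-sdk we use later in this post. The queue URL is the same kind of placeholder used in the snippets below, and WaitTimeSeconds mirrors the receive_wait_time_seconds default we set on the queue; our actual consumer (shown later) uses sqs-consumer, which handles receiving for us.
const AWS = require('aws-sdk');
AWS.config.update({region: 'us-west-2'});

const sqs = new AWS.SQS();

// With long polling, this call holds the connection open for up to
// WaitTimeSeconds instead of returning immediately when the queue is empty.
sqs.receiveMessage({
  QueueUrl: "https://sqs.us-west-2.amazonaws.com/#/myQueueName",
  MaxNumberOfMessages: 1,
  WaitTimeSeconds: 10
}, function(err, data) {
  if (err) console.log(err, err.stack);
  else console.log(data.Messages || []); // Messages is absent when nothing arrived
});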
Notes: While testing, you will probably want content_based_deduplication set to false. With it enabled, re-sending an identical test message within the deduplication window (five minutes) is silently dropped, so you end up having to purge the queue or tweak the message just to get it re-queued after something goes wrong in flight.
Populating the queue:
In our scenario we wanted to abstract queue population behind an endpoint. This means clients pushing to the queue don't need to know how to interact with SQS directly, or even which queue they should populate; they just send a web request with some JSON. Our endpoint was built using the express module, which you can read more about in the Express docs.
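As a rough illustration, an endpoint like ours might look something like the sketch below. The /convert route, the request field names, and the sendToQueue helper are placeholders for this post, not our production code; sendToQueue would wrap the sqs.sendMessage call shown next.
const express = require('express');
const app = express();

app.use(express.json()); // parse JSON request bodies

// Placeholder: in our app this wraps the sqs.sendMessage call shown below.
async function sendToQueue(fields) {
  console.log('would enqueue', fields);
}

app.post('/convert', async (req, res) => {
  const { originbucket, originkey, destinationbucket, destinationkey } = req.body;

  if (!originbucket || !originkey) {
    return res.status(400).json({ error: 'originbucket and originkey are required' });
  }

  try {
    await sendToQueue({ originbucket, originkey, destinationbucket, destinationkey });
    res.status(202).json({ status: 'queued' }); // accepted; processing happens asynchronously
  } catch (err) {
    console.log(err);
    res.status(500).json({ error: 'failed to queue message' });
  }
});

app.listen(3000);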
So the client just sends a JSON object to our endpoint, then we parse fields out of that JSON request in order to build a queue message like so:
const AWS = require('aws-sdk');
AWS.config.update({region: 'us-west-2'}); // region names are lowercase

const sqs = new AWS.SQS();

const params = {
  MessageAttributes: {
    "originbucket": {
      DataType: "String",
      StringValue: "myOriginBucket"
    },
    "originkey": {
      DataType: "String",
      StringValue: "myOriginKey"
    },
    "destinationbucket": {
      DataType: "String",
      StringValue: "myDestBucket"
    },
    "destinationkey": {
      DataType: "String",
      StringValue: "myDestKey"
    }
  },
  MessageBody: JSON.stringify("myMessageBody"),
  QueueUrl: "https://sqs.us-west-2.amazonaws.com/#/myQueueName",
  MessageGroupId: "myMessageGroupId"
};

sqs.sendMessage(params, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else console.log(data);               // successful response
});
As you can see, we import aws-sdk, create an SQS client, and use it to call sendMessage. The only setup we need to do is build our params. In our use case we're doing some processing on a file that already exists in S3, so we add attributes that tell our consumer app where to retrieve the file (originbucket/originkey) and where to save the output once it's done processing (destinationbucket/destinationkey).
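One thing worth noting: our consumer below parses message.Body as JSON and reads these same fields from it, so one straightforward way to line the two up is to carry the S3 locations in the body as well. A minimal sketch of building the params that way, with placeholder values:
// A sketch of carrying the S3 locations in the message body itself,
// since the consumer below does JSON.parse(message.Body).
// All values here are placeholders.
const body = {
  originbucket: "myOriginBucket",
  originkey: "myOriginKey",
  destinationbucket: "myDestBucket",
  destinationkey: "myDestKey"
};

const params = {
  MessageBody: JSON.stringify(body),
  QueueUrl: "https://sqs.us-west-2.amazonaws.com/#/myQueueName",
  MessageGroupId: "myMessageGroupId"
};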
Notes: Don't forget to set the region (see line 2) or you will most likely encounter errors.
Consuming the queue:
const Consumer = require('sqs-consumer');

const app = Consumer.create({
  queueUrl: "https://sqs.us-west-2.amazonaws.com/#/myQueueName",
  handleMessage: async (message, done) => {
    try {
      const msg = JSON.parse(message.Body);
      // do stuff with msg
      done();
    }
    catch(error) {
      console.log(error);
      done(error);
    }
  }
});

app.start(); // nothing is processed until the consumer is started
As you can see, we used the sqs-consumer module, which simply requires you to set a queueUrl, write a function that runs on each message, and call app.start() to begin polling. In our case, in place of // do stuff with msg we hand the message off to a handler module that performs the rest of the processing (i.e. the conversion from PDF to JSON) by accessing the fields on our JSON message (originbucket, originkey, destinationbucket, and destinationkey). Once you're done processing the item you just call done(); and the item is deleted from the queue. Until done(); is called the message is considered "in flight". Calling done(error); means the message is not deleted; instead, once its visibility timeout expires it becomes visible on the queue again rather than remaining in flight. It isn't shown here, but when this happens we also send a Slack notification so we're aware of the issue and can look into it.
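For completeness, here is a rough sketch of what that kind of notification hook could look like using sqs-consumer's error events; notifySlack is a placeholder for whatever alerting mechanism you use.
// Rough sketch: surface queue and processing errors to a human.
// notifySlack is a placeholder for your actual notification mechanism.
async function notifySlack(text) {
  console.log('ALERT:', text); // e.g. post to a Slack webhook here
}

// Errors talking to SQS itself (credentials, connectivity, etc.)
app.on('error', (err) => {
  notifySlack(`SQS consumer error: ${err.message}`);
});

// Errors reported while processing a message (e.g. our handler called done(error))
app.on('processing_error', (err) => {
  notifySlack(`PDF processing failed: ${err.message}`);
});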