Implementing SQS in Serverless… the right way

At the end of June 2018 AWS announced the launch of SQS triggers for Lambda functions. This is a game changer for those of us who use the Serverless framework to create microservices.

I maintain a few application stacks in Serverless including a stateless payment gateway.

The intention of this post is to help you deploy a fresh queue into a microservice, including an HTTP handler that lets other services publish messages into the service while preserving the bounded context.

If you have not yet created a Serverless project, feel free to use my Generic Microservice Chassis to help you get started. The readme covers some high-level best practices for deploying your microservice.

The rest of this documentation assumes you understand the basic structure outlined in the chassis readme.

What do I mean by the right way?

The design laid out in the above-mentioned chassis is crafted to make for a seamless CI/CD integration. The most important part of this is letting CloudFormation do the work it needs to do. The chassis includes references to a local environment file that is designed to handle Parameter Store secrets. Keep the Parameter Store keys consistent across your different environments and you have no-touch CI out of the box!
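For illustration, here is one way a serverless.yaml value can resolve straight out of Parameter Store; the key path and variable name are hypothetical, and the ~true suffix asks the framework to decrypt a SecureString:

custom:
  # hypothetical key; keep the same path shape in every environment so the
  # CI pipeline never needs per-stage tweaks
  dbPassword: ${ssm:/todoService/${self:provider.stage}/dbPassword~true}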

Architectural Guidelines

  • Do not allow other services to communicate directly with the SQS queue. To preserve the bounded context, any inter-service traffic must hit an HTTP endpoint and be forwarded to the queue internally. Return a standard pending async response object.
  • Use the dead-letter queue to handle transactions that fail a specified number of times (see the sketch after this list).
  • This walkthrough does not cover AAA, as there are several different ways to authenticate service access; I will cover my approaches in a future post.
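As a rough sketch of the dead-letter wiring mentioned above, the main queue's RedrivePolicy can point at a second queue resource. The TodoServiceDeadLetterQueue name and the maxReceiveCount value here are illustrative:

TodoServiceDeadLetterQueue:
  Type: AWS::SQS::Queue
  Properties:
    MessageRetentionPeriod: 1209600 # park failed messages for 14 days
TodoServiceQueue:
  Type: AWS::SQS::Queue
  Properties:
    MessageRetentionPeriod: 345600
    RedrivePolicy:
      deadLetterTargetArn:
        Fn::GetAtt: [ TodoServiceDeadLetterQueue, Arn ]
      maxReceiveCount: 5 # delivery attempts before a message is parked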

Getting started with your Queue

We will pretend we are creating a service stream inside a microservice. Thus we will need:

  • A queue resource
  • Plumbing to extract the address of the queue and inject it into the code modules.
  • An HTTP endpoint for external services to push messages into the microservice
  • An incoming handler to push messages onto the queue
  • A queue handler to process the messages on the queue

1 — Create and include SQS Resource

  • Create a queue resource declaration in the configurators folder (in its initial configuration it doesn't need to set any values, but the resource cannot be null, so add a reference to a default value for now).

./configurators/todoServiceQueue.yaml

---
Type: AWS::SQS::Queue
Properties:
  MessageRetentionPeriod: 345600
This is just a shell; you will likely add more configuration here. Note that we have chosen not to name the resource, letting CloudFormation do so.
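As a hint of where this shell might head, two properties worth considering early are a visibility timeout that exceeds your handler's timeout, and long polling. The values below are illustrative, not prescriptive:

---
Type: AWS::SQS::Queue
Properties:
  MessageRetentionPeriod: 345600
  VisibilityTimeout: 120 # should exceed the consuming function's timeout
  ReceiveMessageWaitTimeSeconds: 20 # long polling cuts down empty receives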

2 — Add a reference to the resource

./serverless.yaml

resources:
  Resources:
    TodoServiceQueue:
      $ref: ./configurators/todoServiceQueue.yaml
You COULD simply create the resource from step 1 directly in serverless.yaml if you like, but as your stacks get bigger this segmentation is preferable. As we have not named the resource, you can refer to it by its key from the resources block (do this for all your resources for better CI/CD compatibility).
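Pulling together the paths used in this walkthrough, the layout looks like this:

.
├── serverless.yaml
├── handler.js
├── configurators/
│   └── todoServiceQueue.yaml
└── lib/
    └── todo.server.library.js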

3 — Export the stream URL from the resource ref

  • This will be used as an environment variable in the handler/library that wants to communicate with the queue

./serverless.yaml

environment:
  DEPLOY_REGION: ${opt:region, self:provider.region}
  API_ROOT: ${self:custom.domainName}
  TODO_QUEUE_URL:
    Ref: TodoServiceQueue
Notice that we are using the key from the resources block of serverless.yaml to reference the Ref; for an SQS queue, Ref resolves to the queue URL.

4 — Create a stream handler event

./serverless.yaml

todoIngestHandler:
  handler: handler.todoIngestHandler
  name: ${self:provider.stage}-${self:custom.grub}-todoIngestHandler
  description: todo service transport queue worker
  events:
    - sqs:
        arn:
          Fn::GetAtt: [ TodoServiceQueue, Arn ]
The Fn::GetAtt intrinsic function retrieves the ARN of the queue from its resource. This way we don't need to pull account IDs and assemble ARNs by hand. Again, this is CI/CD friendly.

5 — Create the external handler reference

./handler.js

/**
 * SQS worker function listening to the todo service queue.
 * logger, todoServer and db are assumed to be wired up at the top of handler.js.
 * @param event
 * @param context
 * @param cb
 */
export const todoIngestHandler = ( event, context, cb ) => {
    logger.info( 'message received from SQS queue : ', event );
    todoServer.todoIngestHandler( event, db )
        .then( workerResponse => {
            logger.info( 'success posting event: ', workerResponse );
            return cb( null, null );
        })
        .catch( err => {
            logger.error( 'error handling queue event : ', err );
            return cb( err );
        });
}; // end todoIngestHandler

Note the following:

  • this just passes the whole event in; you should probably extract and parse the records (see the sketch after this list)
  • return the error if an error occurs; this will ultimately trigger the dead-letter queue when configured
  • don't return anything on success; the logging here is just for development
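A minimal sketch of that extraction, assuming JSON message bodies (the field names follow the standard Lambda SQS event shape):

const records = event.Records.map( record => ({
    messageId: record.messageId,
    body: JSON.parse( record.body ) // each body arrives as a raw string
}));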

6 — Create the server library function processing entry point

./lib/todo.server.library.js

/**
 * SQS worker server function. Empty for now.
 * @param queueEvents
 * @param db
 * @returns {Promise<any>}
 */
export async function todoIngestHandler( queueEvents, db ) {
    return new Promise(( resolve, reject ) => {
        logger.info( 'inside the library todo func: ', queueEvents );
        resolve({ status: "OK" });
    });
} // end todoIngestHandler
  • now everything is wired up; this function is the entry point for handling the queue messages.
  • do this for any Serverless function (i.e. the handler just passes the event and eventual target to the server library, which then calls factories etc).

7 — OPTIONAL: create an HTTP endpoint to push messages onto the queue

./serverless.yaml

submitTodoRequest:
  handler: handler.submitTodoRequest
  name: ${self:provider.stage}-${self:custom.grub}-submitTodoRequest
  description: HTTP endpoint for other services to push todo events to the service queue
  events:
    - http:
        path: /
        method: post
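One thing to check: the publishing function's execution role needs permission to send to the queue. If your chassis does not already grant it, a sketch of the provider-level statement looks like this:

provider:
  iamRoleStatements:
    - Effect: Allow
      Action:
        - sqs:SendMessage
      Resource:
        Fn::GetAtt: [ TodoServiceQueue, Arn ]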

./handler.js

/**
 * HTTP endpoint that takes various todo service payloads and
 * injects them onto the service stream for processing.
 * Queue is assumed to be an AWS SDK SQS client instance wired up elsewhere.
 * @param event
 * @param context
 * @param cb
 */
export const submitTodoRequest = ( event, context, cb ) => {
    logger.info( 'event passed into submitTodoRequest : ', event );
    todoServer.submitTodoRequest( JSON.parse( event.body ), Queue )
        .then( resultObj => {
            logger.info( 'success putting message on queue : ', resultObj );
            return cb( null, RESifySuccess( resultObj ));
        })
        .catch( err => {
            logger.error( 'passing events to service queue failed: ', err );
            return cb( null, RESifyErr( err ));
        });
}; // end submitTodoRequest
  • in this case we are RESifying the response and the error and returning them, as they are going back to a client app or a different service.
  • we are parsing the body from the request and passing it into the server. This is standard for a basic endpoint at its creation; for more complex services you may need to create an assembly to pass in (i.e. including path params, the body, or query string params), as sketched after this list.
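A hypothetical shape for such an assembly, built from the standard API Gateway proxy event fields:

const requestAssembly = {
    body: JSON.parse( event.body ),
    pathParameters: event.pathParameters,
    queryStringParameters: event.queryStringParameters
};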

8 — Create the server function to handle incoming messages to the queue

This is the server function that will wrangle the incoming message onto the queue.

./lib/todo.server.library.js

const TODO_QUEUE_URL = process.env.TODO_QUEUE_URL;

/**
 * Takes a candidate request, parses it, and puts it on the service queue
 * @param candidateTodo
 * @param Queue
 * @returns {Promise<any>}
 */
export async function submitTodoRequest( candidateTodo, Queue ) {
    return new Promise(( resolve, reject ) => {
        if( typeof candidateTodo === "object" ) {
            candidateTodo = JSON.stringify( candidateTodo );
        }
        const queuePayload = {
            MessageBody: candidateTodo,
            QueueUrl: TODO_QUEUE_URL
        };
        logger.info( 'payload to push onto queue : ', queuePayload );
        Queue.sendMessage( queuePayload ).promise()
            .then( queueAck => {
                logger.info( 'queue ack : ', queueAck );
                resolve( queueAck.MessageId );
            })
            .catch( err => {
                logger.error( 'error in pushing payload onto queue ', err );
                reject( err );
            });
    });
} // end submitTodoRequest
  • it may seem strange that we have parsed the payload in the handler and then stringified it here. Note that this is only because in this example we are not performing any actions on the payload. A complete example will probably validate the incoming payload before pushing it to the queue, so a proper error can be returned to the client before attempting the process (a rough sketch follows this list). Once again this will be covered in an article that describes handling events, but always parse objects in the handler; this increases portability until the CloudEvents standard reaches v1.
  • the TODO_QUEUE_URL is read from an environment variable, which was created in step 3.
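For completeness, a hypothetical guard at the top of the Promise body might look like this (the title field is invented for illustration):

if( !candidateTodo || typeof candidateTodo.title !== "string" ) {
    return reject( new Error( 'invalid todo payload' ));
}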

Final Thoughts

You should now have a basic working SQS queue within your service. You can use the queue URL exported in step 3 from any functions inside the service that need to push messages onto the queue. There is a lot more configuration you can go on to do, but this is enough to get you going.

have a great day!