Deduplicating Amazon SQS Messages

Processing messages exactly once across multiple Lambda consumers

Being relatively new to serverless, I find myself facing interesting problems all the time when working with Amazon Web Services (AWS). A recent issue of particular interest to me took the form of message deduplication in Amazon’s Simple Queue Service (SQS).

We have a standard SQS queue, automatically triggering a Lambda function to consume messages. This function scales automatically, increasing and decreasing its number of concurrent executions to match the number of messages sitting in the queue.

Standard queues guarantee at-least-once delivery, storing messages across multiple servers to maintain high availability. On rare occasions, one of these servers may be unavailable at the time our message is consumed or deleted. When that server becomes available again, it may re-send the message to the Lambda function, thereby delivering a duplicate.

Suppose these messages represent appointment requests within a GP practice, where the Lambda function takes a request and allocates an appointment. We can be sure that every request will be allocated at least one appointment, but we need to prevent scheduling two, three, or more appointments to the same request. So how do we do this?

No to FIFO Queues

Fear not, I hear you cheer, for SQS First-In-First-Out (FIFO) queues are here! Guaranteeing exactly-once delivery, at face value they sound like the ideal out-the-box solution to put our minds at ease. If only it were that simple …

Using a FIFO queue, we cannot trigger our Lambda function automatically. We would need a CloudWatch Events rule to trigger the Lambda periodically, polling the FIFO queue for available messages. These rules fire at most once a minute. Rather than following an event-driven approach, we would find ourselves repeatedly checking this queue in the hope of finding messages to consume.

We can implement long polling in the Lambda to mitigate this, waiting up to 20 seconds for a message to arrive in the SQS queue. This helps, but it doesn’t eliminate the possibility that our Lambda will consume nothing while still being charged for the full execution time.
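For illustration, a minimal sketch of what that long poll might look like with boto3 — the queue URL is a placeholder of mine, and the 20-second wait is simply the maximum SQS allows:

```python
import boto3

sqs = boto3.client("sqs")

# Placeholder queue URL for illustration only.
QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/appointment-requests.fifo"

def poll_for_messages():
    # Long poll: wait up to 20 seconds for a message before giving up.
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,
    )
    # If nothing arrives within the wait, "Messages" is absent from the response.
    return response.get("Messages", [])
```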

Furthermore, the CloudWatch event would trigger only one execution of our Lambda function. This leaves us unable to take advantage of Lambda’s horizontal scaling to process messages concurrently.

So when should we expect FIFO queues to be added to the list of supported Lambda triggers, I hear you ask? Probably never, largely down to the primary purpose of FIFO: first in, first out.

FIFO queues maintain the ordering of our messages. As soon as multiple messages are consumed by concurrent executions of the Lambda function, we only need one execution to fail and return a message to the queue for our ordering to fall out of sync.

Why use SQS?

With neither standard nor FIFO queues proving optimal for the task at hand, you may ask, why bother with SQS at all? Surely there are alternative services to consider? Amazon MQ, Kinesis Data Streams, or the Simple Notification Service (SNS), to name a few. So why are we not using them instead?

Short answer: none of them guarantee exactly-once delivery, leaving us precisely where we were before. The deduplication issue is not unique to SQS. There are two core reasons to stick with SQS for this particular task: to buffer messages reaching our Lambda, since we may well wish to cap the number of concurrent executions, and to take advantage of SQS’s dead-letter queues in the event messages fail to process one or more times.

With our hearts set on SQS, how do we solve the deduplication issue? FIFO queues would work, but we would fail to take advantage of concurrent executions, and we may find ourselves billed for a large amount of unnecessary execution time while polling for messages to consume.

Solution

With no out-the-box solution on the queue front, the responsibility lies on the shoulders of our Lambda to make sure every message is processed exactly once. This entails further logic in the function to validate every message consumed, making sure another execution hasn’t already consumed and processed the same message.

We require a ledger to keep a record of all messages consumed — establishing which are in progress, and which are complete. Sticking to the serverless, scalable theme, a DynamoDB NoSQL table is ideal for the job.

We’ll want to record several details in this table (a sketch of the table definition follows the list):

  1. a unique message_id included in each message,
  2. the message status (either IN_PROGRESS or COMPLETE),
  3. a consumption_count to track how often the message is consumed, and
  4. an updated timestamp to indicate when this item was last updated.
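As a rough sketch with boto3 (the table name message-ledger is a placeholder of mine), only the partition key needs declaring up front; the other attributes are written per item as messages are consumed:

```python
import boto3

dynamodb = boto3.client("dynamodb")

# Placeholder table name. Only the key is declared in the schema; the status,
# consumption_count, and updated attributes are set on each item later.
dynamodb.create_table(
    TableName="message-ledger",
    AttributeDefinitions=[{"AttributeName": "message_id", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "message_id", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",
)
```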

Whenever the Lambda consumes a message, we’ll take its message_id and perform a strongly consistent GET operation against the DynamoDB table for an item with this ID.
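With boto3, that strongly consistent read might look something like this (again, message-ledger is a placeholder table name):

```python
import boto3

dynamodb = boto3.resource("dynamodb")
ledger = dynamodb.Table("message-ledger")  # placeholder table name

def fetch_ledger_item(message_id):
    # Strongly consistent read, so we see the latest state of this message.
    response = ledger.get_item(
        Key={"message_id": message_id},
        ConsistentRead=True,
    )
    return response.get("Item")  # None if the message has never been consumed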

If the GET operation returns an item, we know this message has been consumed at least once before. This will occur for one of two reasons: either

  1. a previous Lambda execution failed to process the message, or
  2. SQS has duplicated the message.

To make sure another Lambda execution is not processing the same message, we need to check the corresponding table item’s updated timestamp against the current system time. If the difference between these times exceeds the Lambda timeout, then we know we’re safe to process the message. Otherwise, the Lambda should raise an error, returning the message to the SQS queue.
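A sketch of that check, assuming the updated attribute is stored as epoch seconds and the function timeout is 300 seconds (both assumptions of mine); if this returns False, the handler raises an error so the message goes back to the queue:

```python
import time

LAMBDA_TIMEOUT_SECONDS = 300  # assumed function timeout; match your own configuration

def safe_to_process(item):
    """Return True only if no other execution can still be working on this message."""
    if item["status"] == "COMPLETE":
        return False  # already processed; nothing left to do
    age = time.time() - float(item["updated"])
    # Only proceed once the last update is older than the longest a Lambda can run.
    return age > LAMBDA_TIMEOUT_SECONDS
```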

When it’s returned to the queue, how do we prevent another execution from immediately picking this message up and making the same check, I hear you ask? To address this, we need to configure the queue’s visibility timeout to match the Lambda timeout.

By adjusting the visibility timeout, the message remains invisible after consumption for the full possible lifetime of a Lambda execution. This means that by the time the message becomes available again, there should be no ongoing execution still processing it.
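For example, the alignment could be applied with boto3 along these lines (the queue URL and 300-second timeout are placeholders):

```python
import boto3

sqs = boto3.client("sqs")

# Placeholder queue URL; the visibility timeout mirrors the Lambda timeout (seconds).
sqs.set_queue_attributes(
    QueueUrl="https://sqs.eu-west-1.amazonaws.com/123456789012/appointment-requests",
    Attributes={"VisibilityTimeout": "300"},
)
```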

If the message can be processed, we update several attributes on the corresponding DynamoDB table item: the updated timestamp set to the current system time, the consumption_count incremented by 1 (starting from 0), and the message status set to IN_PROGRESS.
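A sketch of that write using boto3’s update_item (the table name and epoch-seconds timestamp format are assumptions of mine):

```python
import time
import boto3

ledger = boto3.resource("dynamodb").Table("message-ledger")  # placeholder table name

def mark_in_progress(message_id):
    # Record that this execution has claimed the message.
    ledger.update_item(
        Key={"message_id": message_id},
        UpdateExpression=(
            "SET #status = :in_progress, #updated = :now ADD consumption_count :one"
        ),
        ExpressionAttributeNames={"#status": "status", "#updated": "updated"},
        ExpressionAttributeValues={
            ":in_progress": "IN_PROGRESS",
            ":now": int(time.time()),
            ":one": 1,  # ADD treats a missing attribute as 0, so the count starts at 1
        },
    )
```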

If the Lambda then succeeds in processing the message, we make a further update to the table item, setting the message status to COMPLETE and the updated timestamp to the current system time. Any execution attempting to process the same message further down the line will stop after reading the COMPLETE status.
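And the corresponding completion update, under the same assumptions:

```python
import time
import boto3

ledger = boto3.resource("dynamodb").Table("message-ledger")  # placeholder table name

def mark_complete(message_id):
    # Flag the message as fully processed so future consumers stop early.
    ledger.update_item(
        Key={"message_id": message_id},
        UpdateExpression="SET #status = :complete, #updated = :now",
        ExpressionAttributeNames={"#status": "status", "#updated": "updated"},
        ExpressionAttributeValues={
            ":complete": "COMPLETE",
            ":now": int(time.time()),
        },
    )
```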

Conclusion

There’s a vast amount of logic packed into the consumer. Not ideal, particularly if you intended to keep your consumer a “dumb” service that simply passes each message on to another destination. In the end, if the cost of duplicate messages would be large, then the time and effort dedicated to implementing this logic will be worthwhile.

If you fancy following an implementation of the process outlined in this blog post, I’ve shared a Lambda function on GitLab to showcase it. Written in Python, the code reads from and writes to DynamoDB using the boto3 client. The DynamoDB table, Lambda function, and SQS queue are built and deployed with the AWS Cloud Development Kit (v0.33.0).

I’d love to hear how others have tackled this problem. Is there an out-the-box solution hiding in the shadows? Are there alternative solutions proving effective for your teams? Are you also following this approach? I hope the content outlined here proves useful to others facing the same problem.


UPDATE (June 15th 2019): Tony Brown (triangulum_au) has rightly pointed out on Twitter that we should use DynamoDB’s ConditionExpressions to address any possible race conditions between our GET and PUT operations. I’ll be making amendments to this post and the accompanying GitLab example to include this.
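For anyone curious in the meantime, here is a rough sketch of that idea, folding the claim and the staleness check into a single conditional write (the table name, epoch-seconds timestamp format, and timeout value are all assumptions of mine):

```python
import time
import boto3
from botocore.exceptions import ClientError

ledger = boto3.resource("dynamodb").Table("message-ledger")  # placeholder table name
LAMBDA_TIMEOUT_SECONDS = 300  # assumed function timeout

def try_claim(message_id):
    """Atomically claim the message; return False if another execution holds it."""
    now = int(time.time())
    try:
        ledger.update_item(
            Key={"message_id": message_id},
            UpdateExpression=(
                "SET #status = :in_progress, #updated = :now ADD consumption_count :one"
            ),
            # Claim only if the message is new, or the previous claim is both
            # incomplete and stale enough that its execution must have finished.
            ConditionExpression=(
                "attribute_not_exists(message_id) "
                "OR (#status <> :complete AND #updated < :stale)"
            ),
            ExpressionAttributeNames={"#status": "status", "#updated": "updated"},
            ExpressionAttributeValues={
                ":in_progress": "IN_PROGRESS",
                ":complete": "COMPLETE",
                ":now": now,
                ":one": 1,
                ":stale": now - LAMBDA_TIMEOUT_SECONDS,
            },
        )
        return True
    except ClientError as error:
        if error.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False  # another execution got there first
        raise
```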