Processing High Volume Big Data Concurrently with No Duplicates using AWS SQS

In this story, we look at how to leverage the AWS Simple Queue Service (Standard queue) to achieve highly concurrent processing with no duplicates. We also compare it with other AWS services such as DynamoDB, SQS FIFO queues and Kinesis in terms of cost and performance.

Engineering@ZenOfAI
Nov 6, 2019 · 5 min read

A simple use case for the architecture below could be building an end-to-end messaging service or sending out transactional emails. Both use cases need highly concurrent processing with no duplicates.

So, we have a Lambda function that writes messages to the Standard queue in batches. This writer function is invoked when a file is posted to S3. While there are messages in the queue, Lambda polls the queue, reads messages in batches and invokes the Processor function synchronously with an event that contains the queue messages. The Processor function is invoked once per batch. When the function processes a batch successfully, Lambda deletes its messages from the queue. If the function fails to process a batch (it raises an error), the batch is returned to the queue. The Standard queue is configured with a redrive policy that moves a message to a Dead Letter Queue (DLQ) once the number of receives reaches the Maximum Receive Count (MRC). We set the MRC to 1, so a message that fails once goes straight to the DLQ instead of being processed again; this is what ensures deduplication.
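For reference, here is an abridged sketch of the event shape Lambda passes to the Processor for one batch. The field values are placeholders, and real events carry more fields (receiptHandle, attributes, eventSourceARN, and so on):

```python
import json

# Illustrative shape of the event Lambda hands to the Processor for one batch
# (abridged; field values are placeholders)
sample_event = {
    'Records': [
        {'messageId': 'msg-1',
         'body': json.dumps({'id': '1', 'email': 'a@example.com'}),
         'eventSource': 'aws:sqs'},
        {'messageId': 'msg-2',
         'body': json.dumps({'id': '2', 'email': 'b@example.com'}),
         'eventSource': 'aws:sqs'},
    ]
}

# The Processor recovers each original record by decoding the JSON body
records = [json.loads(r['body']) for r in sample_event['Records']]
```

Each record's `body` is exactly the string the Writer put on the queue, which is why the Processor decodes it with `json.loads`.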

Setting up Standard Queue with Dead Letter Queue

We need two queues: one for processing, and a second for failed messages to be moved into. Create the failed_messages queue first, as it is needed while creating the processing queue. Create a new queue, give it a name (failed_messages), select the type as Standard and choose Configure Queue.

Set the queue attributes such as visibility timeout and message retention period according to your needs.

For processing messages, create a new queue, give it a name, select the type as Standard and choose Configure Queue.

Set the Default Visibility Timeout to 5 minutes and use the Dead Letter Queue Settings to set up the redrive policy that moves failed messages into the failed_messages queue created earlier.

From the SQS homepage, select the processing queue and open the Redrive Policy tab. If set up correctly, you should see the ARN of the failed_messages queue there.
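The console steps above can also be expressed programmatically. Below is a sketch (the DLQ ARN and timeout are illustrative) of building the processing queue's attributes, including a redrive policy with maxReceiveCount set to 1; the resulting dict could be passed as `Attributes` to boto3's `create_queue` or `set_queue_attributes`:

```python
import json

def redrive_attributes(dlq_arn, max_receive_count=1, visibility_timeout=300):
    """Build SQS queue attributes for the processing queue.

    maxReceiveCount=1 sends a message to the DLQ after a single failed
    receive, which is what gives us deduplication in this setup.
    """
    return {
        'VisibilityTimeout': str(visibility_timeout),
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': max_receive_count,
        }),
    }

attrs = redrive_attributes('arn:aws:sqs:us-east-1:123456789012:failed_messages')
```

Note that SQS expects all attribute values, including the redrive policy, as strings, hence the `json.dumps`.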

Creating the Writer and Processor Lambda functions:

Writer.py

# Write batch messages to the queue
import csv
import json

import boto3

s3 = boto3.resource('s3')
sqs = boto3.client('sqs')

# Update this dummy URL with your processing queue's URL
processing_queue_url = ""

def lambda_handler(event, context):
    try:
        if 'Records' in event:
            bucket_name = event['Records'][0]['s3']['bucket']['name']
            key = event['Records'][0]['s3']['object']['key']
            bucket = s3.Bucket(bucket_name)
            obj = bucket.Object(key=key)
            # get the object
            response = obj.get()['Body'].read().decode('utf-8').split('\n')
            resp = list(response)
            if resp[-1] == '':
                # removing header row and trailing newline
                total_records = len(resp) - 2
            else:
                # removing header row
                total_records = len(resp) - 1
            print("total record count is:", total_records)
            batch_size = 0
            record_count = 0
            messages = []
            # Write to SQS
            for row in csv.DictReader(response):
                record_count += 1
                record = dict(row)
                # Replace below with a column whose values are all unique
                unique_id = record['ANY_COLUMN_WITH_ALL_VALUES_UNIQUE']

                batch_size += 1
                messages.append({
                    'Id': unique_id,
                    'MessageBody': json.dumps(record)
                })

                # send_message_batch accepts at most 10 entries per call
                if batch_size == 10:
                    batch_size = 0
                    try:
                        sqs_response = sqs.send_message_batch(
                            QueueUrl=processing_queue_url,
                            Entries=messages
                        )
                        print("response:", sqs_response)
                        if 'Failed' in sqs_response:
                            print('failed count:', len(sqs_response['Failed']))
                    except Exception as e:
                        print("error:", e)
                    messages = []

                # Handle the last (possibly partial) batch
                if record_count == total_records and messages:
                    print("batch size is:", batch_size)
                    batch_size = 0
                    try:
                        sqs_response = sqs.send_message_batch(
                            QueueUrl=processing_queue_url,
                            Entries=messages
                        )
                        print("response:", sqs_response)
                        if 'Failed' in sqs_response:
                            print('failed count:', len(sqs_response['Failed']))
                    except Exception as e:
                        print("error:", e)
                    messages = []

            print('record count is:', record_count)
    except Exception as e:
        return e
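The manual batch counting in Writer.py can be factored into a small helper that yields batches of at most 10 entries, the per-call limit of `send_message_batch`. This is a sketch of that refactor, not part of the original function:

```python
def chunk(records, size=10):
    """Yield successive batches of at most `size` records.

    SQS send_message_batch accepts at most 10 entries per call, so the
    Writer can simply loop over chunk(entries) and send each batch,
    with the final partial batch handled automatically.
    """
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # last, possibly partial, batch
        yield batch
```

With this helper, the "handling last batch" branch in the loop above becomes unnecessary.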

Processor.py

# Process queue messages
import json

def handler(event, context):
    if 'Records' in event:
        try:
            messages = event['Records']
            for message in messages:
                print("message to be processed:", message)
                result = json.loads(message['body'])
                print("result:", result)
            return {
                'statusCode': 200,
                'body': 'All messages processed successfully'
            }
        except Exception as e:
            print(e)
            return str(e)

Setting up S3 as trigger to Writer lambda

Setting up SQS trigger to processor Lambda

If set up properly, you should be able to view it in the Lambda Triggers section on the SQS homepage, like this.

The setup is done. To test it, upload a .csv file to the S3 location.
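If you don't have a CSV handy, a minimal one can be generated like this. The column names here are assumptions; the unique-id column must match whatever Writer.py reads as `ANY_COLUMN_WITH_ALL_VALUES_UNIQUE`:

```python
import csv
import io

# Build a small CSV with a unique-id column, matching what Writer.py expects
rows = [{'id': str(i), 'value': f'message-{i}'} for i in range(3)]
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=['id', 'value'])
writer.writeheader()
writer.writerows(rows)
csv_body = buf.getvalue()

# Uploading would then be (requires AWS credentials; bucket name is a placeholder):
# boto3.client('s3').put_object(Bucket='your-bucket', Key='sample.csv', Body=csv_body)
```

Once the object lands in S3, the Writer is triggered, the queue fills, and the Processor's CloudWatch logs should show one invocation per batch.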

SQS Standard Queue in comparison with FIFO queue

FIFO queue in SQS supports deduplication in two ways:

  1. Content based deduplication while writing to SQS.
  2. Processing one record/batch at a time.

Unlike Standard queues, FIFO queues don't support the same level of concurrency or Lambda invocation. On top of that, there is a limit on how many messages you can write to a FIFO queue per second. FIFO queues are better suited when the order of processing is important.
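For contrast, here is a sketch of what a batch entry for a FIFO queue would look like (field values are illustrative). FIFO requires a `MessageGroupId` on every message, and deduplication comes from either content-based deduplication enabled on the queue or an explicit `MessageDeduplicationId` per message:

```python
def fifo_entry(unique_id, body, group='default'):
    """Build a send_message_batch entry for a FIFO queue.

    MessageGroupId is mandatory on FIFO queues; messages in the same
    group are processed strictly in order. MessageDeduplicationId can
    be omitted if content-based deduplication is enabled on the queue.
    """
    return {
        'Id': unique_id,
        'MessageBody': body,
        'MessageGroupId': group,
        'MessageDeduplicationId': unique_id,
    }
```

With a Standard queue, by contrast, neither field exists; deduplication has to come from elsewhere, which is what the maxReceiveCount=1 redrive policy achieves in this architecture.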

Cost analysis:
The first 1 million Amazon SQS requests each month are free.

More on SQS pricing.

SQS Standard Queue in comparison with DynamoDB

DynamoDB streams are slow when compared to SQS, and DynamoDB bills on various aspects such as:

  1. Data Storage
  2. Writes
  3. Reads
  4. Provisioned throughput
  5. Reserved capacity
  6. Indexed data storage
  7. Streams and many more.

In a nutshell, DynamoDB’s monthly cost is dictated by data storage, writes and reads. The best use cases for DynamoDB are those that require a flexible data model, reliable performance, and the automatic scaling of throughput capacity.

SQS Standard Queue in comparison with Kinesis

Kinesis' primary use case is collecting, storing and processing real-time continuous data streams. It is designed for large-scale data ingestion and processing, with the ability to maximise write throughput for large volumes of data.

A message queue, on the other hand, makes it easy to decouple and scale microservices, distributed systems, and serverless applications. Using a queue, you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be always available. In a nutshell, serverless applications are built from microservices, and a message queue serves as their reliable plumbing.

Drawbacks of Kinesis:

  1. Shard management
  2. Limited Read Throughput

For a more detailed comparison of SQS and Kinesis visit .

Thanks for the read, I hope it was helpful.

This story is authored by Koushik. He is a software engineer and a keen data science and machine learning enthusiast.


Originally published at .
