Creating an Automated Email Monitoring System using AWS SNS, a Lambda and a Dynamo DB Table

Abhisek Roy
Credit Saison (India)
8 min readApr 16, 2023

In case you use AWS for your cloud infra, there’s a high possibility that you use AWS SES for sending emails to your customers. These emails can be marketing emails or general communication regarding your services. In that case, you need to be aware of the reputation metrics that are tracked continuously–

“Amazon SES actively tracks several metrics that may cause your reputation as a sender to be damaged, or that could cause your email delivery rates to decline. Two important metrics that we consider in this process are the bounce and complaint rates for your account. If the bounce or complaint rates for your account are too high, we might place your account under review or pause your account’s ability to send an email.”

You can read more about AWS SES reputation metrics and the pitfalls of having a high bounce rate here. But what if you could monitor each email that you sent? What if you could save the result of each email that you tried to send, be it a success, bounce, or complaint? To know more, read on…

Fig: SES Feedback notification Settings

The main change you need to make in your AWS SES, is to add feedback notifications for all your verified identities. This would send notifications to the SNS topic of your choice for every email that you send using each identity. Below we have chosen an SNS topic called email_monitor for bounce, complaint or delivery feedback.

Fig: SES Feedback notification Settings

This SNS topic will be the first resource in the system that we will build to capture the status of every email sent from your AWS SES.

Architecture

The architecture here is simple. The email service SES sends notifications to AWS SNS, which uses a lambda subscriber. This lambda pushes the notification to a dynamo DB.

Fig: Setting up a data pipeline to capture email statuses

However, there can be 2 variations of this infrastructure. Specifically, for those who may have set up their SES a while back when it wasn’t available in all AWS regions– they may want to stream the notifications from one region to another.

So with little ado, let’s go over the Cloudformation templates for each resource one by one-

The SNS Topic

The template below has 3 resources, let’s look at them one by one. First, we the SNS topic which will receive notifications from the SES and relay them to a lambda.

Then we have a resource of type “AWS::Lambda::Permission” which essentially allows the SNS topic to invoke the lambda. The last one or the topic policy determines which resources can push to your SNS topic. While we can use this as a base, feel free to add on, or edit certain parameters, as required.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
SNS for getting feedback notifications from SES

Parameters:
TopicName:
Type: String
Description: Topic name of SNS
EmailBounceMonitor:
Type: String
Description: Lambda function to which the SNS sends notifications too

Resources:
Topic:
Type: AWS::SNS::Topic
Properties:
DisplayName: !Ref TopicName
TopicName: !Ref TopicName
Subscription:
- Protocol: lambda
Endpoint: !Ref EmailBounceMonitor


FunctionInvokePermission:
Type: 'AWS::Lambda::Permission'
Properties:
Action: 'lambda:InvokeFunction'
FunctionName: !Ref EmailBounceMonitor
Principal: sns.amazonaws.com

TopicPolicy:
Type: 'AWS::SNS::TopicPolicy'
Properties:
Topics:
- !Ref Topic
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: 'sns:Publish'
Resource: !Ref Topic
Principal:
AWS: '*'
Condition:
ArnLike:
AWS:SourceArn: !Sub 'arn:aws:*:*:${AWS::AccountId}:*'

The Lambda

The JSON received by your lambda in the events parameter will look somewhat like the one below. It can differ slightly based on the type- delivery, complaint or bounce.

{
'Records': [
{
'EventSource': 'aws:sns',
'EventVersion': '1.0',
'EventSubscriptionArn': '####',
'Sns': {
'Type': 'Notification',
'MessageId': 'asd2282baa-66b2-50c4-a129-77d00ff801cf',
'TopicArn': 'arn:aws:sns:us-east-1:987654321:email_bounce',
'Subject': None,
'Message': '{"notificationType":"Delivery","mail":{"timestamp":"2023-03-02T09:06:49.854Z","source":"complaint@test.xyz","sourceArn":"######","sourceIp":"####","callerIdentity":"#####","sendingAccountId":"####","messageId":"####","destination":["complaint@simulator.amazonses.com"]},"delivery":{"timestamp":"2023-03-02T09:06:50.415Z","processingTimeMillis":561,"recipients":["complaint@simulator.amazonses.com"],"smtpResponse":"250 2.6.0 Message received","remoteMtaIp":"####","reportingMTA":"####"}}',
'Timestamp': '2023-03-02T09:06:50.464Z',
'SignatureVersion': '1',
'Signature': '####',
'SigningCertUrl': '####',
'UnsubscribeUrl': '####',
'MessageAttributes': {
}
}
}
]
}

The SNS topic will relay the email notifications to this lambda, whose code is shown below. This is the code that will handle your notifications, format them, and store the correct data in your Dynamo DB. We preferably want to extract and store only 4 data points, the recipient, the sender, the time it was sent, and the status of the email.

1. import logging
2. import json
3. import os
4. import boto3
5. from botocore.exceptions import ClientError
6.
7. logger = logging.getLogger()
8. logger.setLevel(logging.DEBUG)
9.
10. BOUNCE_TABLE_NAME = os.getenv('BOUNCE_TABLE_NAME')
11. dynamo_db = boto3.resource("dynamodb")
12. RETRY_COUNT = 4
13.
14. def create_ddb_entry(id, payload):
15. ddb_entry = {
16. "user_id": id,
17. "notification_type": payload.get("notificationType"),
18. "from": payload.get("mail").get("source"),
19. "timestamp": payload.get("mail").get("timestamp")
20. }
21. return ddb_entry
22.
23. def write_to_table(id, message):
24. retries = 0
25. added_email_entry = False
26. dynamo_table = dynamo_db.Table(BOUNCE_TABLE_NAME)
27. ddb_data= create_ddb_entry(id, message)
28. logger.info("Writing to Dynamo DB Table")
29. while retries < RETRY_COUNT:
30. try:
31. dynamo_table.put_item(Item=ddb_data)
32. added_email_entry = True
33. break
34. except ClientError as e:
35. retries += 1
36. if added_email_entry:
37. logger.info("Added record")
38. else:
39. logger.info("Failed to add record %s", ddb_data)
40.
41. def handle_bounce(message):
42. messageId = message.get("mail").get("messageId")
43. bounceType = message.get("bounce").get("bounceType")
44. addresses = [recipient.get("emailAddress") for recipient in message.get("bounce").get("bouncedRecipients")]
45. logger.info("Message " + messageId + " bounced when sending to " + ', '.join(addresses) + ". Bounce type: " + bounceType)
46. for address in addresses:
47. logger.info("Writing to table with params %s, %s, %s", address, message, "disable")
48. write_to_table(address, message)
49.
50. def handle_complaint(message):
51. messageId = message.get("mail").get("messageId")
52. addresses = [recipient.get("emailAddress") for recipient in message.get("complaint").get("complainedRecipients")]
53. logger.info("A complaint was reported by " + ', '.join(addresses) + " for message " + messageId + ".")
54. for address in addresses:
55. write_to_table(address, message)
56.
57. def handle_delivery(message):
58. messageId = message.get("mail").get("messageId")
59. deliveryTimestamp = message.get("delivery").get("timestamp")
60. addresses = message.get("delivery").get("recipients")
61.
62. logger.info("Message " + messageId + " was delivered successfully at " + deliveryTimestamp + ".")
63.
64. for address in addresses:
65. write_to_table(address, message)
66.
67. def lambda_handler(event, context):
68. logger.info("Recording with event %s", event)
69. message = json.loads(event.get("Records")[0].get("Sns").get("Message"))
70. logger.info("The Message is %s", message)
71. if message.get("notificationType") == "Bounce":
72. logger.info("Bounce")
73. handle_bounce(message)
74. elif message.get("notificationType") == "Complaint":
75. logger.info("Complaint")
76. handle_complaint(message)
77. elif message.get("notificationType") == "Delivery":
78. logger.info("Delivery Success")
79. handle_delivery(message)
80. else:
81. logger.info("Unknown notification type")
82. return True

Along with the lambda code, the template is also important. We have shared the SAM template below, and you can see that we use the DynamoTable name and the Dynamo Table ARN in this template. The reason behind this is that we will need to know the table to which we need to write data to and the ARN for which it needs “dynamodb:PutItem” functionality. It is always better to specify the exact resource and permission you use for IAM roles.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
EmailBounceMonitor

Sample SAM Template for EmailBounceMonitor

Globals:
Function:
Timeout: 300
MemorySize: 256


Parameters:
DynamoTableName:
Type: String
Description: Name of the Dynamo DB Table
DynamoTableArn:
Type: String
Description: ARN of the Dynamo DB Table

Resources:
EmailBounceMonitorFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: email_bounce_monitor/
Handler: app.lambda_handler
Runtime: python3.7
Policies:
- AWSLambdaBasicExecutionRole
- Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- dynamodb:PutItem
Resource:
- !Ref DynamoTableArn
Environment:
Variables:
BOUNCE_TABLE_NAME: !Ref DynamoTableName


Outputs:
EmailBounceMonitorFunction:
Description: "EmailBounceMonitor Lambda"
Value: !GetAtt EmailBounceMonitorFunction.Arn

The Dynamo DB

You would realise from this template that we are using the user_id as the hash key and timestamp as the sort key where user_id is the id to which the mail was sent and timestamp is the time at which it was sent.

The only extra part in this template is the KinesisStreamSpecification which can be omitted. If you want to understand more about it, you can read it here. In short, it pushes any data changes/additions in the dynamo db to the data lake. Billing mode, PITR are set based on our use case, and you may change it to suit your needs.

---
AWSTemplateFormatVersion: 2010-09-09

Parameters:
TableName:
Type: String
Description: Lambda used to emit notifications for Aquaman
KinesisStream:
Type: String
Description: Kinesis Stream

Resources:
Table:
Type: "AWS::DynamoDB::Table"
DeletionPolicy: Retain
Properties:
BillingMode: PAY_PER_REQUEST
KinesisStreamSpecification:
StreamArn: !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisStream}'
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: true
AttributeDefinitions:
- AttributeName: "user_id"
AttributeType: "S"
- AttributeName: "timestamp"
AttributeType: S
KeySchema:
- AttributeName: "user_id"
KeyType: "HASH"
- AttributeName: timestamp
KeyType: RANGE
TableName: !Ref TableName


Outputs:
TableName:
Value: !Ref Table
TableArn:
Value: !GetAtt Table.Arn

The table below shows some data that we collected while testing, and this is how your Dynamo DB table should look once you have the data flowing in the correct format.

Fig: The Dynamo DB Table containing email metrics

Tieing the pieces together

While I gave you the individual pieces of the puzzle, this master template should help you piece together the items.

---
AWSTemplateFormatVersion: 2010-09-09


Resources:

emailbouncemonitor:
Type: AWS::CloudFormation::Stack
Properties:
Parameters:
DynamoTableName: !GetAtt [ "emailbouncetable", "Outputs.TableName" ]
DynamoTableArn: !GetAtt [ "emailbouncetable", "Outputs.TableArn" ]
TemplateURL: Lambdas/EmailBounceMonitor/package.yaml

emailbouncetable:
Type: AWS::CloudFormation::Stack
Properties:
Parameters:
TableName: 'email_monitor'
KinesisStream : !ImportValue DynamoKinesisStream
TemplateURL: AWSResources/02_dynamo_db.yaml

emailbouncetopic:
Type: AWS::CloudFormation::Stack
Properties:
Parameters:
TopicName: 'email_monitor'
EmailBounceMonitor: !GetAtt [ "emailbouncemonitor", "Outputs.EmailBounceMonitorFunction" ]
TemplateURL: AWSResources/01_sns_topic.yaml

In case you are choosing the 2nd diagram from the architecture that we shared above, you will need to break this into 2 and deploy the two pieces separately.

The first one containing just the Dynamo DB table, that is to be deployed in the zone where you have all your other infrastructure, and the second one having the SNS and the Lambda in the zone where you have the SES. In that case, remember to give the correct IAM role so that the lambda can access the Dynamo DB table across zones, and also, don’t forget to add the region in the 11th line of the code where you create a boto3 resource for Dynamo DB in this way-

dynamo_db = boto3.resource("dynamodb", region_name='ap-south-1')

To Conclude

I used bits and pieces from this article along with some AWS resources to build what I have shown above, so you can refer to this as well– specially if you want to create your lambda in JavaScript. But no matter what language you code, write clean code, and build beautiful infra!

--

--