How to build a fully managed scheduling mechanism for updates on Amazon S3 data lakes
Here is a break-down of our infrastructure:
- Scheduling trigger: New data (for example, in JSON format) is continuously uploaded to an S3 bucket.
- Task scheduling: As soon as new files land, an AWS Lambda function processes the resulting S3 bucket notification events. As part of the processing, it creates a new item on Amazon DynamoDB that specifies a Time to Live (TTL) and the path to that S3 object.
- Task execution trigger: When the TTL expires, the DynamoDB item is deleted from the table and the DynamoDB stream triggers a Lambda function that processes the S3 object at that path.
- Task execution: The Lambda function derives meta information (like the relevant S3 path) from the TTL expiration event and processes the S3 object. Finally, the new S3 object replaces the older version.
- Data usage: The updated data is available for querying from Athena without further manual processing, and uses S3’s eventual consistency on read operations.
Project Prerequisites:
1 Install aws cli version 2 on Mac
To install and update for all users using the macOS command line
$ curl "https://awscli.amazonaws.com/AWSCLIV2.pkg" -o "AWSCLIV2.pkg"
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 21.3M 100 21.3M 0 0 10.0M 0 0:00:02 0:00:02 --:--:-- 10.0M
$ sudo installer -pkg AWSCLIV2.pkg -target /
Password:
installer: Package name is AWS Command Line Interface
installer: Upgrading at base path /
installer: The upgrade was successful.
Verify installation
$ which aws
/usr/local/bin/aws
$ aws --version
aws-cli/2.0.56 Python/3.7.4 Darwin/19.6.0 exe/x86_64
Set up credentials with aws configure
$ aws configure
AWS Access Key ID [****************DOWD]:
AWS Secret Access Key [****************f3ul]:
Default region name [us-east-1]:
Default output format [json]:
For other platforms, see the AWS CLI installation documentation.
2 Install Python, Pip, Boto3
On a Mac, Homebrew is used for the installation (use Chocolatey on Windows)
Installing Homebrew
Open a terminal and type in
$ ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
.
.
.
==> Installation successful!
==> Homebrew has enabled anonymous aggregate formulae and cask analytics.
Read the analytics documentation (and how to opt-out) here:
https://docs.brew.sh/Analytics
No analytics data has been sent yet (or will be during this `install` run).
==> Homebrew is run entirely by unpaid volunteers. Please consider donating:
https://github.com/Homebrew/brew#donations
==> Next steps:
- Run `brew help` to get started
- Further documentation:
https://docs.brew.sh
Verify Homebrew installation
$ brew --version
Homebrew 2.4.16
Homebrew/homebrew-core (git revision 23bea; last commit 2020-09-04)
Homebrew/homebrew-cask (git revision 5beb1; last commit 2020-09-05)
Once you’ve installed Homebrew, insert the Homebrew directory at the top of your PATH environment variable. You can do this by adding the following line at the bottom of your ~/.profile file:
export PATH="/usr/local/opt/python/libexec/bin:$PATH"
Python 3 must be installed (version 3.7 or later, so that all the functions used in this project are available and produce the intended outcome)
Installing Python3
Now, we can install Python 3:
$ brew install python
Updating Homebrew...
==> Auto-updated Homebrew!
Updated 1 tap (homebrew/cask).
==> Updated Casks
session
Warning: python@3.8 3.8.5 is already installed and up-to-date
### Python was already installed on my machine, so your installation output may differ. A fresh installation may take a while to complete.
Verify that python3 is installed
$ python3 --version
Python 3.8.5
Note: you can make the latest version your default python with the following commands
$ unlink /usr/local/bin/python
$ ln -s /usr/local/bin/python3.8 /usr/local/bin/python
Installing Pip
$ curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1841k 100 1841k 0 0 2989k 0 --:--:-- --:--:-- --:--:-- 2984k
$ python get-pip.py
/usr/local/lib/python3.8/site-packages/setuptools/distutils_patch.py:25: UserWarning: Distutils was imported before Setuptools. This usage is discouraged and may exhibit undesirable behaviors or errors. Please use Setuptools' objects directly or at least import Setuptools first.
warnings.warn(
Collecting pip
Using cached pip-20.2.3-py2.py3-none-any.whl (1.5 MB)
Installing collected packages: pip
Attempting uninstall: pip
Found existing installation: pip 20.2.3
Uninstalling pip-20.2.3:
Successfully uninstalled pip-20.2.3
Successfully installed pip-20.2.3
Verify installation
$ pip --version
pip 20.2.3 from /usr/local/lib/python3.8/site-packages/pip (python 3.8)
Installing Boto3
$ python -m pip install boto3
Requirement already satisfied: boto3 in /usr/local/lib/python3.8/site-packages (1.15.14)
Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /usr/local/lib/python3.8/site-packages (from boto3) (0.10.0)
Requirement already satisfied: s3transfer<0.4.0,>=0.3.0 in /usr/local/lib/python3.8/site-packages (from boto3) (0.3.3)
Requirement already satisfied: botocore<1.19.0,>=1.18.14 in /usr/local/lib/python3.8/site-packages (from boto3) (1.18.14)
Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in /usr/local/lib/python3.8/site-packages (from botocore<1.19.0,>=1.18.14->boto3) (2.8.1)
Requirement already satisfied: urllib3<1.26,>=1.20; python_version != "3.4" in /usr/local/lib/python3.8/site-packages (from botocore<1.19.0,>=1.18.14->boto3) (1.25.10)
Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.8/site-packages (from python-dateutil<3.0.0,>=2.1->botocore<1.19.0,>=1.18.14->boto3) (1.15.0)
Verify installation
$ pip show boto3
Name: boto3
Version: 1.15.14
Summary: The AWS SDK for Python
Home-page: https://github.com/boto/boto3
Author: Amazon Web Services
Author-email: UNKNOWN
License: Apache License 2.0
Location: /usr/local/lib/python3.8/site-packages
Requires: botocore, jmespath, s3transfer
Required-by: aws-shell
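As a quick sanity check before moving on, the interpreter version can also be verified from Python itself. This is only a minimal sketch, assuming Python 3.7 is the minimum this project needs; `meets_minimum` is a hypothetical helper name, not part of any library.

```python
import sys


def meets_minimum(version_info, minimum=(3, 7)):
    """Return True when the (major, minor) version is at least `minimum`."""
    return tuple(version_info[:2]) >= tuple(minimum)


if meets_minimum(sys.version_info):
    print("Python version is sufficient")
else:
    print("Please install Python 3.7 or later")
```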
Before we dive into every component of this project, we will build this infrastructure using CloudFormation and test it.
1 Fill in the stack name and the S3 bucket name (which has to be globally unique)
2 Continue with the default settings
3 Check the I acknowledge box and create the stack
4 Wait until all the resources listed below are complete
Testing our infrastructure
To test the solution, download the mock_uploaded_data.json dataset created with the Mockaroo data generator. The use case is a web service in which users can upload files. The goal is to delete those files some predefined time after the upload to reduce storage and query costs. To this end, the provided code looks for the attribute file_contents and replaces its value with an empty string.
To test our infrastructure, upload new data into your S3 bucket mydataset-bucket under the dataset/ prefix. The NotificationFunction Lambda function processes the resulting bucket notification event for the upload, and a new item appears on your DynamoDB table. Shortly after the predefined TTL time, the JSONProcessingFunction Lambda function processes the data and you can check the resulting changes via an Athena query.
You can also confirm that an S3 object was processed successfully if the DynamoDB item corresponding to this S3 object is no longer present in the DynamoDB table and the S3 object has the processed tag.
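The tag check can also be scripted. The sketch below assumes the tag is written as PROCESSED=True, which is what the processing function shown later in this post does; `has_processed_tag` and `object_is_processed` are hypothetical helper names, and the bucket and key in the usage comment are the ones from this walkthrough.

```python
def has_processed_tag(tag_set):
    """Return True if a TagSet (as returned by get_object_tagging) holds PROCESSED=True."""
    return any(
        tag.get("Key") == "PROCESSED" and tag.get("Value") == "True"
        for tag in tag_set
    )


def object_is_processed(s3_client, bucket, key):
    """Fetch the tags of one object and check for the processed marker."""
    response = s3_client.get_object_tagging(Bucket=bucket, Key=key)
    return has_processed_tag(response["TagSet"])


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   object_is_processed(boto3.client("s3"), "mydataset-bucket",
#                       "dataset/mock_uploaded_data.json")
```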
Here I saved mock_uploaded_data.json in my local environment and uploaded it to the S3 bucket named mydataset-bucket under the prefix dataset/
$ aws s3 cp mock_uploaded_data.json s3://mydataset-bucket/dataset/
upload: ./mock_uploaded_data.json to s3://mydataset-bucket/dataset/mock_uploaded_data.json
Then the new item was generated in the DynamoDB table
After waiting for the designated TTL time, the item was removed from the DynamoDB table
Checking the tags of the object in S3 bucket mydataset-bucket, the processed tag was found
Now our infrastructure is tested successfully.
Before jumping into our components, we will delete our CloudFormation stack. Otherwise, any resources still in use may incur charges.
After deleting the CloudFormation stack data-updater-solution, we observed how resources were deleted one by one. However, the S3 bucket could not be deleted because of a dependency issue: there were still objects inside the S3 bucket mydataset-bucket
Deleting the objects under dataset/ in S3 bucket mydataset-bucket
Deleting the CloudFormation stack again after removing the objects in the S3 bucket
The S3 bucket was deleted successfully after resolving the dependency issue
At this point, we have cleaned up all the resources created by CloudFormation.
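Emptying the bucket before deleting the stack can also be scripted. The following is a rough sketch, not the method used above: `chunked` and `empty_prefix` are hypothetical helper names, and the code only removes current objects, so a versioning-enabled bucket would still need its old versions deleted separately. It relies on the S3 `delete_objects` call accepting at most 1000 keys per request.

```python
def chunked(items, size=1000):
    """Yield successive lists of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def empty_prefix(s3_client, bucket, prefix=""):
    """Delete every current object under `prefix`; return the number of keys deleted."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = [
        obj["Key"]
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        for obj in page.get("Contents", [])
    ]
    for batch in chunked(keys):
        s3_client.delete_objects(
            Bucket=bucket,
            Delete={"Objects": [{"Key": key} for key in batch]},
        )
    return len(keys)


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   empty_prefix(boto3.client("s3"), "mydataset-bucket", "dataset/")
```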
From this point on, let us take a deep dive into every resource that must be built in this infrastructure.
Notes: The following solutions follow IaC (Infrastructure as Code) as a principle. After each one, a solution using the AWS console is also provided. If you try both and compare their efficiency, IaC has a clear advantage over the AWS console.
1 Creating a DynamoDB table and configuring DynamoDB Streams
IaC solution:
$ aws dynamodb create-table --table-name objects-to-process --attribute-definitions AttributeName=path,AttributeType=S --key-schema AttributeName=path,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 --stream-specification StreamEnabled=TRUE,StreamViewType=NEW_AND_OLD_IMAGES
{
"TableDescription": {
"AttributeDefinitions": [
{
"AttributeName": "path",
"AttributeType": "S"
}
],
"TableName": "objects-to-process",
"KeySchema": [
{
"AttributeName": "path",
"KeyType": "HASH"
}
],
"TableStatus": "CREATING",
"CreationDateTime": "2020-10-23T21:01:33.045000-04:00",
"ProvisionedThroughput": {
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
},
"TableSizeBytes": 0,
"ItemCount": 0,
"TableArn": "arn:aws:dynamodb:us-east-1:464392538707:table/objects-to-process",
"TableId": "09e94cb9-3fb1-48aa-824d-20703f94f94a",
"StreamSpecification": {
"StreamEnabled": true,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"LatestStreamLabel": "2020-10-24T01:01:33.045",
"LatestStreamArn": "arn:aws:dynamodb:us-east-1:464392538707:table/objects-to-process/stream/2020-10-24T01:01:33.045"
}
}
$ aws dynamodb update-time-to-live --table-name objects-to-process --time-to-live-specification "Enabled=true, AttributeName=ttl"
{
"TimeToLiveSpecification": {
"Enabled": true,
"AttributeName": "ttl"
}
}
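Since Boto3 was installed as a prerequisite, the same two CLI calls can be sketched in Python. This is an illustrative equivalent rather than part of the original walkthrough: `table_definition` and `create_table_with_ttl` are hypothetical helper names, and the client is passed in so the sketch stays independent of how credentials are configured.

```python
TABLE_NAME = "objects-to-process"


def table_definition(table_name=TABLE_NAME):
    """Keyword arguments for DynamoDB create_table, mirroring the CLI call."""
    return {
        "TableName": table_name,
        "AttributeDefinitions": [{"AttributeName": "path", "AttributeType": "S"}],
        "KeySchema": [{"AttributeName": "path", "KeyType": "HASH"}],
        "ProvisionedThroughput": {"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        "StreamSpecification": {
            "StreamEnabled": True,
            "StreamViewType": "NEW_AND_OLD_IMAGES",
        },
    }


def create_table_with_ttl(dynamodb_client, table_name=TABLE_NAME, ttl_attribute="ttl"):
    """Create the table, wait until it exists, then enable TTL on `ttl_attribute`."""
    dynamodb_client.create_table(**table_definition(table_name))
    dynamodb_client.get_waiter("table_exists").wait(TableName=table_name)
    dynamodb_client.update_time_to_live(
        TableName=table_name,
        TimeToLiveSpecification={"Enabled": True, "AttributeName": ttl_attribute},
    )


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   create_table_with_ttl(boto3.client("dynamodb"))
```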
AWS console solution:
Start first with the time-based trigger setup. For this, you use S3 notifications, DynamoDB Streams, and a Lambda function to integrate both services. The DynamoDB table stores the items to process after a predefined time.
Complete the following steps:
1. On the DynamoDB console, create a table.
2. For Table name, enter objects-to-process.
3. For Primary key, enter path and choose String.
4. Select the table and click on Manage TTL next to “Time to live attribute” under table details.
5. For TTL attribute, enter ttl.
6. For DynamoDB Streams, choose Enable with view type New and old images.
Note that you can enable DynamoDB TTL on non-numeric attributes, but it only works on numeric attributes.
The DynamoDB TTL is not minute-precise. Expired items are typically deleted within 48 hours of expiration. However, you may experience shorter deviations of only 10–30 minutes from the actual TTL value. For more information, see Time to Live: How It Works.
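To make the numeric requirement concrete, here is a small sketch of how a TTL value can be computed: it is an epoch timestamp in whole seconds, stored as a number. The helper names `ttl_timestamp` and `is_expired`, and the example object path, are hypothetical. Note that an expired item is only eligible for deletion; as described above, the actual removal can lag behind.

```python
import time


def ttl_timestamp(delay_seconds, now=None):
    """Epoch time in whole seconds at which an item should expire."""
    now = time.time() if now is None else now
    return int(now) + int(delay_seconds)


def is_expired(ttl_value, now=None):
    """True once the TTL timestamp lies in the past (deletion may still lag behind)."""
    now = time.time() if now is None else now
    return ttl_value < now


# An item scheduled for processing roughly five minutes from now.
item = {"path": "s3://data-bucket/dataset/example.json", "ttl": ttl_timestamp(300)}
print(item)
```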
DynamoDB table created will be shown as below:
2 Creating a Lambda function to insert TTL records
IaC solution:
Create a file named notification.py locally
$ vim notification.py
import boto3, os, time

# TTL offset in seconds; the default of 300 is 5 minutes
default_ttl = 300
s3_client = boto3.client('s3')
table = boto3.resource('dynamodb').Table('objects-to-process')

def parse_bucket_and_key(s3_notif_event):
    s3_record = s3_notif_event['Records'][0]['s3']
    return s3_record['bucket']['name'], s3_record['object']['key']

def lambda_handler(event, context):
    try:
        bucket_name, key = parse_bucket_and_key(event)
        head_obj = s3_client.head_object(Bucket=bucket_name, Key=key)
        tags = s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
        # Schedule only non-empty objects that carry no tags yet
        if head_obj['ContentLength'] > 0 and len(tags['TagSet']) == 0:
            record_path = f"s3://{bucket_name}/{key}"
            table.put_item(Item={'path': record_path, 'ttl': int(time.time()) + default_ttl})
            print(f"Found {record_path}")
    except Exception as e:
        # Log the problem and ignore the event
        print(f"Ignoring event: {e}")
zip the file for use
$ zip notification.zip notification.py
adding: notification.py (deflated 47%)
Create the Lambda function
$ aws lambda create-function --function-name NotificationFunction --runtime python3.7 --zip-file fileb://notification.zip --handler notification.lambda_handler --role arn:aws:iam::464392538707:role/Lambda-full
{
"FunctionName": "NotificationFunction",
"FunctionArn": "arn:aws:lambda:us-east-1:464392538707:function:NotificationFunction",
"Runtime": "python3.7",
"Role": "arn:aws:iam::464392538707:role/Lambda-full",
"Handler": "notification.lambda_handler",
"CodeSize": 631,
"Description": "",
"Timeout": 3,
"MemorySize": 128,
"LastModified": "2020-10-26T22:57:26.964+0000",
"CodeSha256": "L/FHohN+dfvBl3zF9pXIiaqtdb8Ct+N2Rsqysn6L/eU=",
"Version": "$LATEST",
"TracingConfig": {
"Mode": "PassThrough"
},
AWS console solution:
The first Lambda function you create is for scheduling tasks. It receives an S3 notification as input, recreates the S3 path (for example, s3://<bucket>/<key>), and creates a new item on DynamoDB with two attributes: the S3 path and the TTL (in seconds). For more information about a similar S3 notification event structure, see Test the Lambda Function.
To deploy the Lambda function, on the Lambda console, create a function named NotificationFunction with the Python 3.7 runtime and the following code:
import boto3, os, time

# TTL offset in seconds; the default of 300 is 5 minutes
default_ttl = 300
s3_client = boto3.client('s3')
table = boto3.resource('dynamodb').Table('objects-to-process')

def parse_bucket_and_key(s3_notif_event):
    s3_record = s3_notif_event['Records'][0]['s3']
    return s3_record['bucket']['name'], s3_record['object']['key']

def lambda_handler(event, context):
    try:
        bucket_name, key = parse_bucket_and_key(event)
        head_obj = s3_client.head_object(Bucket=bucket_name, Key=key)
        tags = s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
        # Schedule only non-empty objects that carry no tags yet
        if head_obj['ContentLength'] > 0 and len(tags['TagSet']) == 0:
            record_path = f"s3://{bucket_name}/{key}"
            table.put_item(Item={'path': record_path, 'ttl': int(time.time()) + default_ttl})
            print(f"Found {record_path}")
    except Exception as e:
        # Log the problem and ignore the event
        print(f"Ignoring event: {e}")
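The function above reads only the first record of the notification and uses the object key exactly as it arrives. Object keys in S3 event notifications are URL-encoded (a space arrives as `+`), so a key with special characters would not match the real object. The sketch below shows one possible way to handle that; `iter_s3_paths` is a hypothetical helper and the sample event is made up and trimmed to the fields used here.

```python
from urllib.parse import unquote_plus


def iter_s3_paths(s3_notif_event):
    """Yield an s3:// path for every record in an S3 notification event."""
    for record in s3_notif_event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded in notifications (a space becomes '+').
        key = unquote_plus(record["s3"]["object"]["key"])
        yield f"s3://{bucket}/{key}"


# Made-up event, reduced to the fields this sketch reads.
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "data-bucket"},
                "object": {"key": "dataset/my+file.json"}}}
    ]
}
print(list(iter_s3_paths(sample_event)))
```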
Verify Lambda function created:
3 Configuring S3 event notifications on the target bucket
IaC solution:
Grant S3 permission to invoke the Lambda function
$ aws lambda add-permission --function-name NotificationFunction --statement-id unique --action "lambda:InvokeFunction" --principal s3.amazonaws.com --source-arn "arn:aws:s3:::my-data-s3-api-bucket" --source-account 464392538707
{
"Statement": "{\"Sid\":\"unique\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"s3.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:us-east-1:464392538707:function:NotificationFunction\",\"Condition\":{\"StringEquals\":{\"AWS:SourceAccount\":\"464392538707\"},\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:s3:::my-data-s3-api-bucket\"}}}"
}
Create a file named notification.json
vim notification.json
{
"LambdaFunctionConfigurations": [
{
"Id": "myevent",
"LambdaFunctionArn": "arn:aws:lambda:us-east-1:464392538707:function:NotificationFunction",
"Events": [
"s3:ObjectCreated:*"
],
"Filter": {
"Key": {
"FilterRules": [
{
"Name": "prefix",
"Value": "dataset/"
}
]
}
}
}
]
}
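The same configuration can be generated from Python instead of being typed by hand, which helps keep the function ARN and prefix consistent. This is a sketch under assumptions: `notification_config` and `apply_notification` are hypothetical helper names, the account ID in the ARN is a placeholder, and `dataset/` is assumed as the prefix because that is where this post uploads its data.

```python
import json


def notification_config(function_arn, prefix, config_id="myevent"):
    """Build the notification configuration for object-created events under `prefix`."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "Id": config_id,
                "LambdaFunctionArn": function_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}
                },
            }
        ]
    }


def apply_notification(s3_client, bucket, config):
    """Attach the configuration to the bucket using a boto3 S3 client."""
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket, NotificationConfiguration=config
    )


# Placeholder ARN; replace the account ID and region with your own.
config = notification_config(
    "arn:aws:lambda:us-east-1:123456789012:function:NotificationFunction",
    "dataset/",
)
print(json.dumps(config, indent=2))
```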
Then set up the S3 event to trigger the Lambda function
$ aws s3api put-bucket-notification-configuration --bucket my-data-s3-api-bucket --notification-configuration file://notification.json
AWS console solution:
You can take advantage of the scalability, security, and performance of S3 by using it as a data lake for storing your datasets. Additionally, you can use S3 event notifications to capture S3-related events, such as the creation or deletion of objects within a bucket. You can forward these events to other AWS services, such as Lambda.
To configure S3 event notifications, complete the following steps:
- On the S3 console, create an S3 bucket named data-bucket.
- Click on the bucket and go to the “Properties” tab.
- Under Advanced Settings, choose Events and add a notification.
- For Name, enter MyEventNotification.
- For Events, select All object create events.
- For Prefix, enter dataset/.
- For Send to, choose Lambda Function.
- For Lambda, choose NotificationFunction.
This configuration restricts the scheduling to events that happen within your previously defined dataset. For more information, see How Do I Enable and Configure Event Notifications for an S3 Bucket?
Event notification created:
4 Creating a Lambda function that performs data processing tasks
IaC solution:
$ vim my-function.py
import os, json, boto3
from functools import partial
from urllib.parse import urlparse

s3 = boto3.resource('s3')

def parse_bucket_and_key(s3_url_as_string):
    s3_path = urlparse(s3_url_as_string)
    return s3_path.netloc, s3_path.path[1:]

def extract_s3path_from_dynamo_event(event):
    if event["Records"][0]["eventName"] == "REMOVE":
        return event["Records"][0]["dynamodb"]["Keys"]["path"]["S"]

def modify_json(json_dict, column_name, value):
    json_dict[column_name] = value
    return json_dict

def get_obj_contents(bucketname, key):
    obj = s3.Object(bucketname, key)
    return obj.get()['Body'].iter_lines()

clean_column_2_func = partial(modify_json, column_name="file_contents", value="")

def lambda_handler(event, context):
    s3_url_as_string = extract_s3path_from_dynamo_event(event)
    if s3_url_as_string:
        bucket_name, key = parse_bucket_and_key(s3_url_as_string)
        updated_json = "\n".join(map(json.dumps, map(clean_column_2_func, map(json.loads, get_obj_contents(bucket_name, key)))))
        s3.Object(bucket_name, key).put(Body=updated_json, Tagging="PROCESSED=True")
        print(f"Processed {s3_url_as_string}")
    else:
        print(f"Invalid event: {str(event)}")
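The nested `map` calls in the handler can be hard to read, so here is the same transformation written as a plain loop over in-memory data, with no AWS calls. It is only an illustration of what happens to each line of the newline-delimited JSON file: `blank_field` is a hypothetical helper and the two sample records (including the `id` field) are made up.

```python
import json


def blank_field(json_lines, field="file_contents"):
    """Return newline-delimited JSON with `field` set to an empty string in every record."""
    cleaned = []
    for line in json_lines:
        if not line.strip():
            continue  # skip blank lines
        record = json.loads(line)
        record[field] = ""
        cleaned.append(json.dumps(record))
    return "\n".join(cleaned)


# Made-up records in the same one-object-per-line layout.
sample = [
    '{"id": 1, "file_contents": "abc"}',
    '{"id": 2, "file_contents": "def"}',
]
print(blank_field(sample))
```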
Create a Lambda function for processing
$ aws lambda create-function --function-name JSONProcessingFunction --runtime python3.7 --zip-file fileb://function.zip --handler my-function.py --role arn:aws:iam::464392538707:role/Lambda-full
{
"FunctionName": "JSONProcessingFunction",
"FunctionArn": "arn:aws:lambda:us-east-1:464392538707:function:JSONProcessingFunction",
"Runtime": "python3.7",
"Role": "arn:aws:iam::464392538707:role/Lambda-full",
"Handler": "function.handler",
"CodeSize": 713,
"Description": "",
"Timeout": 3,
"MemorySize": 128,
"LastModified": "2020-10-25T06:35:58.983+0000",
"CodeSha256": "xnlRFCGdL8D15CvI7+7EZhaOjZrWbbpTCKWyRjK+xR4=",
"Version": "$LATEST",
"TracingConfig": {
"Mode": "PassThrough"
},
"RevisionId": "e304df2b-8702-4ea9-9075-a15bd744f45a",
"State": "Active",
"LastUpdateStatus": "Successful"
}
Create an event in Lambda to trigger a processing function using DynamoDB
$ aws lambda create-event-source-mapping --function-name JSONProcessingFunction --batch-size 1 --starting-position TRIM_HORIZON --event-source-arn arn:aws:dynamodb:us-east-1:464392538707:table/objects-to-process/stream/2020-10-26T22:34:20.328 --enabled
{
"UUID": "9aec9ec4-1295-48ba-a4df-0491fd2dfb62",
"BatchSize": 1,
"MaximumBatchingWindowInSeconds": 0,
"ParallelizationFactor": 1,
"EventSourceArn": "arn:aws:dynamodb:us-east-1:464392538707:table/objects-to-process/stream/2020-10-24T01:01:33.045",
"FunctionArn": "arn:aws:lambda:us-east-1:464392538707:function:JSONProcessingFunction",
"LastModified": "2020-10-25T02:38:34.392000-04:00",
"LastProcessingResult": "No records processed",
"State": "Creating",
"StateTransitionReason": "User action",
"DestinationConfig": {
"OnFailure": {}
},
"MaximumRecordAgeInSeconds": -1,
"BisectBatchOnFunctionError": false,
"MaximumRetryAttempts": -1
}
AWS console solution:
You have now created a time-based trigger for the deletion of the record in the DynamoDB table. However, when the system delete occurs and the change is recorded in DynamoDB Streams, no further action is taken. Lambda can poll the stream to detect these change records and trigger a function to process them according to the activity (INSERT, MODIFY, REMOVE).
This post is only concerned with deleted items because it uses the TTL feature of DynamoDB, together with DynamoDB Streams, to trigger task executions. Lambda gives you the flexibility to either process the item by itself or to forward the processing effort to somewhere else (such as an AWS Glue job or an Amazon SQS queue).
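A REMOVE record is also written when someone deletes an item manually, so it can be useful to tell the two cases apart. Stream records produced by the TTL process carry a `userIdentity` field with type `Service` and principal `dynamodb.amazonaws.com`. The sketch below checks for that; `is_ttl_expiry` is a hypothetical helper, the sample records are trimmed to the relevant fields, and the function in this post does not perform this check.

```python
def is_ttl_expiry(record):
    """True for stream records produced by the TTL process rather than a user delete."""
    identity = record.get("userIdentity", {})
    return (
        record.get("eventName") == "REMOVE"
        and identity.get("type") == "Service"
        and identity.get("principalId") == "dynamodb.amazonaws.com"
    )


# Trimmed sample records: one TTL deletion and one manual deletion.
ttl_record = {
    "eventName": "REMOVE",
    "userIdentity": {"type": "Service", "principalId": "dynamodb.amazonaws.com"},
}
manual_record = {"eventName": "REMOVE"}

print(is_ttl_expiry(ttl_record), is_ttl_expiry(manual_record))
```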
This post uses Lambda directly to process the S3 objects. The Lambda function performs the following tasks:
- Gets the S3 object from the DynamoDB item’s S3 path attribute.
- Modifies the object’s data.
- Overwrites the old S3 object with the updated content and tags the object as processed.
Complete the following steps:
- On the Lambda console, create a function named JSONProcessingFunction with Python 3.7 as the runtime and the following code:
import os, json, boto3
from functools import partial
from urllib.parse import urlparse

s3 = boto3.resource('s3')

def parse_bucket_and_key(s3_url_as_string):
    s3_path = urlparse(s3_url_as_string)
    return s3_path.netloc, s3_path.path[1:]

def extract_s3path_from_dynamo_event(event):
    if event["Records"][0]["eventName"] == "REMOVE":
        return event["Records"][0]["dynamodb"]["Keys"]["path"]["S"]

def modify_json(json_dict, column_name, value):
    json_dict[column_name] = value
    return json_dict

def get_obj_contents(bucketname, key):
    obj = s3.Object(bucketname, key)
    return obj.get()['Body'].iter_lines()

clean_column_2_func = partial(modify_json, column_name="file_contents", value="")

def lambda_handler(event, context):
    s3_url_as_string = extract_s3path_from_dynamo_event(event)
    if s3_url_as_string:
        bucket_name, key = parse_bucket_and_key(s3_url_as_string)
        updated_json = "\n".join(map(json.dumps, map(clean_column_2_func, map(json.loads, get_obj_contents(bucket_name, key)))))
        s3.Object(bucket_name, key).put(Body=updated_json, Tagging="PROCESSED=True")
    else:
        print(f"Invalid event: {str(event)}")
- On the Lambda function configuration webpage, click on Add trigger.
- For Trigger configuration, choose DynamoDB.
- For DynamoDB table, choose objects-to-process.
- For Batch size, enter 1.
- For Batch window, enter 0.
- For Starting position, choose Trim horizon.
- Select Enable trigger.
You use batch size = 1 because each S3 object represented on the DynamoDB table is typically large. If these files are small, you can use a larger batch size. The batch size is essentially the number of files that your Lambda function processes at a time.
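Note that the processing function shown in this post only looks at the first record of each event, which is fine for a batch size of 1. With a larger batch size, every record in the batch has to be visited. A minimal sketch of that extraction step follows; `expired_paths` is a hypothetical helper and the sample batch is made up.

```python
def expired_paths(event):
    """Collect the S3 paths of every REMOVE record in a DynamoDB stream batch."""
    return [
        record["dynamodb"]["Keys"]["path"]["S"]
        for record in event.get("Records", [])
        if record.get("eventName") == "REMOVE"
    ]


# Made-up batch with two deletions and one insert.
sample_batch = {
    "Records": [
        {"eventName": "REMOVE",
         "dynamodb": {"Keys": {"path": {"S": "s3://data-bucket/dataset/a.json"}}}},
        {"eventName": "INSERT",
         "dynamodb": {"Keys": {"path": {"S": "s3://data-bucket/dataset/b.json"}}}},
        {"eventName": "REMOVE",
         "dynamodb": {"Keys": {"path": {"S": "s3://data-bucket/dataset/c.json"}}}},
    ]
}
print(expired_paths(sample_batch))
```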
Because any new objects on S3 (in a versioning-enabled bucket) create an object creation event, even if its key already exists, you must make sure that your task schedule Lambda function ignores any object creation events that your task execution function creates. Otherwise, it creates an infinite loop. This post uses tags on S3 objects: when the task execution function processes an object, it adds a processed tag. The task scheduling function ignores those objects in subsequent executions.
5 Using Athena to query the processed data
The final step is to create a table for Athena to query the data. You can do this manually or by using an AWS Glue crawler that infers the schema directly from the data and automatically creates the table for you. This post uses a crawler because it can handle schema changes and add new partitions automatically. To create this crawler, use the following code:
$ aws glue create-crawler --name data-crawler \
--role <AWSGlueServiceRole-crawler> \
--database-name data_db \
--description 'crawl data bucket!' \
--targets \
"{\
\"S3Targets\": [\
{\
\"Path\": \"s3://<data-bucket>/dataset/\"\
}\
]\
}"
Now, let us upload a CSV file named username.csv
$ aws s3 cp username.csv s3://my-data-s3-api-bucket/dataset/
upload: ./username.csv to s3://my-data-s3-api-bucket/dataset/username.csv
Verify our crawler in AWS Glue
Running data-crawler
The table is created automatically from the file in the S3 bucket
Replace <AWSGlueServiceRole-crawler> and <data-bucket> with the name of your AWSGlueServiceRole and S3 bucket, respectively.
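For readers who prefer Boto3 over the CLI, the crawler can be created and started from Python as well. This is a sketch with the same names as the CLI call (data-crawler, data_db); `crawler_definition` and `create_and_start_crawler` are hypothetical helpers, and the role and bucket still have to be supplied by you.

```python
def crawler_definition(role, bucket, name="data-crawler", database="data_db"):
    """Keyword arguments for the Glue create_crawler call, mirroring the CLI command."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Description": "crawl data bucket!",
        "Targets": {"S3Targets": [{"Path": f"s3://{bucket}/dataset/"}]},
    }


def create_and_start_crawler(glue_client, role, bucket):
    """Create the crawler and kick off its first run."""
    definition = crawler_definition(role, bucket)
    glue_client.create_crawler(**definition)
    glue_client.start_crawler(Name=definition["Name"])


# Usage (requires boto3 and AWS credentials; both arguments are yours to fill in):
#   import boto3
#   create_and_start_crawler(boto3.client("glue"), "<AWSGlueServiceRole-crawler>", "<data-bucket>")
```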
When the crawling process is complete, you can start querying the data. You can use the Athena console to interact with the table while its underlying data is being transparently updated. See the following code:
SELECT * FROM dataset LIMIT 10
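The query can be submitted from Python too. The sketch below assumes the table lives in the data_db database created by the crawler and that you have an S3 location for Athena query results; `run_query` is a hypothetical helper and the output location in the usage comment is a placeholder. The call only starts the query and returns its ID; fetching results is a separate step.

```python
def run_query(athena_client, sql, database, output_location):
    """Start an Athena query and return its execution ID."""
    response = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]


# Usage (requires boto3 and AWS credentials; the results location is a placeholder):
#   import boto3
#   run_query(boto3.client("athena"), "SELECT * FROM dataset LIMIT 10",
#             "data_db", "s3://<query-results-bucket>/athena/")
```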
Testing the solution
To test the solution, download the mock_uploaded_data.json dataset created with the Mockaroo data generator. The use case is a web service in which users can upload files. The goal is to delete those files some predefined time after the upload to reduce storage and query costs. To this end, the provided code looks for the attribute file_contents and replaces its value with an empty string.
You can now upload new data into your data-bucket S3 bucket under the dataset/ prefix. Your NotificationFunction Lambda function processes the resulting bucket notification event for the upload, and a new item appears on your DynamoDB table. Shortly after the predefined TTL time, the JSONProcessingFunction Lambda function processes the data and you can check the resulting changes via an Athena query.
You can also confirm that an S3 object was processed successfully if the DynamoDB item corresponding to this S3 object is no longer present in the DynamoDB table and the S3 object has the processed tag.
Conclusion
Voila, we made it!
Let us recap what we did in this project.
This project showcased how to automatically re-process objects on S3 after a predefined amount of time by using a simple and fully managed scheduling mechanism. Because you use S3 for storage, you automatically benefit from S3’s eventual consistency model, simply by using identical keys (names) both for the original and processed objects. This way, you avoid query results with duplicate or missing data. Also, incomplete or only partially uploaded objects do not result in data inconsistencies because S3 only creates new object versions for successfully completed file transfers.
You may have previously used Spark to process objects hourly. This requires you to monitor objects that must be processed, to move and process them in a staging area, and to move them back to their actual destination. The main drawback is the final step because, due to Spark’s parallelism nature, files are generated with different names and contents. That prevents direct file replacement in the dataset and leads to downtimes or potential data duplicates when data is queried during a move operation. Additionally, because each copy/delete operation could potentially fail, you have to deal with possible partially processed data manually.
From an operations perspective, AWS serverless services simplify your infrastructure. You can combine the scalability of these services with a pay-as-you-go plan to start with a low-cost POC and scale to production quickly — all with a minimal code base.
Compared to hourly Spark jobs, you could potentially reduce costs by up to 80%, which makes this solution both cheaper and simpler.
One more point: I’d like to highlight the power of IaC (Infrastructure as Code). For each and every step of this project, we drew a clear comparison between the IaC solution and the AWS console solution. So let us automate it!
Last but not least, Python was used for the automation in Lambda. By taking advantage of serverless services, we are able to process our files seamlessly.