How to build a fully managed scheduling mechanism for updates on Amazon S3 data lakes

Paul Zhao
Paul Zhao Projects
Published in
17 min readOct 30, 2020
Infrastructure diagram

Here is a break-down of our infrastructure:

  1. Scheduling trigger — New data (for example, in JSON format) is continuously uploaded to a S3 bucket.
  2. Task scheduling — As soon as new files land, an AWS Lambda function processes the resulting S3 bucket notification events. As part of the processing, it creates a new item on Amazon DynamoDB that specifies a Time to Live (TTL) and the path to that S3 object.
  3. Task execution trigger — When the TTL expires, the DynamoDB item is deleted from the table and the DynamoDB stream triggers a Lambda function that processes the S3 object at that path.
  4. Task execution — The Lambda function derives meta information (like the relevant S3 path) from the TTL expiration event and processes the S3 object. Finally, the new S3 object replaces the older version.
  5. Data usage — The updated data is available for querying from Athena without further manual processing, and uses S3’s eventual consistency on read operations.

Project Prerequisites:

1 Install aws cli version 2 on Mac

To install and update for all users using the macOS command line

$ curl "https://awscli.amazonaws.com/AWSCLIV2.pkg" -o "AWSCLIV2.pkg"% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
Dload Upload Total Spent Left Speed
100 21.3M 100 21.3M 0 0 10.0M 0 0:00:02 0:00:02 --:--:-- 10.0M$ sudo installer -pkg AWSCLIV2.pkg -target /
Password:
installer: Package name is AWS Command Line Interface
installer: Upgrading at base path /
installer: The upgrade was successful.

Verify installation

$ which aws
/usr/local/bin/aws$ aws --version
aws-cli/2.0.56 Python/3.7.4 Darwin/19.6.0 exe/x86_64

Set up aws configure

$ aws congfigure
AWS Access Key ID [****************DOWD]:
AWS Secret Access Key [****************f3ul]:
Default region name [us-east-1]:
Default output format [json]:

For other platforms, please visit here.

2 Install Python, Pip, Boto3

Homebrew installed for Mac installation (Chocolatey if using Windows)

Installing Homebrew

Open terminal and type in

$ ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
.
.
.
==> Installation successful!==> Homebrew has enabled anonymous aggregate formulae and cask analytics.
Read the analytics documentation (and how to opt-out) here:
https://docs.brew.sh/Analytics
No analytics data has been sent yet (or will be during this `install` run).==> Homebrew is run entirely by unpaid volunteers. Please consider donating:
https://github.com/Homebrew/brew#donations==> Next steps:
- Run `brew help` to get started
- Further documentation:
https://docs.brew.sh

Verify Homebrew installation

$ brew --version
Homebrew 2.4.16
Homebrew/homebrew-core (git revision 23bea; last commit 2020-09-04)
Homebrew/homebrew-cask (git revision 5beb1; last commit 2020-09-05)

Once you’ve installed Homebrew, insert the Homebrew directory at the top of your PATH environment variable. You can do this by adding the following line at the bottom of your ~/.profile file

export PATH="/usr/local/opt/python/libexec/bin:$PATH"

2. Python (the version must be 3.7 to make sure that all functions will be available to present intended outcome) is required to be installed

Installing Python3

Now, we can install Python 3:

$ brew install python
Updating Homebrew...
==> Auto-updated Homebrew!
Updated 1 tap (homebrew/cask).
==> Updated Casks
sessionWarning: python@3.8 3.8.5 is already installed and up-to-date
### my python was preinstalled, you may see different installation process. And it may take a while before python is fully installed

Verify is your python3 is installed

$ python3 --version
Python 3.8.5

Notes: you may set your default python as latest version by applying following code

$ unlink /usr/local/bin/python
$ ln -s /usr/local/bin/python3.8 /usr/local/bin/python

Installing Pip

$ curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1841k 100 1841k 0 0 2989k 0 --:--:-- --:--:-- --:--:-- 2984k$ python get-pip.py
/usr/local/lib/python3.8/site-packages/setuptools/distutils_patch.py:25: UserWarning: Distutils was imported before Setuptools. This usage is discouraged and may exhibit undesirable behaviors or errors. Please use Setuptools' objects directly or at least import Setuptools first.
warnings.warn(
Collecting pip
Using cached pip-20.2.3-py2.py3-none-any.whl (1.5 MB)
Installing collected packages: pip
Attempting uninstall: pip
Found existing installation: pip 20.2.3
Uninstalling pip-20.2.3:
Successfully uninstalled pip-20.2.3
Successfully installed pip-20.2.3

Verify installation

$ pip --version
pip 20.2.3 from /usr/local/lib/python3.8/site-packages/pip (python 3.8)

Installing Boto3

$ python -m pip install boto3
Requirement already satisfied: boto3 in /usr/local/lib/python3.8/site-packages (1.15.14)
Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /usr/local/lib/python3.8/site-packages (from boto3) (0.10.0)
Requirement already satisfied: s3transfer<0.4.0,>=0.3.0 in /usr/local/lib/python3.8/site-packages (from boto3) (0.3.3)
Requirement already satisfied: botocore<1.19.0,>=1.18.14 in /usr/local/lib/python3.8/site-packages (from boto3) (1.18.14)
Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in /usr/local/lib/python3.8/site-packages (from botocore<1.19.0,>=1.18.14->boto3) (2.8.1)
Requirement already satisfied: urllib3<1.26,>=1.20; python_version != "3.4" in /usr/local/lib/python3.8/site-packages (from botocore<1.19.0,>=1.18.14->boto3) (1.25.10)
Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.8/site-packages (from python-dateutil<3.0.0,>=2.1->botocore<1.19.0,>=1.18.14->boto3) (1.15.0)

Verify installation

$ pip show boto3
Name: boto3
Version: 1.15.14
Summary: The AWS SDK for Python
Home-page: https://github.com/boto/boto3
Author: Amazon Web Services
Author-email: UNKNOWN
License: Apache License 2.0
Location: /usr/local/lib/python3.8/site-packages
Requires: botocore, jmespath, s3transfer
Required-by: aws-shell

Before we dive into every component of this project, we would build this infrastructure using CloudFormation and test it.

1 Fill in stack name as well as S3 bucket name (has to be globally unique)

Stack and bucket names created

2 Move on with default set

Default set

3 Check I acknowledge box and create stack

Acknowledge and create stack

4 Wait until all resources listed below completed

Resources created by cloudformation

Testing our infrastructure

To test the solution, download the mock_uploaded_data.json dataset created with the Mockaroo data generator. The use case is a web service in which users can upload files. The goal is to delete those files some predefined time after the upload to reduce storage and query costs. To this end, the provided code looks for the attribute file_contents and replaces its value with an empty string.

To test our infrastructure, we may upload new data into your S3 bucket mydataset-bucket under the dataset/ prefix. NotificationFunction Lambda function processes the resulting bucket notification event for the upload, and a new item appears on your DynamoDB table. Shortly after the predefined TTL time, the JSONProcessingFunction Lambda function processes the data and you can check the resulting changes via an Athena query.

You can also confirm that a S3 object was processed successfully if the DynamoDB item corresponding to this S3 object is no longer present in the DynamoDB table and the S3 object has the processed tag.

Here I saved mock_uploaded_data.json in my local environment and uploaded it to S3 bucket named mydataset-bucket under prefix dataset/

$ aws s3 cp mock_uploaded_data.json s3://mydataset-bucket/dataset/
upload: ./mock_uploaded_data.json to s3://mydataset-bucket/dataset/mock_uploaded_data.json

Then the new item was generate in dynamodb table

New item created in dynamodb table

Wait for TTL designated time, item was removed from dynamodb table

Item removed from dynamodb table

Check out a new tag under s3 bucket mydataset-bucket , processed was found

Tag processed found in s3 bucket

Now our infrastructure is tested successfully.

Before jumping into our components, we will delete our CloudFormation. Otherwise, any resources being used may be subject to charges.

After deleting CloudFormation data-updater-solution , we observed how resources were deleted one by one. However, the s3 bucket can’t be deleted due to dependence issue — there were objects inside s3 bucket mydataset-bucket

Deleting resources in cloudformation

Deleting objects dataset/ in s3 bucket mydataset-bucket

Deleting objects in s3 bucket

Deleting CloudFormation again after removing objects in s3 bucket

Deleting cloudformation again

S3 bucket was deleted successfully after resolving dependency issue

S3 bucket deleted in cloudformation

At this point, we cleared up all resources built up by CloudFormation.

From this moment on, let us deep dive into every single resource that is required to be built in this infrastructure.

Notes: The following solutions will follow IaC as a principal. After that, solutions using AWS console will also be provided. If you tend to do both and compare the efficiency of both solutions, IoC has clear advantage over AWS console

1 Creating a DynamoDB table and configuring DynamoDB Streams

IaC solution:

$ aws dynamodb create-table --table-name objects-to-process --attribute-definitions AttributeName=path,AttributeType=S --key-schema AttributeName=path,KeyType=HASH --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 --stream-specification StreamEnabled=TRUE,StreamViewType=NEW_AND_OLD_IMAGES{
"TableDescription": {
"AttributeDefinitions": [
{
"AttributeName": "path",
"AttributeType": "S"
}
],
"TableName": "objects-to-process",
"KeySchema": [
{
"AttributeName": "path",
"KeyType": "HASH"
}
],
"TableStatus": "CREATING",
"CreationDateTime": "2020-10-23T21:01:33.045000-04:00",
"ProvisionedThroughput": {
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
},
"TableSizeBytes": 0,
"ItemCount": 0,
"TableArn": "arn:aws:dynamodb:us-east-1:464392538707:table/objects-to-process",
"TableId": "09e94cb9-3fb1-48aa-824d-20703f94f94a",
"StreamSpecification": {
"StreamEnabled": true,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"LatestStreamLabel": "2020-10-24T01:01:33.045",
"LatestStreamArn": "arn:aws:dynamodb:us-east-1:464392538707:table/objects-to-process/stream/2020-10-24T01:01:33.045"
}
}
$ aws dynamodb update-time-to-live --table-name objects-to-process --time-to-live-specification "Enabled=true, AttributeName=ttl"
{
"TimeToLiveSpecification": {
"Enabled": true,
"AttributeName": "ttl"
}
}

AWS console solution:

Start first with the time-based trigger setup. For this, you use S3 notifications, DynamoDB Streams, and a Lambda function to integrate both services. The DynamoDB table stores the items to process after a predefined time.

Complete the following steps:

  1. On the DynamoDB console, create a table.
  2. For Table name, enter objects-to-process.
  3. For Primary key, enter path and choose String.
Table and primary key created

4. Select the table and click on Manage TTL next to “Time to live attribute” under table details.

5. For TTL attribute, enter ttl.

6. For DynamoDB Streams, choose Enable with view type New and old images.

Ttl and streams created

Note that you can enable DynamoDB TTL on non-numeric attributes, but it only works on numeric attributes.

The DynamoDB TTL is not minute-precise. Expired items are typically deleted within 48 hours of expiration. However, you may experience shorter deviations of only 10–30 minutes from the actual TTL value. For more information, see Time to Live: How It Works.

DynamoDB table created will be shown as below:

Dynamodb table created

2 Creating a Lambda function to insert TTL records

IoC solution:

create a file named notification.py locally

vim notification.py

import boto3, os, time

# Put here a new parameter for TTL, default 300, 5 minutes
default_ttl = 300

s3_client = boto3.client('s3')
table = boto3.resource('dynamodb').Table('objects-to-process')

def parse_bucket_and_key(s3_notif_event):
s3_record = s3_notif_event['Records'][0]['s3']
return s3_record['bucket']['name'], s3_record['object']['key']

def lambda_handler(event, context):
try:
bucket_name, key = parse_bucket_and_key(event)
head_obj = s3_client.head_object(Bucket=bucket_name, Key=key)
tags = s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
if(head_obj['ContentLength'] > 0 and len(tags['TagSet']) == 0):
record_path = f"s3://{bucket_name}/{key}"
table.put_item(Item={'path': record_path, 'ttl': int(time.time()) + default_ttl})
print(f"Found {record_path}")
except:
pass # Ignore

zip the file for use

$ zip notification.zip notification.py
adding: notification.py (deflated 47%)

Create the Lambda function

$ aws lambda create-function --function-name NotificationFunction --runtime python3.7 --zip-file fileb://notification.zip --handler notification.lambda_handler --role arn:aws:iam::464392538707:role/Lambda-full{
"FunctionName": "NotificationFunction",
"FunctionArn": "arn:aws:lambda:us-east-1:464392538707:function:NotificationFunction",
"Runtime": "python3.7",
"Role": "arn:aws:iam::464392538707:role/Lambda-full",
"Handler": "notification.lambda_handler",
"CodeSize": 631,
"Description": "",
"Timeout": 3,
"MemorySize": 128,
"LastModified": "2020-10-26T22:57:26.964+0000",
"CodeSha256": "L/FHohN+dfvBl3zF9pXIiaqtdb8Ct+N2Rsqysn6L/eU=",
"Version": "$LATEST",
"TracingConfig": {
"Mode": "PassThrough"
},

AWS console solution:

The first Lambda function you create is for scheduling tasks. It receives a S3 notification as input, recreates the S3 path (for example, s3://<bucket>/<key>), and creates a new item on DynamoDB with two attributes: the S3 path and the TTL (in seconds). For more information about a similar S3 notification event structure, see Test the Lambda Function.

To deploy the Lambda function, on the Lambda console, create a function named NotificationFunction with the Python 3.7 runtime and the following code:

import boto3, os, time

# Put here a new parameter for TTL, default 300, 5 minutes
default_ttl = 300

s3_client = boto3.client('s3')
table = boto3.resource('dynamodb').Table('objects-to-process')

def parse_bucket_and_key(s3_notif_event):
s3_record = s3_notif_event['Records'][0]['s3']
return s3_record['bucket']['name'], s3_record['object']['key']

def lambda_handler(event, context):
try:
bucket_name, key = parse_bucket_and_key(event)
head_obj = s3_client.head_object(Bucket=bucket_name, Key=key)
tags = s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
if(head_obj['ContentLength'] > 0 and len(tags['TagSet']) == 0):
record_path = f"s3://{bucket_name}/{key}"
table.put_item(Item={'path': record_path, 'ttl': int(time.time()) + default_ttl})
print(f"Found {record_path}")
except:
pass # Ignore

Verify Lambda function created:

S3 objects uploaded to trigger lambda function
Function code applied
Basic settings

3 Configuring S3 event notifications on the target bucket

IoC solution:

Grant S3 proper permission to Lambda function

$ aws lambda add-permission --function-name NotificationFunction --statement-id unique --action "lambda:InvokeFunction" --principal s3.amazonaws.com --source-arn "arn:aws:s3:::my-data-s3-api-bucket" --source-account 464392538707
{
"Statement": "{\"Sid\":\"unique\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"s3.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:us-east-1:464392538707:function:NotificationFunction\",\"Condition\":{\"StringEquals\":{\"AWS:SourceAccount\":\"464392538707\"},\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:s3:::my-data-s3-api-bucket\"}}}"
}

Create a file named notification.json

vim notification.json

{
"LambdaFunctionConfigurations": [
{
"Id": "myevent",
"LambdaFunctionArn": "arn:aws:lambda:us-east-1:464392538707:function:NotificationFunction",
"Events": [
"s3:ObjectCreated:*"
],
"Filter": {
"Key": {
"FilterRules": [
{
"Name": "prefix",
"Value": "theprefix"
}
]
}
}
}
]
}

Then set up the S3 event to trigger the Lambda function

$ aws s3api put-bucket-notification-configuration —-bucket my-data-s3-api-bucket —-notification-configuration file://notification.json

AWS console solution:

You can take advantage of the scalability, security, and performance of S3 by using it as a data lake for storing your datasets. Additionally, you can use S3 event notifications to capture S3-related events, such as the creation or deletion of objects within a bucket. You can forward these events to other AWS services, such as Lambda.

To configure S3 event notifications, complete the following steps:

  1. On the S3 console, create an S3 bucket named data-bucket.
  2. Click on the bucket and go to “Properties” tab.
  3. Under Advanced Settings, choose Events and add a notification.
  4. For Name, enter MyEventNotification.
  5. For Events, select All object create events.
  6. For Prefix, enter dataset/.
  7. For Send to, choose Lambda Function.
  8. For Lambda, choose NotificationFunction.
S3 event created

This configuration restricts the scheduling to events that happen within your previously defined dataset. For more information, see How Do I Enable and Configure Event Notifications for an S3 Bucket?

Event notification created:

Event notification created

4 Creating a Lambda function that performs data processing tasks

IoC solution:

vim my-function.py

import os, json, boto3
from functools import partial
from urllib.parse import urlparse
s3 = boto3.resource('s3')def parse_bucket_and_key(s3_url_as_string):
s3_path = urlparse(s3_url_as_string)
return s3_path.netloc, s3_path.path[1:]
def extract_s3path_from_dynamo_event(event):
if event["Records"][0]["eventName"] == "REMOVE":
return event["Records"][0]["dynamodb"]["Keys"]["path"]["S"]
def modify_json(json_dict, column_name, value):
json_dict[column_name] = value
return json_dict
def get_obj_contents(bucketname, key):
obj = s3.Object(bucketname, key)
return obj.get()['Body'].iter_lines()
clean_column_2_func = partial(modify_json, column_name="file_contents", value="")def lambda_handler(event, context):
s3_url_as_string = extract_s3path_from_dynamo_event(event)
if s3_url_as_string:
bucket_name, key = parse_bucket_and_key(s3_url_as_string)
updated_json = "\n".join(map(json.dumps, map(clean_column_2_func, map(json.loads, get_obj_contents(bucket_name, key)))))
s3.Object(bucket_name, key).put(Body=updated_json, Tagging="PROCESSED=True")
print(f"Processed {s3_url_as_string}")
else:
print(f"Invalid event: {str(event)}")

Create a Lambda function for processing

$ aws lambda create-function --function-name JSONProcessingFunction --runtime python3.7 --zip-file fileb://function.zip --handler my-function.py --role arn:aws:iam::464392538707:role/Lambda-full{
"FunctionName": "JSONProcessingFunction",
"FunctionArn": "arn:aws:lambda:us-east-1:464392538707:function:JSONProcessingFunction",
"Runtime": "python3.7",
"Role": "arn:aws:iam::464392538707:role/Lambda-full",
"Handler": "function.handler",
"CodeSize": 713,
"Description": "",
"Timeout": 3,
"MemorySize": 128,
"LastModified": "2020-10-25T06:35:58.983+0000",
"CodeSha256": "xnlRFCGdL8D15CvI7+7EZhaOjZrWbbpTCKWyRjK+xR4=",
"Version": "$LATEST",
"TracingConfig": {
"Mode": "PassThrough"
},
"RevisionId": "e304df2b-8702-4ea9-9075-a15bd744f45a",
"State": "Active",
"LastUpdateStatus": "Successful"
}

Create an event in Lambda to trigger a processing function using DynamoDB

$ aws lambda create-event-source-mapping --function-name JSONProcessingFunction --batch-size 1 --starting-position TRIM_HORIZON --event-source-arn arn:aws:dynamodb:us-east-1:464392538707:table/objects-to-process/stream/2020-10-26T22:34:20.328 --enabled{
"UUID": "9aec9ec4-1295-48ba-a4df-0491fd2dfb62",
"BatchSize": 1,
"MaximumBatchingWindowInSeconds": 0,
"ParallelizationFactor": 1,
"EventSourceArn": "arn:aws:dynamodb:us-east-1:464392538707:table/objects-to-process/stream/2020-10-24T01:01:33.045",
"FunctionArn": "arn:aws:lambda:us-east-1:464392538707:function:JSONProcessingFunction",
"LastModified": "2020-10-25T02:38:34.392000-04:00",
"LastProcessingResult": "No records processed",
"State": "Creating",
"StateTransitionReason": "User action",
"DestinationConfig": {
"OnFailure": {}
},
"MaximumRecordAgeInSeconds": -1,
"BisectBatchOnFunctionError": false,
"MaximumRetryAttempts": -1
}

AWS console solution:

You have now created a time-based trigger for the deletion of the record in the DynamoDB table. However, when the system delete occurs and the change is recorded in DynamoDB Streams, no further action is taken. Lambda can poll the stream to detect these change records and trigger a function to process them according to the activity (INSERT, MODIFY, REMOVE).

This post is only concerned with deleted items because it uses the TTL feature of DynamoDB Streams to trigger task executions. Lambda gives you the flexibility to either process the item by itself or to forward the processing effort to somewhere else (such as an AWS Glue job or an Amazon SQS queue).

This post uses Lambda directly to process the S3 objects. The Lambda function performs the following tasks:

  1. Gets the S3 object from the DynamoDB item’s S3 path attribute.
  2. Modifies the object’s data.
  3. Overrides the old S3 object with the updated content and tags the object as processed.

Complete the following steps:

  1. On the Lambda console, create a function named JSONProcessingFunction with Python 3.7 as the runtime and the following code:
import os, json, boto3
from functools import partial
from urllib.parse import urlparse

s3 = boto3.resource('s3')

def parse_bucket_and_key(s3_url_as_string):
s3_path = urlparse(s3_url_as_string)
return s3_path.netloc, s3_path.path[1:]

def extract_s3path_from_dynamo_event(event):
if event["Records"][0]["eventName"] == "REMOVE":
return event["Records"][0]["dynamodb"]["Keys"]["path"]["S"]

def modify_json(json_dict, column_name, value):
json_dict[column_name] = value
return json_dict

def get_obj_contents(bucketname, key):
obj = s3.Object(bucketname, key)
return obj.get()['Body'].iter_lines()

clean_column_2_func = partial(modify_json, column_name="file_contents", value="")

def lambda_handler(event, context):
s3_url_as_string = extract_s3path_from_dynamo_event(event)
if s3_url_as_string:
bucket_name, key = parse_bucket_and_key(s3_url_as_string)
updated_json = "\n".join(map(json.dumps, map(clean_column_2_func, map(json.loads, get_obj_contents(bucket_name, key)))))
s3.Object(bucket_name, key).put(Body=updated_json, Tagging="PROCESSED=True")
else:
print(f"Invalid event: {str(event)}")
  1. On the Lambda function configuration webpage, click on Add trigger.
  2. For Trigger configuration, choose DynamoDB.
  3. For DynamoDB table, choose objects-to-process.
  4. For Batch size, enter 1.
  5. For Batch window, enter 0.
  6. For Starting position, choose Trim horizon.
  7. Select Enable trigger.
Event created

You use batch size = 1 because each S3 object represented on the DynamoDB table is typically large. If these files are small, you can use a larger batch size. The batch size is essentially the number of files that your Lambda function processes at a time.

Because any new objects on S3 (in a versioning-enabled bucket) create an object creation event, even if its key already exists, you must make sure that your task schedule Lambda function ignores any object creation events that your task execution function creates. Otherwise, it creates an infinite loop. This post uses tags on S3 objects: when the task execution function processes an object, it adds a processed tag. The task scheduling function ignores those objects in subsequent executions.

5 Using Athena to query the processed data

The final step is to create a table for Athena to query the data. You can do this manually or by using an AWS Glue crawler that infers the schema directly from the data and automatically creates the table for you. This post uses a crawler because it can handle schema changes and add new partitions automatically. To create this crawler, use the following code:

$ aws glue create-crawler --name data-crawler \ --role <AWSGlueServiceRole-crawler> \
--database-name data_db \
--description 'crawl data bucket!' \
--targets \
"{\
\"S3Targets\": [\
{\
\"Path\": \"s3://<data-bucket>/dataset/\"\
}\
]\
}"

Now, let us upload a csv file named username.csv

$ aws s3 cp username.csv s3://my-data-s3-api-bucket/dataset/upload: ./username.csv to s3://my-data-s3-api-bucket/dataset/username.csv

Verify our crawler in AWS Glue

Crawler created

Running data-crawler

Run data-crawler

Table created automatically extracing file from s3 bucket

Table created

Replace <AWSGlueServiceRole-crawler> and <data-bucket> with the name of your AWSGlueServiceRole and S3 bucket, respectively.

When the crawling process is complete, you can start querying the data. You can use the Athena console to interact with the table while its underlying data is being transparently updated. See the following code:

SELECT * FROM dataset LIMIT 10

Outcome generated

Testing the solution

To test the solution, download the mock_uploaded_data.json dataset created with the Mockaroo data generator. The use case is a web service in which users can upload files. The goal is to delete those files some predefined time after the upload to reduce storage and query costs. To this end, the provided code looks for the attribute file_contents and replaces its value with an empty string.

You can now upload new data into your data-bucket S3 bucket under the dataset/ prefix. Your NotificationFunction Lambda function processes the resulting bucket notification event for the upload, and a new item appears on your DynamoDB table. Shortly after the predefined TTL time, the JSONProcessingFunction Lambda function processes the data and you can check the resulting changes via an Athena query.

You can also confirm that a S3 object was processed successfully if the DynamoDB item corresponding to this S3 object is no longer present in the DynamoDB table and the S3 object has the processed tag.

New item created
New item removed
Processed tag checked

Conclustion

Voila, we made it!

Let us recap what we did in this project.

Below is the infrastructure for each and every step of our projects

Infrastructure diagram

Here is a break-down of our infrastructure:

  1. Scheduling trigger — New data (for example, in JSON format) is continuously uploaded to a S3 bucket.
  2. Task scheduling — As soon as new files land, an AWS Lambda function processes the resulting S3 bucket notification events. As part of the processing, it creates a new item on Amazon DynamoDB that specifies a Time to Live (TTL) and the path to that S3 object.
  3. Task execution trigger — When the TTL expires, the DynamoDB item is deleted from the table and the DynamoDB stream triggers a Lambda function that processes the S3 object at that path.
  4. Task execution — The Lambda function derives meta information (like the relevant S3 path) from the TTL expiration event and processes the S3 object. Finally, the new S3 object replaces the older version.
  5. Data usage — The updated data is available for querying from Athena without further manual processing, and uses S3’s eventual consistency on read operations.

This project showcased how to automatically re-process objects on S3 after a predefined amount of time by using a simple and fully managed scheduling mechanism. Because you use S3 for storage, you automatically benefit from S3’s eventual consistency model, simply by using identical keys (names) both for the original and processed objects. This way, you avoid query results with duplicate or missing data. Also, incomplete or only partially uploaded objects do not result in data inconsistencies because S3 only creates new object versions for successfully completed file transfers.

You may have previously used Spark to process objects hourly. This requires you to monitor objects that must be processed, to move and process them in a staging area, and to move them back to their actual destination. The main drawback is the final step because, due to Spark’s parallelism nature, files are generated with different names and contents. That prevents direct file replacement in the dataset and leads to downtimes or potential data duplicates when data is queried during a move operation. Additionally, because each copy/delete operation could potentially fail, you have to deal with possible partially processed data manually.

From an operations perspective, AWS serverless services simplify your infrastructure. You can combine the scalability of these services with a pay-as-you-go plan to start with a low-cost POC and scale to production quickly — all with a minimal code base.

Compared to hourly Spark jobs, you could potentially reduce costs by up to 80%, which makes this solution both cheaper and simpler.

One more point to add, I’d like to highlight the power of IoC (Infrastructure as Code), for each and every step of this project, we draw a clear comparion in between IoC solution and AWS console solution. So let us automate it!

Last but not least, Python was being used to automate in Lambda. By taking the advantage of severless seervice, we are able to process our file seemlessly.

--

--

Paul Zhao
Paul Zhao Projects

Amazon Web Service Certified Solutions Architect Professional & Devops Engineer