2. AWS Lambda & ECS (Elastic Container Service) in Data ETL with Python (Part 1 — Parent / Child Function)
This article is about processing raw data using AWS Lambda and ECS (Elastic Container Service). It is one part of a project in which everything is automated: from the very beginning of data collection until the analytics are published on the website, there is no manual process.
More can be found on GitHub.
Databases: AWS RDS for MySQL, or AWS DynamoDB (NoSQL database)
- AWS RDS for MySQL
Let’s see how Lambda works with the MySQL database. Details on ECS will be shared later in the article on containers.
The picture shows the workflow from the developer’s point of view. As discussed in the previous article (1. Data Collection using Python), every day the latest data is downloaded from the data source and uploaded to AWS S3. The Parent Lambda, which sits outside the VPC, is invoked and checks the message sent by SQS to see whether the file is valid for processing. If so, the Parent Lambda inspects the file in S3 and decides what to do next. If the file is a .sql file, the Parent Lambda invokes ECS to initialize the database or make any necessary change by executing the stored procedures in the file. If the file is a .csv or .txt file and the file size is small, the Parent Lambda invokes the Child Lambda inside the VPC to do the data ETL; otherwise ECS is invoked to process the large dataset.
After the Child Lambda or ECS processes the file and reports back, the Parent Lambda decides whether to back up the file and whether to invoke the Reporting Lambda to calculate data for reports or dashboards.
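To make the routing concrete, below is a minimal sketch (in Python) of the decision the Parent Lambda makes; the bucket-name checks, the 5.6 MiB threshold and the return labels are assumptions distilled from the full scripts later in this article:
def route_s3_object(s3_bucket, s3_key, file_size_in_mib):
    # a hypothetical helper, not part of the real scripts below
    extension = s3_key.rsplit('.', 1)[-1].lower()
    if 'backup' in s3_bucket and extension == 'sql':
        return 'ecs_rds_init'        # run DB-init stored procedures via an ECS task
    if 'upload' in s3_bucket and extension in ('csv', 'txt'):
        if file_size_in_mib > 5.6:   # stay clear of Lambda's 6 MB payload limit
            return 'ecs_or_split'    # big file: ECS task, or split and loop with the Child Lambda
        return 'child_lambda_etl'    # small file: Child Lambda inside the VPC loads it
    return 'ignore'                  # not a file this pipeline processes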
Leader Lambda (Parent Lambda):
1) to validate the s3 event and s3 key
2) to check the file and split it if it exceeds the size limit
3) to decide whether to invoke the ECS container for database init or the child lambda for data ETL/analysis
4) — — waiting for child response — — — -
5) to back up the data file in another s3 bucket
6) to invoke the loading lambda for report data creation
7) — — waiting for child response — — — -
8) to invoke the reporting lambda for data export
9) — — waiting for child response — — — -
10) to send an email notifying the person in charge whether the data processing succeeded or failed
11) to return a failure message to SQS if any
Loading Lambda (Child Lambda):
1) to get the task from the parent
2) to connect to the database and load data using stored procedures (predefined by the ECS task)
a) to create a temp table in MySQL
b) to load data from the data file into the temp table
c) to check whether all data can be loaded into the temp table
d) if yes, to load data from the temp table into the permanent table in MySQL
3) to respond to the leader lambda about the data ETL
4) to get the task from the parent for report data
5) to execute stored procedures in MySQL for report data creation
6) to respond to the leader lambda
Reporting Lambda (Child Lambda):
1) to get the task from the parent
2) to export data from MySQL to S3 using the MySQL command line
3) to save the report data as a .csv or .json file in AWS S3
4) to respond to the leader lambda
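Putting the three roles together, the leader’s control flow is roughly the following. This is only a sketch: the helper names and payload keys here are assumptions; the real scripts are shared below.
import json
import boto3

lambda_client = boto3.client('lambda')

def invoke_sync(function_arn, payload):
    # 'RequestResponse' makes the leader wait for the child's answer
    resp = lambda_client.invoke(FunctionName=function_arn,
                                InvocationType='RequestResponse',
                                Payload=json.dumps(payload))
    return json.load(resp['Payload'])

def orchestrate(loading_arn, reporting_arn, task):
    loaded = invoke_sync(loading_arn, task)                               # data ETL (steps 3-4)
    if loaded.get('loading_status') != 1:
        return 'loading failed'                                           # returned to SQS (step 11)
    report = invoke_sync(loading_arn, dict(task, report_data_prepare=1))  # report data (steps 6-7)
    if report.get('reporting_data_created') == 1:
        invoke_sync(reporting_arn, report)                                # export to S3 (steps 8-9)
    return 'done'                                                         # then notify via SNS (step 10)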
Note:
- As I applied a containerized Lambda for the Reporting Lambda, its scripts will be shared later in the article on containers.
- Personally, I prefer Lambda to act as a leader in the project rather than to work on the detailed ETL job itself. Lambda has a payload limit of 6 MB, which is why I decided to use ECS to process large datasets. If we would rather not choose ECS or EC2 for some reason, splitting large data files into smaller ones and letting the Child Lambda load them is a workable solution. (I include scripts for splitting in the Lambda function below as well.)
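As a quick illustration of the splitting idea, here is a minimal sketch; it assumes pandas is available to the function, and the real split_file() in the Parent Lambda script below additionally handles file naming and the backup folder:
import math
from io import StringIO
import pandas

def split_dataframe(data, file_size_in_mib, target_mib=5.0):
    # keep each chunk safely below the 6 MB payload limit
    rows = len(data.index)
    rows_per_chunk = max(1, int(rows * (target_mib / file_size_in_mib)))
    chunks = []
    for i in range(math.ceil(rows / rows_per_chunk)):
        buffer = StringIO()
        data.iloc[i * rows_per_chunk:(i + 1) * rows_per_chunk].to_csv(buffer, index=False)
        chunks.append(buffer.getvalue())  # each element is one small CSV payload
    return chunks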
===========> Parent Lambda Python Scripts
###### something we should know about Lambda #######
'''
1) Lambda can stay inside or outside a VPC. How should Lambda be deployed in this project then?
If we put it in a VPC, communication between the lambda and other AWS resources (like databases)
in the same VPC becomes much easier, but communication with resources outside the VPC becomes much harder.
Keeping the lambda outside a VPC, which I believe is the usual practice,
makes it super powerful: it can reach out to any resource in your cloud and easily become a real
leader in your project. By way of boto3, nearly every resource can be created and run by lambda.
2) So it seems the lambda outside the VPC is the winner?
Unfortunately, no. AWS RDS (MySQL) does not provide a Data API,
so a lambda outside the VPC can't access it. Also, for security reasons,
we won't let a lambda outside the VPC hold sensitive login info and connect to the RDS directly.
We need a lambda inside the VPC to communicate with the database. Meanwhile, we still need another lambda
outside the VPC to coordinate all AWS resources to work together.
'''
from __future__ import print_function
import json
import urllib
import time
from datetime import datetime
from datetime import date
import pytz
from pytz import timezone
import urllib.parse
from urllib.parse import unquote_plus
import unicodedata
### clients settings ###
import boto3
import botocore.session as bc
lambda_client=boto3.client('lambda')
ecs_client=boto3.client('ecs')
s3_client=boto3.client('s3')
s3_resource=boto3.resource('s3')
session=boto3.Session()
s3_session_resource=session.resource('s3')
rds_data_client = boto3.client('rds-data') # this is for aurora only...
import os
aws_region=os.getenv('aws_region')
Account_Id=os.getenv('Account_Id')
topic_name_on_success=os.getenv('topic_name_on_success')
topic_arn_on_success=os.getenv('topic_arn_on_success')
topic_name_on_failure=os.getenv('topic_name_on_failure')
topic_arn_on_failure=os.getenv('topic_arn_on_failure')
loading_arn=os.getenv('loading_arn')
reporting_arn=os.getenv('reporting_arn')
ecs_task_arn=os.getenv('ecs_task_arn')
ecs_cluster_arn=os.getenv('ecs_cluster_arn')
ecs_service_subnets = os.getenv('ecs_service_subnets')
ecs_service_subnets =ecs_service_subnets.split(',')
# subnets and groups are string values
# needs to convert string to list
ecs_security_groups = os.getenv('ecs_security_groups')
ecs_security_groups =ecs_security_groups.split(',')
ecs_container_name = os.getenv('ecs_container_name')
#below for backup folder
backup_bucket=os.getenv('backup_bucket')
backup_folder="backup"
### for read/split data files in s3 ###
import pandas
from io import StringIO
import math
### time_zone settings ###
time_zone=timezone('EST')
time_zone1="America/New_York"
#print("now is {}".format(datetime.now(time_zone).date()))
today = datetime.now(time_zone).date()
#this can make sure we use est timezone
### timeout settings ###
time_interval_in_minutes=30 # to load the file if it is uploaded within 30 minutes
lambda_timeout_in_seconds=60
maxerrors_allowed=5
def lambda_handler(event, context):
stage =0
file_split=0 # not to split the file, unless it exceeds 6mb
small_files_folder=''
schema_name=''
temp_tablename=''
tablename=''
loading_status=0
reporting_status=0
report_data_prepare=0
if event:
#print(event)
message=event['Records'][0]
messages_to_reprocess =[]
# this is a long story: lambda runs concurrently and can't solve the deadlock problem by itself,
# which is why we apply SQS to reprocess the files that hit a deadlock.
# We set the batch size for the queue to more than 1, for example 5. If lambda reports to SQS
# that only one of the five messages failed to process while the other four were consumed successfully,
# SQS would still treat the whole batch as a failure; after the visibility timeout, all five messages
# would be put back on the queue again....
# Fortunately, the event source mapping has a setting called 'partial batch response'. It enables lambda
# to report only the failed messages to SQS, and SQS reprocesses just those.
# But in my experience, turning this setting on is far from enough. We need to explicitly tell lambda in the code
# which message failed and should be returned to SQS.
# messages_to_reprocess above collects any failed messages.
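# for reference, a partial batch response returned to SQS looks like:
#   {"batchItemFailures": [{"itemIdentifier": "<messageId of the failed record>"}]}
# an empty batchItemFailures list tells SQS the whole batch was consumed successfully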
batch_failure_response={}
for record in event["Records"]:
try:
body = record["body"]
#body=event['Records'][0]['body']
body=json.loads(body)
s3_event=body['Records'][0]
event_name=s3_event['eventName']
print("event_name is {}".format(event_name))
if "ObjectCreated:" not in event_name:
print('event is not s3:create')
s3_bucket=s3_event['s3']['bucket']['name']
#print("s3_bucket is : {}".format(s3_bucket))
#=================================
delete_message(message)
#=================================
exit()
#do nothing as this is not a valid event
else:
s3_bucket=s3_event['s3']['bucket']['name']
#print("s3_bucket is : {}".format(s3_bucket))
s3_key=s3_event['s3']['object']['key']
s3_key=unquote_plus(s3_key)
#to eliminate the '+' sign in the folder name
# this is essential if we try to search for any keys in s3
# but in cloudfront client, we need that plus sign
print("s3_key is : {}".format(s3_key))
s3_key_size=s3_event['s3']['object']['size']
file_size_in_mib=s3_key_size/(1024*1024)
#get the data file size
print(f"file_size is {s3_key_size/(1024*1024)} MB")
#============================================================
#get the pure file with/without extension
#get the pure file with lower cases only
file_name=os.path.basename(s3_key)
length=len(file_name)
s3_key_extension=file_name[length-3:]
s3_key_extension=s3_key_extension.lower()
s3_key_withoutextension=file_name[:length-4]
s3_key_withoutextension=s3_key_withoutextension.lower()
from_path = "s3://{}/{}".format(s3_bucket, s3_key)
#create file path for data file in s3
#print("URI of s3_key is {}".format(from_path))
if 'backup' in s3_bucket and s3_key_extension == "sql":
print_content='File to init DB has been uploaded to {}. Start running ECS task'.format(from_path)
print(print_content)
file_delimiter=';'
#means sql files for db init uploaded to backup bucket
response=ecs_client.run_task(
taskDefinition=ecs_task_arn,
launchType='FARGATE',
cluster=ecs_cluster_arn,
platformVersion='LATEST',
count=1,
networkConfiguration={
'awsvpcConfiguration': {
'subnets': ecs_service_subnets,
'assignPublicIp': 'DISABLED',
#'assignPublicIp': 'ENABLED',
'securityGroups': ecs_security_groups
}
},
overrides={
'containerOverrides': [
{
'name': ecs_container_name,
'command':["python", "/rds_init.py"],
'environment': [
{
'name': 'task',
'value': 'rds_init'
},
{
'name': 'file_name',
'value': file_name
},
{
'name': 's3_bucket',
'value': s3_bucket
},
{
'name': 's3_key',
'value': s3_key
},
{
'name': 's3_key_withoutextension',
'value': s3_key_withoutextension
}
]
}
]
}
)
print("Trying to get response from ecs container : {}".format(response))
'''
# below method can't check ECS task running status
# the response always show failure is 'missing'
# AWS only has a way to check the task by exporting ecs logs to cloudwatch
# this has been done in container definition
print("getting response from ecs container : {}".format(response))
# this response will just let you know if a ecs task is initiated or not
for task in response['tasks']:
attachments=task['attachments']
#attachments=json.load(attachments)
task_id=attachments[0]['id']
print('task_id is : {}'.format(task_id))
# to check if a task is successfully run in the end or not
attempts=0
MAX_WAIT_CYCLES=3
print('checking container running status')
while attempts<MAX_WAIT_CYCLES:
attempts +=1
time.sleep(3)
resp = ecs_client.describe_tasks(
cluster=ecs_cluster_arn,
tasks=[
task_id,
]
)
print('>>>>>>>>>>>>>')
print("GETTING RESPONSE AFTER CHECKING TASK : {}".format(resp))
exit_code=resp['tasks'][0]['containers'][0]['exitCode']
print("exit_code is {}".format(exit_code))
failure_reason=resp['failures'][0]['reason']
print("failure_reason is {}".format(failure_reason))
failure_detail=resp['failures'][0]['detail']
print("failure_detail is {}".format(failure_detail))
print('>>>>>>>>>>>>>')
'''
# since we can't let lambda check the task is successful or not
# we need to refer to cloudwatch logs to see if everything goes well
exit()
elif 'upload' in s3_bucket and (s3_key_extension == "txt" or s3_key_extension == "csv"):
#if the uploaded file is a flat file
#print('the file can be processed further')
### attention:
### if data set is large, use ECS
### if data set is small, use child lambda
if s3_key_extension=='csv':
file_delimiter=','
elif s3_key_extension=='txt':
file_delimiter=','
resp=get_tablename(s3_key_withoutextension)
schema_name= resp['schema_name']
temp_tablename=resp['temp_tablename']
tablename=resp['correct_tablename']
if tablename=='':
#=================================
delete_message(message)
#=================================
exit()
#print('The file of {} is ready to be loaded to MySQL'.format(s3_key))
stage =1
#============================================================
# if the size is more than 6mb, will split it into smaller ones
if file_size_in_mib>5.6:
file_split=1
response=split_file(s3_bucket,s3_key,file_delimiter,file_size_in_mib)
small_files_folder=response['small_files_folder']
stage=2
# to invoke loading lambda
inputParams = {
"file_split":file_split,
"small_files_folder":small_files_folder,
"file_name":file_name,
"s3_bucket":s3_bucket,
"s3_key":s3_key,
"file_delimiter":file_delimiter,
"schema_name":schema_name,
"temp_tablename":temp_tablename,
"tablename":tablename,
"report_data_prepare":report_data_prepare
}
print(inputParams)
responseFromChild=invoke_another_lambda(inputParams,1,2)
loading_status =responseFromChild['loading_status']
## after we get the response from loading function
## to backup or return failure message
if loading_status==1:
#means loading succeeded
print('loading succeeded!')
##### EEE) move the file into another s3 bucket for backup ######
stage=3
backup_file(s3_bucket,s3_key,file_name)
#=================================
delete_message(message)
#=================================
# after backup, to check remaining files and decide if do reportings
stage=5
respo=check_remaining_files(s3_bucket)
#return{
#"unloadfiles":unload_file,
#"filenumber":countvalid
#}
print('after checking')
if respo['unloadfiles']==None:
#means the function returned an empty unload_file
print_content="today's files were all loaded!!!"
print(print_content)
subject=print_content
body="hooray"
notify(1,subject,body)
#if today's files are not report-related files,
# SPs in mysql won't be executed
if ('PriceIndex' in tablename) :
report_data_prepare=1
elif ('here is the tablename' in tablename):
reporting_status2=True
# can have multiple reporting task for children lambdas
else:
print('There are {} files remaining: {}'.format(respo['filenumber'],respo['unloadfiles']))
#file_string=respo['unloadtables']
#file_string is a list, needs to convert to string
files_to_string=respo['unloadtables']
#files_to_string=' '.join(map(str, file_string))
files_to_string=files_to_string.lower()
print('unload tables are {}'.format(files_to_string))
# files_to_string was lower-cased above, so compare against the lower-case table name
if ('PriceIndex' in tablename) and ('priceindex' not in files_to_string) :
# means data file needed for reporting was uploaded today, but it has been loaded and moved to back up
# no matter what remaining files in s3 now, the report can be built
print('report tables for dashboards are ready')
report_data_prepare=1
if report_data_prepare==1:
#means its time to do the reports
print('Invoking Loading Lambda for Reporting')
inputParams = {
"schema_name":schema_name,
"tablename":tablename,
"report_data_prepare":report_data_prepare
}
responseFromChild=invoke_another_lambda(inputParams,1,2)
# leader lambda need to wait for loading lambda's response
# as loading lambda and reporting lambda in the same VPC can't connect
# leader lambda is used to invoke reporting lambda as well
stage=6
print(responseFromChild)
reporting_data_ready=responseFromChild['reporting_data_created']
if reporting_data_ready==1:
#it's time to export data from MySQL to S3
print('invoking reporting lambda')
inputParams = {
"report_source_data":responseFromChild['report_source_data'],
"report_source_data_folder":responseFromChild['report_source_data_folder']
}
report_source_data_directory=responseFromChild['report_source_data_folder']
responseFromChild=invoke_another_lambda(inputParams,1,1)
print('Receiving response from Reporting Lambda : ')
print(responseFromChild)
report1=responseFromChild['reporting_status']
if report1==1:
#means reporting lambda succeeded in exporting data from MySQL to s3 in the format of .csv file
print_content="Report data was saved successfully in {}/{}".format(backup_bucket,report_source_data_directory)
print(print_content)
subject=print_content
body='hi'
notify(1,subject,body)
else:
#means error when exporting data from MySQL to s3
e=responseFromChild['error']
print_content="Error when exporting data from MySQL to s3. Report: PriceIndex; Error: {}".format(e)
print(print_content)
subject=print_content
body='hi'
#notify(0,subject,body)
else:
#means report data not created
e=responseFromChild['error']
print_content="Error when creating report data in MySQL to s3. Table for calculation: PriceIndex; Error: {}".format(e)
print(print_content)
subject=print_content
body='hi'
#notify(0,subject,body)
#else:
#reporting_status==0
# after checking the remaining files in s3
# still needs child lambda to continue to load before reporting
else:
# loading lambda reports failure
stage=4
if loading_status==2:
# the file does not exist according to loading lambda
print_content="There is no such file {}. It might be processed and moved to backup by another lambda.".format(s3_key)
print(print_content)
#=================================
delete_message(message)
#=================================
exit()
else:
print_content='loading of {} failed according to loading lambda. error: {}. Please check the log table in MySQL'.format(s3_key,responseFromChild['error'])
print(print_content)
messages_to_reprocess.append({"itemIdentifier": record['messageId']})
print('Adding failed file into reprocess response list')
else: # the file is not csv, txt or sql
print ('the file of {} is not for DB init or Data ETL.'.format(s3_key))
delete_message(message)
exit()
except Exception as e:
if stage ==0 :
reprocess=1
#means needs to reprocess
print_content='stage={}, error when checking files in s3, description: {}'.format(stage,e)
if stage ==1:
reprocess=1
print_content='stage={}, error when splitting large files, description: {}'.format(stage,e)
if stage ==2:
reprocess=1
print_content='stage={}, error when invoking loading lambda, description: {}'.format(stage,e)
if stage ==3:
reprocess=0
# means loading succeeded, backup errors
# no need to reprocess
print_content='stage={}, error when backing up files, description: {}'.format(stage,e)
if stage ==4:
reprocess=1
#means loading error
print_content='stage={}, child lambda reports loading errors, description: {}'.format(stage,e)
if stage ==5:
reprocess=0
# means invoking reporting lambda failed
# no need to reprocess
print_content='stage={}, error when invoking loading lambda for report data, description: {}'.format(stage,e)
if stage ==6:
reprocess=0
# means error when exporting data from mysql to s3
print_content='stage={}, error when invoking reporting lambda, description: {}'.format(stage,e)
print(print_content)
if reprocess==0:
# no need to reprocess
#=================================
delete_message(message)
#=================================
exit()
else:
# reprocess is needed
messages_to_reprocess.append({"itemIdentifier": record['messageId']})
print('Adding failed file into reprocess response list')
subject=print_content
body=e
#notify(0,subject,body)
batch_failure_response["batchItemFailures"] = messages_to_reprocess
if len(messages_to_reprocess) ==0:
#means the list is empty
print('message consumed successfully.')
return batch_failure_response
else: #means there's message in the list
list_to_string=' '.join(map(str, messages_to_reprocess))
print("There's an error message: {}".format(list_to_string))
print('returning failed file to sqs...')
return batch_failure_response
#==========================================================================
#==========================================================================
def check_remaining_files(bucket_name,target_name=None):
resp=s3_client.list_objects_v2(Bucket=bucket_name)
print("checking for remaining files:")
#print(resp)
unload_file=None
invalid_file=None
unload_table=None
countvalid=0
countinvalid=0
if 'Contents' in resp:
print("there are remaining files in s3.")
for OBJECT in resp['Contents']:
key=OBJECT['Key']
upload_date=OBJECT['LastModified']
upload_date=upload_date.astimezone(pytz.timezone(time_zone1))
#print("upload_date is {}".format(upload_date))
#change datetime from utc to est
# better to compare both time within 10 minutes
# there might be more than one loadings per day
now=datetime.now(time_zone)
now=now.astimezone(pytz.timezone(time_zone1))
#now=now.replace(tzinfo=pytz.utc)
#print("now in pytz is {}".format(now))
delta=now-upload_date
delta_in_minutes=(delta.total_seconds())/60
#print("delta in mimutes: {}".format(delta_in_minutes))
found_par="false"
if (key.endswith('txt') or key.endswith('csv')) and delta_in_minutes<time_interval_in_minutes:
#there're more flat files for today
if target_name!=None: # we need to check s3_key exists or not
# here we can't make target file name in lower case,
# s3_key is not converted to lower case
# and the (key) we get using this function is not converted either
if str(key)==target_name or target_name in str(key):
#means we found the target file
found_par="true"
return found_par
countvalid=countvalid+1
if countvalid==1: #means this is the first file found:
unload_file=str(key) # get the s3 file name first
#print('unload_file is {}'.format(unload_file))
unload_file=unload_file.lower()
respons=get_tablename(unload_file) # get the corresponding table name in mysql
# return {
#"schema_name":schema_name,
#"temp_tablename":temp_tablename,
#"correct_tablename":correct_tablename
#}
unload_table=respons['correct_tablename']
#print('unload_table is {}'.format(unload_table))
else:
unload_file=unload_file+','+str(key)
respons=get_tablename(unload_file)
unload_table=unload_table+','+respons['correct_tablename']
else:
countinvalid=countinvalid+1
if countinvalid==1: #means this is the first file found:
invalid_file=str(key)
else:
invalid_file=invalid_file+','+str(key)
#===============================================================
#loop complete, we get unload list and invalid list
if invalid_file!=None:
print("Below {} files of {} are not for loading.".format(countinvalid,invalid_file))
else:
unload_file=None
unload_table=None
return{
"unloadfiles":unload_file,
"filenumber":countvalid,
"unloadtables":unload_table
}
#==========================================================================
def invoke_another_lambda(inputParams,invocationType,child_lambda=2):
# 1 --> reporting lambda
# 2 --> loading lambda (by default)
# 1 --> get response
# 2 --> async (parent lambda dont need to wait for child lambda's response )
if child_lambda ==1:
function_arn=reporting_arn
print("invoking reporting lambda")
else:
function_arn=loading_arn
print("invoking loading lambda")
if invocationType ==1:
type='RequestResponse'
else:
type='Event'
'''
# the project decided to use VPC ENDPOINT for security reason
# loading lambda in the VPC will fetch secret itself
# leader lambda in the public is not allowed to connect Secrets Manager
'''
response = lambda_client.invoke(
FunctionName = function_arn,
InvocationType = type,
Payload = json.dumps(inputParams)
)
responseFromChild = json.load(response['Payload'])
return responseFromChild
#==========================================================================
def split_file(source_bucket_name, source_s3_key,file_delimiter,file_size_in_mib,target_bucket_name=backup_bucket,target_foler=backup_folder):
resp = s3_client.get_object(Bucket=source_bucket_name, Key=source_s3_key)
data = pandas.read_csv(resp['Body'],sep=file_delimiter)
#data_for_loading =data[['Date','GEO','DGUID','Products','UOM','UOM_ID','SCALAR_FACTOR','SCALAR_ID','VECTOR','COORDINATE','VALUE','STATUS','SYMBOL','TERMINATED','DECIMALS']]
#data_for_loading.drop(0,axis=0,inplace=True)
# to drop the first row of headers
data.index=data.index+1
row_number=len(data.index)
# to make index starting from 1
file_name=os.path.basename(source_s3_key)
length=len(file_name)
s3_key_extension=file_name[length-3:]
# below is to split the large data files into smaller one
# Please remember there is no way to save any files in the upload s3 bucket
# as s3 bucket will trigger sqs that will trigger lambda
# this is a horrible infinite loop
# the split files are all saved in another backup bucket
small_files_folder=finding_folder(target_bucket_name,target_foler,'{}/{}'.format(today,file_name))
csv_buffer=StringIO()
lines_per_file=int(row_number*(5/file_size_in_mib))
ranges_number=math.ceil(row_number/lines_per_file)
print('total {} small files'.format(ranges_number) )
if row_number % lines_per_file > 0:
# means the last range is not the same large as the previous ranges
# which is normal
last_range_number=row_number % lines_per_file
# get the correct row number for the last range
else:
last_range_number=0
print ('the last range has {} rows'.format(last_range_number) )
def range1(start, end):
return range(start,end+1)
for splitnumber in range1(1,ranges_number):
print (splitnumber)
small_files_name=file_name.replace('.{}'.format(s3_key_extension),'_{}.{}'.format(splitnumber,s3_key_extension))
small_file_data=data.loc[((splitnumber-1)*lines_per_file+1):(splitnumber*lines_per_file)]
small_file_data.to_csv(csv_buffer,index=False)
# to save the data into backup folder
save_as='{}{}'.format(small_files_folder,small_files_name)
s3_session_resource.Object(target_bucket_name,save_as).put(Body=csv_buffer.getvalue())
csv_buffer.truncate(0)
# remember to truncate the buffer, as the buffer is still in loop
# it can't truncate itself for the next split file data
return {
"small_files_folder":small_files_folder
}
#==========================================================================
def finding_folder(bucket_name,folder_name,subfolder_name):
folder_combination='{}/{}/'.format(folder_name,subfolder_name)
folder_found=0
#folder_found=0 means neither folder nor subfolder found
#folder_found=1 means only folder found, need to create a subfolder
#folder_found=2 means both the folder and subfolder found
target_folder=None
#no path for target folder and subfolder yet
objects_in_bucket=s3_client.list_objects_v2(Bucket=bucket_name)
#as the bucket is empty at the first place, there wont be any content
if 'Contents' in objects_in_bucket:
for object in objects_in_bucket['Contents']:
if folder_name in object['Key']:
#folder found
substring=object['Key']
if substring.endswith('{}/'.format(folder_name)):
folder_alone=object['Key']
folder_path='{}{}/'.format(folder_alone,subfolder_name)
folder_found=1
#here to grab the backup folder without its subfolders
elif substring.endswith('{}/'.format(subfolder_name)):
print('subfolder found: {}'.format(object['Key']))
folder_found=2
target_folder=object['Key']
return target_folder
else:
folder_found=0
if folder_found==1:
#means folder found alone, missing today's subfolder
s3_client.put_object(Bucket=bucket_name, Key=folder_path)
print("today's subfolder created:{}".format(folder_path))
target_folder=folder_path
if folder_found==0:
s3_client.put_object(Bucket=bucket_name, Key=folder_combination)
print('Both folder and subfolder created: {}'.format(folder_combination))
target_folder=folder_combination
return target_folder
# the result contains '/' in the end
#==========================================================================
def get_tablename(target_name):
schema_name=''
correct_tablename=''
temp_tablename=''
#we can add spelling corrector here if there's possibility of typos in filenames
if 'price' in target_name or 'index' in target_name:
#always be aware of lower cases
#make names to be in lower cases as soon as possible
#s3_key_withoutextension already be lower case
schema_name='here is your schema'
correct_tablename='0.PriceIndex'
# to name the file's temp table in mysql:
temp_tablename='{}_temp'.format(target_name)
#elif 'some_string' in target_name:
#more schemas and tables here
else:
#print ('The file name of {} is not valid. Please check if you uploaded the right file.'.format(s3_key))
#print('Sending email of notification via SNS....')
subject='The file name of {} is not valid. Please check if you uploaded the right file.'.format(target_name)
body=''
res = notify(0, subject, body)
exit()
# if no table matches, this file won't be loaded
return {
"schema_name":schema_name,
"temp_tablename":temp_tablename,
"correct_tablename":correct_tablename
}
#===============================================================================
def notify(status, subject, body):
#here status decides which topic to call:
if status==1:
#means everything is going well
sns_topic_arn=topic_arn_on_success
elif status==0:
sns_topic_arn=topic_arn_on_failure
#print("topic_arn is : {}".format(sns_topic_arn))
subject = ("".join(ch for ch in subject if unicodedata.category(ch)[0] != "C"))[0:99]
body = str(body)
sns_client = boto3.client('sns')
response = sns_client.publish(
TargetArn=sns_topic_arn,
Subject=subject,
Message=body
#if we wish to send different messages to mobile and email
# we can set as below:
#Message = json.dumps({'default': subject}),
#MessageStructure='json'
)
return "message sent"
#==========================================================================
def backup_file(source_bucket_name,source_s3_key,backup_filename,backup_bucketname=backup_bucket,backup_foldername=backup_folder):
#define the source
print('starting back-up.')
copy_source={
'Bucket':source_bucket_name,
'Key':source_s3_key
}
#define the target
#to find the backup folder and today's subfolder,
#if not existing, create new ones
respon=finding_folder(backup_bucketname,backup_foldername,today)
#target_folder=respon
#that is the folder, the data file will be moved into.
#to create target file
backup_file='{}{}'.format(respon,backup_filename)
s3_client.put_object(Bucket=backup_bucketname, Key=backup_file)
#to move the data file to the target position for back up
bucket=s3_resource.Bucket(backup_bucketname)
bucket.copy(copy_source,backup_file)
s3_resource.Object(source_bucket_name,source_s3_key).delete()
print('file of {} has been moved to backup folder of {}'.format(source_s3_key,respon))
#=====================================================================================
#to delete sqs message after a successful loading
def delete_message(message):
#print("message for deletion: {}".format(message))
sqs_region, sqs_Account_Id, sqs_name = message['eventSourceARN'].split(':')[3:6]
sqs_client = boto3.client('sqs', region_name=sqs_region)
sqs_url = f'https://{sqs_region}.queue.amazonaws.com/{sqs_Account_Id}/{sqs_name}'
#sqs_client.delete_message(QueueUrl=sqs_url, ReceiptHandle=message['receiptHandle'])
#print('sqs message deleted')
#print("QueueUrl is {} , and ReceiptHandle is {}".format(sqs_url,message['receiptHandle']))
#==========================================================================
===========> Child Lambda for Loading — Python Scripts
from __future__ import print_function
import os
import json
import urllib
import time
from datetime import datetime
from datetime import date
import pytz
from pytz import timezone
import urllib.parse
from urllib.parse import unquote_plus
import unicodedata
### for read/split data files in s3 ###
import pandas
from io import StringIO
import math
### for MySQL connection ###
import sys
import logging
import pymysql
import json
### clients settings ###
import boto3
import botocore.session as bc
lambda_client=boto3.client('lambda')
s3_client=boto3.client('s3')
s3_resource=boto3.resource('s3')
session=boto3.Session()
s3_session_resource=session.resource('s3')
rds_data_client = boto3.client('rds-data') # this is for aurora only...
### Environment Variables ###
aws_region=os.getenv('aws_region')
Account_Id=os.getenv('Account_Id')
mysql_database=os.getenv('mysql_database')
mysql_host=os.getenv('mysql_host')
topic_name_on_success=os.getenv('topic_name_on_success')
topic_arn_on_success=os.getenv('topic_arn_on_success')
topic_name_on_failure=os.getenv('topic_name_on_failure')
topic_arn_on_failure=os.getenv('topic_arn_on_failure')
secret_name=os.getenv('secret_name')
reporting_arn=os.getenv('reporting_arn')
#lambda will not use master user to login
# it will use another user dedicated for its job
#ses_sender_email=os.getenv('ses_sender_email')
#ses_receiver_email=os.getenv('ses_receiver_email')
#below for backup folder
backup_bucket=os.getenv('backup_bucket')
backup_folder="backup"
report_source_data_folder="reports/priceindex"
### time_zone settings ###
time_zone=timezone('EST')
time_zone1="America/New_York"
#print("now is {}".format(datetime.now(time_zone).date()))
today = datetime.now(time_zone).date()
#this can make sure we use est timezone
time_stamp=time.time()
# get time_stamp 1689475413.9474
### timeout settings ###
time_interval_in_minutes=3600 # to load the file if it was uploaded within this many minutes
lambda_timeout_in_seconds=60
maxerrors_allowed=5
### report building ###
report_file_prepare=False
#once report files were loaded completely,
#report lambda will be invoked
#dont need to wait until all files were loaded
# create db connection outside of lambda_handler
# so that to reuse one connection
#print('fetching secret...')
session=boto3.session.Session()
session_client=session.client(
service_name='secretsmanager',
region_name=aws_region
)
secret_response=session_client.get_secret_value(
SecretId=secret_name
)
secret_arn=secret_response['ARN']
secretstring=secret_response['SecretString']
secret_json=json.loads(secretstring)
user_name=secret_json['username']
pass_word=secret_json['password']
mysql_host_name=mysql_host[0:-5] # drop the trailing characters (e.g. a ':3306' port suffix) from the endpoint
#print(mysql_host_name)
try:
conn = pymysql.connect(host=mysql_host_name, user=user_name, password=pass_word, db=mysql_database, connect_timeout=10)
except pymysql.MySQLError as e:
print("ERROR: Unexpected error: Could not connect to MySQL instance.")
print(e)
sys.exit()
#print("SUCCESS: Connection to RDS MySQL instance succeeded")
def lambda_handler(event, context):
stage =0
loading_status=0
reload=False # reload if there is successful loading within a fixed time
report1=0 # reporting process is not started by default
report2=False # if there is a second series of reports
# after the loading lambda is invoked by the leader lambda,
# let the loading lambda check whether the file exists.
# the reason is that when SQS puts the message in the queue, it does not check
# whether the file exists in the s3 upload bucket; the file might already have been processed and moved
# to the backup s3 bucket by another lambda while the message remains in the queue.
report_data_prepare=event['report_data_prepare']
reporting_status=0
try:
if report_data_prepare==0:
#means loading lambda needs to do Data ETL, not reporting
s3_bucket=event['s3_bucket']
s3_key=event['s3_key']
file_split=event['file_split']
small_files_folder=event['small_files_folder']
# backup/20230101/priceindex.csv/
schema_name=event['schema_name']
temp_tablename=event['temp_tablename']
tablename=event['tablename']
file_delimiter=event['file_delimiter']
res=check_remaining_files(s3_bucket,s3_key)
if res=="true":
#means s3_key exists
print ('The file of {} exists. Start loading.'.format(s3_key))
else:
print_content="There is no such file {}. It might be processed and moved to backup by another lambda.".format(s3_key)
print(print_content)
loading_status=2
return {
"loading_status":2,
"error":print_content
}
# start loading
# 1 to find out the source data file is splitted or not
#print('file_split is {}'.format(file_split))
if file_split==0:
# the file is smaller than 6 MiB
# loading from upload bucket
file=s3_resource.Object(s3_bucket,s3_key)
filedata=file.get()['Body'].read()
row_number=filedata.decode('ISO-8859-1').count('\n')-1
#print('row_number is {}'.format(row_number))
total_rows=row_number
#get the total rows in the original data file
# I used several ways to avoid duplicate or incomplete loading:
#1. to upsert instead of insert
#2. to check if there is already a successful loading within 30 minutes
#3. to create log tables in MySQL to record the number of rows affected by the loading
#4. to count the row number in temp table before loading
#5. to count the row number in data file in s3 using lambda
#6. to compare both numbers (#4 and #5) and decide to load or not
# in fact, we don't need to apply all above steps
# I created the template for every step for reference only
##### AAA) check status before loading ######
stage=1
parameter_for_result='@checking_status'
usp_name="{}.sp_lambda_loading_check_status('{}','{}',{},{})".format(schema_name,schema_name,tablename,time_interval_in_minutes*60,parameter_for_result)
sql_text="call {};".format(usp_name)
#sql_reset="set {} ='';".format(parameter_for_result)
# no need to reset the OUT parameters as SP will set it as -1 if error stops SP running
sql_check_text='select 1;'
run_sql(sql_text,sql_check_text,conn,1)
# the parameter of '1' means no need to wait for query to finish
# as this procedure usually runs very fast
sql_text="select {};".format(parameter_for_result)
result=run_sql_to_get_result(sql_text,conn)
if result ==-1:
#means error when running SP itself
#ATTENTION: even error when checking loading status
# there are more measures loading lambda will take to ensure that the loading is not duplicate
# here, we can send email via SNS to person in charge , no need to stop and exit lambda
print_content="error occurred when executing sp_lambda_loading_check_status. Please check log_for_loading in MySQL."
print(print_content)
subject=print_content
body=''
#res = notify(0, subject, body)
if result==0 :
#means don't load, there was a successful loading not long ago
#exit lambda function
print_content="there is a successful loading less than 30 minutes ago. Loading paused. Please check."
print(print_content)
loading_status=0
return {
"loading_status":loading_status,
"error":print_content
}
if result==1 :
#means continue to load
print("After checking, continue to load...")
#below is to call SP in MySQL
#1 to create a temp table
# check for returning result from SP
#2 ! to load data into this temp table
#3 to check the number of rows loaded
#4 to compare with total_rows to see if there is any difference
#5 if no, it means all data in the data file were loaded into temp table
#6 to upsert data into permanent table from temp table
#7 to write log for loading
#====================================================================
##### BBB) create temp table ######
stage=2
parameter_for_result='@temp_table_created'
usp_name="{}.sp_loading_create_temp_table('{}','{}','{}',{})".format(schema_name,schema_name,temp_tablename,tablename,parameter_for_result)
sql_text="call {};".format(usp_name)
sql_check_text="select Status from log_for_loading where locate('temp table creation',EventSource)>0 and timediff(now(),Time_stamp)<10;"
# trying to get updated log info from log table
# once Procedure writes log, no matter status is success or not,
# means the procedure complete running
run_sql(sql_text,sql_check_text,conn)
sql_text="select {};".format(parameter_for_result)
result=run_sql_to_get_result(sql_text,conn)
if result != 1 :
#means temp table creation failed
print_content="temp table of {} can't be created.".format(temp_tablename)
print(print_content)
sql_text='select Notes from {}.log_for_loading where Tablename="{}" and timediff(now(),Time_stamp)<10;'.format(schema_name,temp_tablename)
result=run_sql_to_get_result(sql_text,conn)
print(result)
loading_status=0
return {
"loading_status":loading_status,
"error":print_content
}
else:
# temp tables created successfully
# or sp is still running (which is rare, but possible)
# lambda continues
#====================================================================
##### CCC) loading from data file to temp table using lambda ######
stage=3
load_data_table_mysql(s3_bucket, s3_key,file_delimiter,schema_name,temp_tablename,conn)
sql_text='select count(*) from {}.`{}`;'.format(schema_name,temp_tablename)
count_temp=run_sql_to_get_result(sql_text,conn)
print('after counting rows in temp table, result is {}'.format(count_temp))
total_rows=read_data_from_s3(s3_bucket,s3_key,file_delimiter,temp_tablename)
#====================================================================
##### DDD) loading to general table and split into sub tables as well ######
stage=4
parameter_for_result='@data_loading_splitting'
usp_name="{}.sp_loading_PriceIndex('{}','{}','{}','{}',{},{},{})".format(schema_name,schema_name,temp_tablename,tablename,s3_key,total_rows, maxerrors_allowed, parameter_for_result)
sql_text="call {};".format(usp_name)
sql_check_text="select status from log_for_loading where (locate('loading',EventSource)>0 or locate('splitting',EventSource)>0)and timediff(now(),Time_stamp)<10;"
# trying to get updated log info from log table
# as this procedure may have up to 3 different rows in column eventsource
# once Procedure writes log, no matter status is success or not,
# means the procedure complete running
run_sql(sql_text,sql_check_text,conn)
sql_text="select {};".format(parameter_for_result)
current_loadingstatus=run_sql_to_get_result(sql_text,conn)
# -1 --> error when running sp
# 0 --> loading failed
# 1--> loading successful
if current_loadingstatus==0 or current_loadingstatus==-1:
loading_status=0
# means loading status is failure
# or not finished yet (rare, but possible)
# if we dont have a precise answer to loading status
# we can't move the data file to another s3 bucket for back up
# the solution is to let another lambda to reprocess the message later
# after trying several times, if still can't get a result from this sp
# the message will go to the dead letter queue, which is monitored by a CloudWatch alarm
print_content="error occurred during data loading and splitting for table of {}.".format(tablename)
print(print_content)
return {
"loading_status":loading_status,
"error":print_content
}
else:
loading_status=1
sql_text="select count(*) from {}.`{}`;".format(schema_name,tablename)
result=run_sql_to_get_result(sql_text,conn)
print('Rows loaded this time: {}. Rows in total: {}'.format(total_rows,result))
sql_text="select Date from {}.`{}` LIMIT 1;".format(schema_name,tablename)
result=run_sql_to_get_result(sql_text,conn)
print('Date format in table of {} : {}'.format(tablename,result))
# the date format is essential for later data processing
#1995-01 "year-month"
# it decides how to extract year and month from the string in sp in mysql
# means loading is successful
# return loading_status as 1
print('loading complete! Returning good news to leader lambda~')
loading_status=1
return {
"loading_status":loading_status,
"error":''
}
elif report_data_prepare==1:
# means loading lambda needs to execute stored procedures
# to get source data for reports
schema_name=event['schema_name']
tablename=event['tablename']
stage=5
print('stage is {}.Preparing data for reports'.format(stage))
##aaa) to create table for reporting first
# reporting procedure only updates the log table when error
# so i won't check for log table while mysql is executing the SP
report_table_name='1.report'
parameter_for_result='@table_for_reporting'
usp_name="{}.sp_reporting_50_general_table_for_report_building('{}','{}','{}',{})".format(schema_name,schema_name,tablename,report_table_name,parameter_for_result)
sql_text="call {};".format(usp_name)
sql_check_text="select ProcedureName from {}.log_for_reporting where locate('sp_reporting_50_general_table_for_report_building',ProcedureName)>0 and timediff(now(),Time_stamp)<10;".format(schema_name)
# trying to get updated log info from log table
# once log was added, sp finished running no matter successfully or not
# MySQL 5.7 and 8.0 use different syntax to rename columns
# change to the right syntax otherwise error throws
run_sql(sql_text,sql_check_text,conn)
sql_text="select {};".format(parameter_for_result)
result=run_sql_to_get_result(sql_text,conn)
if result ==1:
sql_text="select count(*) from {}.`{}`;".format(schema_name,report_table_name)
result=run_sql_to_get_result(sql_text,conn)
print_content='table {} for reporting is ready. Row number is {}'.format(report_table_name,result)
print(print_content)
else:
print_content="table for reporting can't be created."
print(print_content)
sql_text="select Notes from {}.log_for_reporting where ProcedureName='sp_reporting_50_general_table_for_report_building' and timediff(now(),Time_stamp)<10;".format(schema_name)
result=run_sql_to_get_result(sql_text,conn)
print(result)
subject=print_content
body=''
notify(0,subject,body)
#reporting_status=0
#return {
#"reporting_status":reporting_status,
#"error":print_content
#}
exit()
#bbb) now to create report data finally
parameter_for_result='@report_data_created'
parameter_delimiter=','
# this is the delimiter the SP caller uses to separate multiple values in an input parameter
year='1995,1996'
month='12'
geo_limit='Canada'
category='food'
report_source_data='priceindex_sourcedata'
# source data will be saved in the table 'priceindex_sourcedata'
usp_name="{}.sp_reporting_1_price_by_year_month_geo_category('{}','{}','{}','{}','{}','{}','{}','{}',{})".format(schema_name,schema_name,report_table_name,parameter_delimiter,year,month,geo_limit,category,report_source_data,parameter_for_result)
sql_text="call {};".format(usp_name)
sql_check_text="select ProcedureName from {}.log_for_reporting where locate('sp_reporting_1_price_by_year_month_geo_category',ProcedureName)>0 and timediff(now(),Time_stamp)<10;".format(schema_name)
run_sql(sql_text,sql_check_text,conn)
sql_text="select {};".format(parameter_for_result)
result=run_sql_to_get_result(sql_text,conn)
if result !='0':
#reporting_status=1
sql_text="select count(*) from {}.`{}`;".format(schema_name,report_source_data)
#sql_text="select Notes from {}.log_for_reporting where locate('sp_reporting_1_price_by_year_month_geo_category',ProcedureName)>0 and timediff(now(),Time_stamp)<10;".format(schema_name)
result=run_sql_to_get_result(sql_text,conn)
print_content='report data is generated and saved in the table of {}, row number is {}.'.format(report_source_data,result)
print(print_content)
# the result will be a number tell us how many rows the report data have
stage=6
respon=finding_folder(backup_bucket,report_source_data_folder,today)
report_source_data_directory=respon[0:-1]
# to delete the trailing '/' from the complete report_source_data_folder address
reporting_data_created=1
return {
"reporting_data_created":reporting_data_created,
"report_source_data":report_source_data,
"report_source_data_folder":report_source_data_directory
}
else:
print_content='there is no report data generated. Please check.'
print(print_content)
sql_check_text="select Notes from {}.log_for_reporting where locate('sp_reporting_1_price_by_year_month_geo_category',ProcedureName)>0 and timediff(now(),Time_stamp)<10;".format(schema_name)
result=run_sql_to_get_result(sql_check_text,conn)
print(result)
subject=print_content
body=''
#notify(0,subject,body)
#reporting_status=0
reporting_data_created=0
return {
"reporting_data_created":reporting_data_created,
"error":print_content
}
if file_split==1:
# the file is splitted and saved in backup bucket
# go to backup bucket for loading
s3_bucket=backup_bucket
# in this project, each data file does not exceed 6 MiB,
# so this part is omitted.
# the loading process is exactly like the one above when the file is not split,
# except that we loop over all files in the small_files_folder and load each file in order
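# a sketch of what that loop could look like (names assumed from split_file in the leader's script):
#   listed = s3_client.list_objects_v2(Bucket=backup_bucket, Prefix=small_files_folder)
#   for obj in listed.get('Contents', []):
#       if obj['Key'].endswith(('.csv', '.txt')):
#           load_data_table_mysql(backup_bucket, obj['Key'], file_delimiter,
#                                 schema_name, temp_tablename, conn)
#   # then continue with the same temp-table checks and sp_loading_* calls as above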
except Exception as e:
if stage==0 :
print_content='stage={}, error when connecting to MySQL, description: {}'.format(stage,e)
loading_status=0
if stage ==1:
print_content="stage={}, can't check the table's previous loading status, description: {}".format(stage,e)
loading_status=0
if stage ==2:
print_content='stage={}, error when building temp table, description: {}'.format(stage,e)
loading_status=0
if stage ==3:
print_content='stage={}, error when loading to temp table, description: {}'.format(stage,e)
loading_status=0
if stage ==4:
print_content='stage={}, error when loading to general and sub category tables, description: {}'.format(stage,e)
loading_status=0
if stage==5:
print_content='stage={}, loading complete, reporting failed, description: {}'.format(stage,e)
loading_status=1
if stage==6:
print_content='stage={}, report data is ready, error when invoking reporting lambda, description: {}'.format(stage,e)
loading_status=1
print(print_content)
return {
"loading_status":loading_status,
"error":print_content
}
#==========================================================================
#==========================================================================
def invoke_another_lambda(inputParams,invocationType,child_arn=reporting_arn):
# 1 --> reporting lambda
# 2 --> loading lambda (by default)
# 1 --> get response
# 2 --> async (parent lambda dont need to wait for child lambda's response )
if invocationType ==1:
type='RequestResponse'
else:
type='Event'
response = lambda_client.invoke(
FunctionName = child_arn,
InvocationType = type,
Payload = json.dumps(inputParams)
)
responseFromChild = json.load(response['Payload'])
return responseFromChild
#======================================================================================
# Read CSV file content from S3 bucket (this function is just for reference)
'''
def read_data_from_s3(bucket_name,target_name):
resp = s3_client.get_object(Bucket=bucket_name, Key=target_name)
data = resp['Body'].read().decode('utf-8')
data = data.split("\n")
columns = data[0].split(',')
# the utf-8 might not be enough to cover all decode types and
# if error, try decode('ISO-8859-1')
return data
'''
#=======================================================================
# Read CSV file content from S3 bucket (this function is applied)
def read_data_from_s3(bucket_name,target_name,file_delimiter,temp_tablename):
resp = s3_client.get_object(Bucket=bucket_name, Key=target_name)
data = pandas.read_csv(resp['Body'],sep=file_delimiter)
temp_tablename=temp_tablename.lower()
if 'priceindex' in temp_tablename:
data_for_loading =data[['Date','GEO','DGUID','Products','UOM','UOM_ID','SCALAR_FACTOR','SCALAR_ID','VECTOR','COORDINATE','VALUE','STATUS','SYMBOL','TERMINATED','DECIMALS']]
#data_for_loading.drop(0,axis=0,inplace=True)
# to drop the first row of headers
data_for_loading.index=data_for_loading.index+1
row_number=len(data_for_loading.index)
# to make index starting from 1
return row_number
#=====================================================================================
def load_data_table_mysql(bucket_name, target_name,file_delimiter,schema_name,temp_tablename,connection):
# if we wish to modify and select some columns from the s3 file for data loading,
# boto3 does not provide methods to do so:
# the data we get as a whole can't be selected by column
# unless we extract the columns ourselves, analyse each one to decide whether it is one we are going
# to load into the database, then extract each row, omit the values we don't want,
# recompose the remaining values into a new row (string)
# and then insert it into the db...
# the other choices are to 1) use the AWS service S3 Select, which is not free,
# or 2) make sure the structure of the data file is the same as the table structure in MySQL,
# otherwise errors will happen during loading,
# or 3) use pandas in the lambda function; this great package solves most of the problems for
# data reading
resp = s3_client.get_object(Bucket=bucket_name, Key=target_name)
data = pandas.read_csv(resp['Body'],delimiter=file_delimiter,index_col=False)
# I tried to find a simple way to read from resp["Body"], fetch each row and load it into MySQL;
# however, if there is a comma within a value itself, even when the value is quoted,
# the simple way still breaks one value into two when loading,
# which causes the error of exceeding the column count.
# pandas is a more reliable way to load the data into the database line by line
temp_tablename=temp_tablename.lower()
# covert all into lower case before searching
if 'priceindex' in temp_tablename:
# sometimes the original data files change their column order or add more columns
# we need to make sure the columns we get from data files are in the right order and right number
column_list=['Date','GEO','DGUID','Products','UOM','UOM_ID','SCALAR_FACTOR','SCALAR_ID','VECTOR','COORDINATE','VALUE','STATUS','SYMBOL','TERMINATED','DECIMALS']
data_for_loading=data[column_list]
#print(data_for_loading)
# the above can get "useful and right" columns for the tables in MySQL
# loop through the data frame
row_index=1
# to exclude index column in dataframe, starting from 1
value_string_total=''
for row_index,row in data_for_loading.iterrows():
# row-by-row parameterized inserts ("insert into table values(%s, %s, ...)") were too slow here,
# so all rows are concatenated into one bulk INSERT statement instead (see the note below the function)
value_string_single=''
for column_index in range(len(column_list)):
column_name=column_list[column_index]
value_string_single=value_string_single+str(data_for_loading[column_name].values[row_index])+"','"
value_string_single=value_string_single[0:-3]
value_string_single=" ('"+value_string_single+"'), "
value_string_total=value_string_total+value_string_single
# after looping all rows in data file:
value_string_total=value_string_total[0:-2]
# the data entry for RDS MySQL, must go in this way
# otherwise, the slow performance would cause table lock
# when one query is trying to insert data while the previous
# query has not finished yet
# we have no way but to combine all rows as together and then to load into db
try:
with connection.cursor() as cursor:
sql = "INSERT INTO {}.{} Values {};".format(schema_name, temp_tablename,value_string_total)
#print(sql)
cursor.execute(sql)
#cursor.execute(sql, tuple(row))
connection.commit()
except Exception as e:
print("error when loading data into temp table. Error is :{}".format(e))
# do nothing when error, just print out the errors
# here, try to load all rows into mysql,
# will check later in the codes if the number of error rows are acceptable
if connection:
connection.commit()
return
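# as an aside (not what this project uses): pymysql can also batch the rows with parameterized
# placeholders via cursor.executemany, which it rewrites into bulk INSERT statements, e.g.
#   placeholders = ','.join(['%s'] * len(column_list))
#   sql = "INSERT INTO {}.{} VALUES ({})".format(schema_name, temp_tablename, placeholders)
#   with connection.cursor() as cursor:
#       cursor.executemany(sql, [tuple(row) for _, row in data_for_loading.iterrows()])
#   connection.commit()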
#=====================================================================================
def check_remaining_files(bucket_name,target_name=None):
resp=s3_client.list_objects_v2(Bucket=bucket_name)
print("checking for remaining files:")
#print(resp)
unload_file=None
invalid_file=None
unload_table=None
countvalid=0
countinvalid=0
if 'Contents' in resp:
print("there are remaining files in s3.")
for OBJECT in resp['Contents']:
key=OBJECT['Key']
upload_date=OBJECT['LastModified']
upload_date=upload_date.astimezone(pytz.timezone(time_zone1))
#print("upload_date is {}".format(upload_date))
#change datetime from utc to est
# better to compare both time within 10 minutes
# there might be more than one loadings per day
now=datetime.now(time_zone)
now=now.astimezone(pytz.timezone(time_zone1))
#now=now.replace(tzinfo=pytz.utc)
#print("now in pytz is {}".format(now))
delta=now-upload_date
delta_in_minutes=(delta.total_seconds())/60
#print("delta in mimutes: {}".format(delta_in_minutes))
found_par="false"
if (key.endswith('txt') or key.endswith('csv')) and delta_in_minutes<time_interval_in_minutes:
#there're more flat files for today
if target_name!=None: # we need to check s3_key exists or not
# here we can't make target file name in lower case,
# s3_key is not converted to lower case
# and the (key) we get using this function is not converted either
if str(key)==target_name or target_name in str(key):
#means we found the target file
found_par="true"
return found_par
countvalid=countvalid+1
if countvalid==1: #means this is the first file found:
unload_file=str(key) # get the s3 file name first
#print('unload_file is {}'.format(unload_file))
unload_file=unload_file.lower()
respons=get_tablename(unload_file) # get the corresponding table name in mysql
# return {
#"schema_name":schema_name,
#"temp_tablename":temp_tablename,
#"correct_tablename":correct_tablename
#}
unload_table=respons['correct_tablename']
#print('unload_table is {}'.format(unload_table))
else:
unload_file=unload_file+','+str(key)
respons=get_tablename(unload_file)
unload_table=unload_table+','+respons['tablename']
else:
countinvalid=countinvalid+1
if countinvalid==1: #means this is the first file found:
invalid_file=str(key)
else:
invalid_file=invalid_file+','+str(key)
#===============================================================
#loop complete, we get unload list and invalid list
if invalid_file!=None:
print("Below {} files of {} are not for loading.".format(countinvalid,invalid_file))
else:
unload_file=None
unload_table=None
return{
"unloadfiles":unload_file,
"filenumber":countvalid,
"unloadtables":unload_table
}
#==========================================================================
def get_tablename(target_name):
schema_name=''
correct_tablename=''
temp_tablename=''
#we can add spelling corrector here if there's possibility of typos in filenames
if 'price' in target_name or 'index' in target_name:
#always be aware of lower case
#convert names to lower case as soon as possible
#s3_key_withoutextension is already lower case
#this means the table belongs to the schema 'your_schema'
schema_name='your_schema'
correct_tablename='0.PriceIndex'
# to name the file's temp table in mysql:
temp_tablename='{}_temp'.format(target_name)
#elif 'some_string' in target_name:
#more schemas and tables here
else:
#print ('The file name of {} is not valid. Please check if you uploaded the right file.'.format(s3_key))
#print('Sending email of notification via SNS....')
subject='The file name of {} is not valid. Please check if you uploaded the right file.'.format(target_name)
body=''
res = notify(0, subject, body)
exit()
return {
"schema_name":schema_name,
"temp_tablename":temp_tablename,
"correct_tablename":correct_tablename
}
#===============================================================================
def notify(status, subject, body):
#here status decides which topic to call:
if status==1:
#means everything is going well
sns_topic_arn=topic_arn_on_success
elif status==0:
sns_topic_arn=topic_arn_on_failure
#print("topic_arn is : {}".format(sns_topic_arn))
subject = ("".join(ch for ch in subject if unicodedata.category(ch)[0] != "C"))[0:99]
body = str(body)
sns_client = boto3.client('sns')
response = sns_client.publish(
TargetArn=sns_topic_arn,
Subject=subject,
Message=body
#if we wish to send different messages to mobile and email
# we can set as below:
#Message = json.dumps({'default': subject}),
#MessageStructure='json'
)
return "message sent"
#==========================================================================
def run_sql(sql_text, sql_check_text,connection,report_file_prepare=False):
if report_file_prepare==True:
MAX_WAIT_CYCLES=1
# if calling an SP to build reports,
# there is no need to wait for a 'complete' status of the executed sql statement
else:
MAX_WAIT_CYCLES = 20
# if a sql statement can't finish within 20 wait cycles,
# lambda continues to run the next line anyway.
# the reason for waiting: if we don't wait for this procedure (aaa) to finish
# and we run the next SP (bbb), we may not be able to get any correct result
# because (aaa) hasn't updated the status in the log table yet.
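# for illustration only, a hypothetical shape for sql_check_text against a hypothetical log table
# (the table and column names below are placeholders, not part of this project):
#   sql_check_text = "SELECT 1 FROM your_schema.etl_log WHERE task_name='aaa' AND status='complete' AND run_date=CURDATE();"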
attempts = 0
print("Executing: {}".format(sql_text))
with connection.cursor() as cur:
cur.execute(sql_text)
#MySQL does not have a boto3 API to describe a sql statement's execution status,
# so we need to check a log table instead
while attempts < MAX_WAIT_CYCLES:
attempts += 1
time.sleep(2)
#20 cycles * 2 seconds = 40s
# lambda waits up to 40s,
# but if there is a result before 40s
# it breaks the loop and runs the next line
status = status_check(sql_check_text,connection)
if status ==1:
#means log shows success
# lambda can continue
print("status is: completed".format(status))
break
#else: lambda won't wait any more
#
connection.commit()
return
#==========================================================================
def status_check(sql_check_text,connection):
with connection.cursor() as cur:
cur.execute(sql_check_text)
#record = cur.fetchone()[0]
# if cur is empty, can't get value for variable 'record'
if cur.rowcount>0:
#means log table is updated
status=1
else:
#means the log is not updated yet or an error occurred
status=0
connection.commit()
return status
#==========================================================================
def run_sql_to_get_result(sql_text,connection,report_file_prepare=False):
if report_file_prepare==True:
MAX_WAIT_CYCLES=1
# if calling an SP to build reports,
# there is no need to wait for a 'complete' status of the executed sql statement
else:
MAX_WAIT_CYCLES = 20
# if a sql statement can't finish within 20 wait cycles,
# lambda continues to run the next line anyway.
# the reason for waiting: if we don't wait for this procedure (aaa) to finish
# and we run the next SP (bbb), we may not be able to get any correct result
# because (aaa) hasn't updated the status in the log table yet.
attempts = 0
print("Executing: {}".format(sql_text))
record=-1
while attempts < MAX_WAIT_CYCLES:
attempts += 1
time.sleep(1)
with connection.cursor() as cur:
cur.execute(sql_text)
if cur.rowcount>0:
record = cur.fetchone()[0]
break
print('after checking {}, result is {}'.format(sql_text,record))
connection.commit()
return record
#==========================================================================
def finding_folder(bucket_name,folder_name,subfolder_name):
folder_combination='{}/{}/'.format(folder_name,subfolder_name)
folder_found=0
#folder_found=0 means neither folder nor subfolder found
#folder_found=1 means only folder found, need to create a subfolder
#folder_found=2 means both the folder and subfolder found
target_folder=None
#no path for target folder and subfolder yet
objects_in_bucket=s3_client.list_objects_v2(Bucket=bucket_name)
#if the bucket is empty in the first place, there won't be any 'Contents' key
if 'Contents' in objects_in_bucket:
for object in objects_in_bucket['Contents']:
if folder_name in object['Key']:
#folder found
substring=object['Key']
if substring.endswith('{}/'.format(folder_name)):
folder_alone=object['Key']
folder_path='{}{}/'.format(folder_alone,subfolder_name)
folder_found=1
#here to grab the backup folder without its subfolders
elif substring.endswith('{}/'.format(subfolder_name)):
print('subfolder found: {}'.format(object['Key']))
folder_found=2
target_folder=object['Key']
return target_folder
else:
folder_found=0
if folder_found==1:
#means folder found alone, missing today's subfolder
s3_client.put_object(Bucket=bucket_name, Key=folder_path)
print("today's subfolder created:{}".format(folder_path))
target_folder=folder_path
if folder_found==0:
s3_client.put_object(Bucket=bucket_name, Key=folder_combination)
print('Both folder and subfolder created: {}'.format(folder_combination))
target_folder=folder_combination
return target_folder
2. AWS DynamoDB
Let’s see how Lambda works with DynamoDB.
AWS DynamoDB delivers impressive performance. If the project has to deal with a huge amount of data in real time, I recommend replacing Lambda with Amazon Kinesis; a minimal producer sketch follows below.
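For reference, here is a minimal sketch of pushing rows into a Kinesis Data Stream with boto3. The stream name 'your-stream' and the partition-key choice are hypothetical placeholders, not part of this project:
import json
import boto3
kinesis_client = boto3.client('kinesis')
def send_row_to_stream(row, stream_name='your-stream'):
    # each row becomes one Kinesis record; PartitionKey controls how records spread across shards
    return kinesis_client.put_record(
        StreamName=stream_name,
        Data=json.dumps(row).encode('utf-8'),
        PartitionKey=str(row.get('GEO', 'default'))
    )
A consumer Lambda would then receive these records in batches through a Kinesis event source mapping instead of the S3/SQS trigger used below.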
Loading Lambda:
1) to validate s3 event, s3 key
2) to check the file and split the file if it exceeds the limit
3) to convert the file and add new attribute as PK
4) to delete the dynamodb table if exists
5) to use 'import_table' feature to load data from s3 to dynamodb
6) to check loading status
7) to backup the data file in another s3 bucket
8) return failure message to SQS if any
====================> Loading Lambda for DynamoDB
#### NOTE #####
# The logs may show the error below:
# "Unable to map value based on the given schema. Remainder of the file will not be processed."
# This is caused by uploading the data file through the AWS S3 console.
# The import will still be successful in spite of this error.
# Uploading the data file using the CLI avoids the problem.
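# for illustration, a minimal CLI upload (bucket and file names are placeholders):
#   aws s3 cp ./priceindex_20240101.csv s3://your-upload-bucket/priceindex_20240101.csv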
#======================================
##### DynamoDB does not support analytic queries,
##### so we need to export the raw data to s3
##### and use Glue and Athena for ETL and reports
##### this is not an ideal reporting process for my project
##### this file is just for reference
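# for illustration, one way to export a table to s3 is DynamoDB's native export feature
# (requires point-in-time recovery enabled on the table; the ARN and bucket below are placeholders):
#   boto3.client('dynamodb').export_table_to_point_in_time(
#       TableArn='arn:aws:dynamodb:<region>:<account>:table/priceindex_<date>',
#       S3Bucket='your-export-bucket',
#       ExportFormat='DYNAMODB_JSON')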
from __future__ import print_function
import json
import urllib
import time
from datetime import datetime
from datetime import date
import pytz
from pytz import timezone
import urllib.parse
from urllib.parse import unquote_plus
import unicodedata
### clients settings ###
import boto3
import botocore.session as bc
s3_client=boto3.client('s3')
s3_resource=boto3.resource('s3')
session=boto3.Session()
s3_session_resource=session.resource('s3')
dynamodb_client = boto3.client('dynamodb')
import os
aws_region=os.getenv('aws_region')
Account_Id=os.getenv('Account_Id')
topic_name_on_success=os.getenv('topic_name_on_success')
topic_arn_on_success=os.getenv('topic_arn_on_success')
topic_name_on_failure=os.getenv('topic_name_on_failure')
topic_arn_on_failure=os.getenv('topic_arn_on_failure')
#below for backup folder
backup_bucket=os.getenv('backup_bucket')
backup_folder="backup"
### for read/split data files in s3 ###
import pandas
from io import StringIO
import math
### time_zone settings ###
time_zone=timezone('EST')
time_zone1="America/New_York"
#print("now is {}".format(datetime.now(time_zone).date()))
today = datetime.now(time_zone).date()
#this can make sure we use est timezone
### timeout settings ###
time_interval_in_minutes=30 # to load the file if it is uploaded within 30 minutes
lambda_timeout_in_seconds=60
maxerrors_allowed=5
def lambda_handler(event, context):
stage =0
file_split=0 # not to split the file, unless it exceeds 6mb
small_files_folder=''
loading_status=0
if event:
#print(event)
message=event['Records'][0]
messages_to_reprocess =[]
# this is a long story: lambda is invoked concurrently and cannot resolve stuck ("dead-lock") files by itself;
# this is why we apply SQS to help reprocess the files that failed.
# We set the batch size for the queue to more than 1, for example 5. if lambda reports to SQS
# that only one of the five messages failed to process while the other four were successfully consumed,
# SQS still treats the whole batch as a failure. after the visibility timeout, all five messages
# will be put back onto the queue again....
# Fortunately, the event source mapping has a setting called 'partial batch response'. it enables lambda to report
# only the failed messages to SQS, so SQS reprocesses only those failed ones.
# But in my experience, turning on this setting is not enough. We also need to explicitly tell lambda in code
# which messages failed and should be returned to SQS.
# The list messages_to_reprocess below collects any failed messages.
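# for reference, the partial-batch-response contract this handler follows
# (the SQS event source mapping must enable FunctionResponseTypes=['ReportBatchItemFailures']):
#   return {"batchItemFailures": [{"itemIdentifier": "<messageId>"}, ...]}
# an empty list means every message in the batch was consumed successfully.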
batch_failure_response={}
for record in event["Records"]:
try:
body = record["body"]
body=json.loads(body)
s3_event=body['Records'][0]
event_name=s3_event['eventName']
print("event_name is {}".format(event_name))
if "ObjectCreated:" not in event_name:
print('event is not s3:create')
s3_bucket=s3_event['s3']['bucket']['name']
#print("s3_bucket is : {}".format(s3_bucket))
#=================================
delete_message(message)
#=================================
exit()
#do nothing as this is not a valid event
else:
print("#s3_event is:")
print(s3_event)
s3_bucket=s3_event['s3']['bucket']['name']
#print("s3_bucket is : {}".format(s3_bucket))
s3_key=s3_event['s3']['object']['key']
s3_key=unquote_plus(s3_key)
#to decode url-encoded characters such as the '+' sign in the folder name;
# this is essential when we search for keys in s3,
# but in some situations, such as a cloudfront client, we need to keep that plus sign
print("s3_key is : {}".format(s3_key))
s3_key_size=s3_event['s3']['object']['size']
file_size_in_mib=s3_key_size/(1024*1024)
#get the data file size
print(f"file_size is {s3_key_size/(1024*1024)} MB")
#============================================================
#get the pure file with/without extension
#get the pure file with lower cases only
path_name=os.path.dirname(s3_key)
print("path_name is {}".format(path_name))
# this path_name will be used later in import_table method
file_name=os.path.basename(s3_key)
length=len(file_name)
s3_key_extension=file_name[length-3:]
s3_key_extension=s3_key_extension.lower()
s3_key_withoutextension=file_name[:length-4]
s3_key_withoutextension=s3_key_withoutextension.lower()
from_path = "s3://{}/{}".format(s3_bucket, s3_key)
#create file path for data file in s3
#print("URI of s3_key is {}".format(from_path))
if 'upload' in s3_bucket and (s3_key_extension == "txt" or s3_key_extension == "csv") and ('converted' not in s3_key_withoutextension) :
#if the uploaded file is a flat file
#print('the file can be processed further')
### attention: set the delimiter according to the file extension
if s3_key_extension=='csv':
file_delimiter=','
elif s3_key_extension=='txt':
file_delimiter=','
# count row_number
file=s3_resource.Object(s3_bucket,s3_key)
filedata=file.get()['Body'].read()
row_number=filedata.decode('ISO-8859-1').count('\n')
print('row_number is {}'.format(row_number))
resp=get_tablename(s3_key_withoutextension)
tablename=resp['correct_tablename']
if tablename=='':
#means can't find the target table in dynamodb
#=================================
delete_message(message)
#=================================
exit()
elif tablename=="error":
#means table already exists and can't be deleted for new loading
# return message to sqs
messages_to_reprocess.append({"itemIdentifier": record['messageId']})
print('Adding failed file to the reprocess response list')
else:
#print('The file of {} is ready to be loaded to Dynamodb'.format(s3_key))
stage =1
#============================================================
# if the file is close to the 6 MB payload limit, split it into smaller ones
if file_size_in_mib>5.6:
file_split=1
response=split_file(s3_bucket,s3_key,file_delimiter,file_size_in_mib)
small_files_folder=response['small_files_folder']
stage=2
# use 'import_table' feature:
######
# the 3 attributes GEO, Date and Products combine to form the natural PK;
# in dynamodb, a primary key can't have more than 2 attributes (partition key + sort key).
# To solve the problem, either combine 2 attributes into 1
# or add a unique id attribute to act as the pk.
# this lambda function uses the second solution:
# before importing the table, read the data file into a pandas frame,
# save the data with an index column,
# replace the original file with the newly saved file,
# and import the saved file into dynamodb, where the index column becomes the PK
######
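# illustration of the conversion (column names come from this dataset; the layout is only an example):
#   original header : Date,GEO,Products,VALUE,STATUS,...
#   converted header: AutoID,Date,GEO,Products,VALUE,STATUS,...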
response=convert_file_with_index(s3_bucket,s3_key,path_name, file_name,file_delimiter,s3_key_withoutextension)
#return {
#"converted_file":converted_file,
#"converted_pathname":converted_pathname,
#"target_bucket":target_bucket
#}
converted_s3_key=response['converted_file']
converted_path_name=response['converted_pathname']
converted_s3_bucket=response['target_bucket']
print(response)
# data in converted_s3_key will be imported to dynamodb
# note: AttributeDefinitions may only contain columns that are key attributes
# of the base table or of a GSI/LSI
dynamodb_table={
'TableName': tablename,
'AttributeDefinitions': [
{
'AttributeName': 'AutoID',
'AttributeType': 'N'
},
{
'AttributeName': 'Date',
'AttributeType': 'S'
},
{
'AttributeName': 'GEO',
'AttributeType': 'S'
}
],
'KeySchema': [
{
'AttributeName': 'AutoID',
'KeyType': 'HASH'
}
],
'BillingMode': 'PROVISIONED',
'ProvisionedThroughput': {
'ReadCapacityUnits': 10,
'WriteCapacityUnits': 10
},
'GlobalSecondaryIndexes': [
{
'IndexName': 'priceindex_gsi',
'KeySchema': [
{
'AttributeName': 'Date',
'KeyType': 'HASH'
},
{
'AttributeName': 'GEO',
'KeyType': 'RANGE'
},
],
'Projection': {
'ProjectionType': 'INCLUDE',
'NonKeyAttributes': [
'Products',
'VALUE',
'STATUS'
]
},
'ProvisionedThroughput': {
'ReadCapacityUnits': 10,
'WriteCapacityUnits': 10
}
},
]
}
response=import_table_dynamodb(converted_s3_bucket,converted_path_name,s3_key_extension,file_delimiter,tablename,dynamodb_table,row_number)
#if no error from dynamodb
# means loading completes
loading_status=response['loading_status']
print("loading_status is {}".format(loading_status))
if loading_status==1:
#means loading succeeded
print('loading succeeded!')
##### move the file into another s3 bucket for backup ######
stage=3
backup_file(s3_bucket,s3_key,file_name)
#=================================
delete_message(message)
#=================================
exit()
else:
#loading not complete due to some reason
#lambda failed to consume the SQS message
messages_to_reprocess.append({"itemIdentifier": record['messageId']})
print('Adding failed file to the reprocess response list')
else: # the file is not csv, txt
# or the file is a converted file being processed by another lambda
print('The file {} is not for data ETL, or it is a converted file being processed by another lambda.'.format(s3_key))
delete_message(message)
exit()
except Exception as e:
if stage==0 :
print_content='stage={}, error when checking files in s3, description: {}'.format(stage,e)
if stage ==1:
print_content='stage={}, error when splitting large files, description: {}'.format(stage,e)
if stage ==2:
print_content='stage={}, error when loading from s3 to dynamodb, description: {}'.format(stage,e)
if stage ==3:
print_content='stage={}, error when backing up files, description: {}'.format(stage,e)
#means error not caused by loading,
#the file needs to be reprocessed
print(print_content)
messages_to_reprocess.append({"itemIdentifier": record['messageId']})
print('Adding failed file to the reprocess response list')
subject=print_content
body=e
#notify(0,subject,body)
batch_failure_response["batchItemFailures"] = messages_to_reprocess
if len(messages_to_reprocess) ==0:
#means the list is empty
print('message consumed successfully.')
return batch_failure_response
else: #means there's message in the list
list_to_string=' '.join(map(str, messages_to_reprocess))
print("There's an error message: {}".format(list_to_string))
print('returning failed file to sqs...')
return batch_failure_response
#==========================================================================
#==========================================================================
def import_table_dynamodb(s3_bucket,path_name,s3_key_extension,file_delimiter,tablename,dynamodb_table,row_number):
response = dynamodb_client.import_table(
S3BucketSource={
'S3Bucket': s3_bucket,
'S3KeyPrefix': path_name
},
InputFormat='CSV', # import_table accepts only CSV, DYNAMODB_JSON or ION; both .csv and .txt files here are comma-delimited
InputFormatOptions={
'Csv':{
'Delimiter':file_delimiter
}
},
InputCompressionType='NONE',
TableCreationParameters= dynamodb_table
)
print('----')
print('Importing data from "{}" to dynamodb table "{}"'.format(path_name,tablename))
print(response)
#keep checking import status
attempts=0
MAX_WAIT_CYCLES=55
# the lambda timeout should be more than time.sleep * MAX_WAIT_CYCLES:
# 10 seconds * 55 cycles = 550 seconds,
# so lambda waits up to 550s,
# but if there is a result before 550s
# it breaks the loop and runs the next line.
# MAX_WAIT_CYCLES depends on the WCUs defined at table creation as well as the data quantity
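# note: the import_table response also contains an ImportArn; as an alternative,
# dynamodb_client.describe_import(ImportArn=...) reports the import status directly
# (IN_PROGRESS / COMPLETED / FAILED). here we simply poll describe_table instead.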
loading_status=0
while attempts < MAX_WAIT_CYCLES:
attempts += 1
time.sleep(10)
try:
response=dynamodb_client.describe_table(
TableName=tablename
)
status=response['Table']['TableStatus']
#'CREATING'|'UPDATING'|'DELETING'|'ACTIVE'|'INACCESSIBLE_ENCRYPTION_CREDENTIALS'|'ARCHIVING'|'ARCHIVED',
print("table status is {}".format(status))
if status=='ACTIVE':
loading_status=1
break
except dynamodb_client.exceptions.ResourceNotFoundException:
pass
print(loading_status)
return {
"loading_status":loading_status
}
#==========================================================================
def convert_file_with_index(source_bucket_name,source_s3_key,path_name,file_name,file_delimiter,source_s3_key_withoutextension,target_bucket_name=backup_bucket,target_foldername=backup_folder,target_subfoldername=today):
if 'price' in source_s3_key_withoutextension or 'index' in source_s3_key_withoutextension:
# source_s3_key is the original key name without any conversion
# source_s3_key_withoutextension is in lower case
# only files that need an index column get one added,
# for example the 'priceindex' table, whose natural pk consists of 3 attributes;
# the added index column becomes the pk of the dynamodb table
resp = s3_client.get_object(Bucket=source_bucket_name, Key=source_s3_key)
data = pandas.read_csv(resp['Body'],sep=file_delimiter)
data.index=data.index+1
# to make index starting from 1
data.index.name = 'AutoID'
csv_buffer=StringIO()
data.to_csv(csv_buffer)
# 1 to save the converted file into backup folder in backup bucket
# 2 to import data in converted file to dynamodb
respon=finding_folder(target_bucket_name,target_foldername,target_subfoldername)
# respon is the target folder,
# i.e. the folder where the converted file will be saved
save_as='{}converted_{}'.format(respon,file_name)
s3_session_resource.Object(target_bucket_name,save_as).put(Body=csv_buffer.getvalue())
csv_buffer.truncate(0)
# the original file is backed up only after loading succeeds;
# we can't delete it here because we can't yet be sure
# whether the following loading process will succeed or not
converted_file=save_as
converted_pathname=respon[0:-1]
# deleting the trailing '/'
target_bucket=target_bucket_name
else:
# in the end, every file, converted or not, is returned under the 'converted_file' key
converted_file=source_s3_key
converted_pathname=path_name
target_bucket=source_bucket_name
return {
"converted_file":converted_file,
"converted_pathname":converted_pathname,
"target_bucket":target_bucket
}
#==========================================================================
def split_file(source_bucket_name, source_s3_key,file_delimiter,file_size_in_mib,target_bucket_name=backup_bucket,target_folder=backup_folder):
resp = s3_client.get_object(Bucket=source_bucket_name, Key=source_s3_key)
data = pandas.read_csv(resp['Body'],sep=file_delimiter)
#data_for_loading =data[['Date','GEO','DGUID','Products','UOM','UOM_ID','SCALAR_FACTOR','SCALAR_ID','VECTOR','COORDINATE','VALUE','STATUS','SYMBOL','TERMINATED','DECIMALS']]
data.index=data.index+1
row_number=len(data.index)
# to make index starting from 1
file_name=os.path.basename(source_s3_key)
length=len(file_name)
s3_key_extension=file_name[length-3:]
# below is to split the large data file into smaller ones.
# Please remember there is no way to save any files back into the upload s3 bucket,
# as that bucket triggers sqs, which triggers lambda again;
# that would be a horrible infinite loop.
# the split files are therefore all saved in another (backup) bucket
small_files_folder=finding_folder(target_bucket_name,target_folder,'{}/{}'.format(today,file_name))
csv_buffer=StringIO()
lines_per_file=int(row_number*(5/file_size_in_mib))
ranges_number=math.ceil(row_number/lines_per_file)
print('total {} small files'.format(ranges_number) )
if row_number % lines_per_file > 0:
# means the last range is not the same large as the previous ranges
# which is normal
last_range_number=row_number % lines_per_file
# get the correct row number for the last range
else:
last_range_number=0
print ('the last range has {} rows'.format(last_range_number) )
def range1(start, end):
return range(start,end+1)
for splitnumber in range1(1,ranges_number):
print (splitnumber)
small_files_name=file_name.replace('.{}'.format(s3_key_extension),'_{}.{}'.format(splitnumber,s3_key_extension))
small_file_data=data.loc[((splitnumber-1)*lines_per_file+1):(splitnumber*lines_per_file)]
small_file_data.to_csv(csv_buffer,index=False)
# to save the data into backup folder
save_as='{}{}'.format(small_files_folder,small_files_name)
s3_session_resource.Object(target_bucket_name,save_as).put(Body=csv_buffer.getvalue())
csv_buffer.truncate(0)
# remember to truncate the buffer; it is reused inside the loop
# and won't clear itself for the next split file's data
return {
"small_files_folder":small_files_folder
}
#==========================================================================
def finding_folder(bucket_name,folder_name,subfolder_name):
folder_combination='{}/{}/'.format(folder_name,subfolder_name)
folder_found=0
#folder_found=0 means neither folder nor subfolder found
#folder_found=1 means only folder found, need to create a subfolder
#folder_found=2 means both the folder and subfolder found
target_folder=None
#no path for target folder and subfolder yet
objects_in_bucket=s3_client.list_objects_v2(Bucket=bucket_name)
#if the bucket is empty in the first place, there won't be any 'Contents' key
if 'Contents' in objects_in_bucket:
for object in objects_in_bucket['Contents']:
if folder_name in object['Key']:
#folder found
substring=object['Key']
if substring.endswith('{}/'.format(folder_name)):
folder_alone=object['Key']
folder_path='{}{}/'.format(folder_alone,subfolder_name)
folder_found=1
#here to grab the backup folder without its subfolders
elif substring.endswith('{}/'.format(subfolder_name)):
print('subfolder found: {}'.format(object['Key']))
folder_found=2
target_folder=object['Key']
return target_folder
else:
folder_found=0
if folder_found==1:
#means folder found alone, missing today's subfolder
s3_client.put_object(Bucket=bucket_name, Key=folder_path)
print("today's subfolder created:{}".format(folder_path))
target_folder=folder_path
if folder_found==0:
s3_client.put_object(Bucket=bucket_name, Key=folder_combination)
print('Both folder and subfolder created: {}'.format(folder_combination))
target_folder=folder_combination
return target_folder
# the returned folder path ends with '/'
#==========================================================================
def get_tablename(target_name,stamp=today):
correct_tablename=''
#we can add spelling corrector here if there's possibility of typos in filenames
if 'price' in target_name or 'index' in target_name:
#always be aware of lower case
#convert names to lower case as soon as possible
#s3_key_withoutextension is already lower case
table_name='priceindex_{}'.format(stamp)
existing_tables=dynamodb_client.list_tables()['TableNames']
#print(existing_tables)
if table_name in existing_tables:
print("table of {} already exists".format(table_name))
response=dynamodb_client.delete_table(
TableName=table_name
)
# testing if the table is deleted or not
attempts =0
MAX_WAIT_CYCLES=10
table_deletion=0
while attempts < MAX_WAIT_CYCLES:
attempts += 1
time.sleep(3)
existing_tables=dynamodb_client.list_tables()['TableNames']
if table_name not in existing_tables:
print("table is deleted successfully")
# means table has been deleted
table_deletion=1
break
if table_deletion==0:
#means the table can't be deleted
print("Error when trying to delete it. Can't continue to import data.")
return {
"correct_tablename":"error"
}
correct_tablename=table_name
print(correct_tablename)
#elif 'some_string' in target_name:
#more schemas and tables here
else:
#print ('The file name of {} is not valid. Please check if you uploaded the right file.'.format(s3_key))
#print('Sending email of notification via SNS....')
subject='The file name of {} is not valid. Please check if you uploaded the right file.'.format(target_name)
body=''
res = notify(0, subject, body)
exit()
# if no table matches, this file won't be loaded
return {
"correct_tablename":correct_tablename
}
#===============================================================================
def notify(status, subject, body):
#here status decides which topic to call:
if status==1:
#means everything is going well
sns_topic_arn=topic_arn_on_success
elif status==0:
sns_topic_arn=topic_arn_on_failure
#print("topic_arn is : {}".format(sns_topic_arn))
subject = ("".join(ch for ch in subject if unicodedata.category(ch)[0] != "C"))[0:99]
body = str(body)
sns_client = boto3.client('sns')
response = sns_client.publish(
TargetArn=sns_topic_arn,
Subject=subject,
Message=body
#if we wish to send different messages to mobile and email
# we can set as below:
#Message = json.dumps({'default': subject}),
#MessageStructure='json'
)
return "message sent"
#==========================================================================
def backup_file(source_bucket_name,source_s3_key,backup_filename,backup_bucketname=backup_bucket,backup_foldername=backup_folder):
#define the source
print('starting back-up.')
copy_source={
'Bucket':source_bucket_name,
'Key':source_s3_key
}
#define the target
#to find the backup folder and today's subfolder,
#if not existing, create new ones
respon=finding_folder(backup_bucketname,backup_foldername,today)
#respon is the target folder,
#i.e. the folder the data file will be moved into
#to create target file
backup_file='{}{}'.format(respon,backup_filename)
s3_client.put_object(Bucket=backup_bucketname, Key=backup_file)
#to move the data file to the target position for back up
bucket=s3_resource.Bucket(backup_bucketname)
bucket.copy(copy_source,backup_file)
s3_resource.Object(source_bucket_name,source_s3_key).delete()
print('file of {} has been moved to backup folder of {}'.format(source_s3_key,respon))
#=====================================================================================
#to delete sqs message after a successful loading
def delete_message(message):
#print("message for deletion: {}".format(message))
sqs_region, sqs_Account_Id, sqs_name = message['eventSourceARN'].split(':')[3:6]
sqs_client = boto3.client('sqs', region_name=sqs_region)
sqs_url = f'https://{sqs_region}.queue.amazonaws.com/{sqs_Account_Id}/{sqs_name}'
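# a more robust alternative (assumption: the lambda role allows sqs:GetQueueUrl):
#   sqs_url = sqs_client.get_queue_url(QueueName=sqs_name)['QueueUrl']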
#sqs_client.delete_message(QueueUrl=sqs_url, ReceiptHandle=message['receiptHandle'])
#print('sqs message deleted')
#print("QueueUrl is {} , and ReceiptHandle is {}".format(sqs_url,message['receiptHandle']))
#==========================================================================