3. AWS Lambda & ECS (Elastic Container Service) in Data Processing with Python (Part 2 - Docker Container)

Julia R
11 min read · Jul 29, 2023


This article covers data processing with AWS ECS (Elastic Container Service) and containerized Lambda. It is part of a project in which everything is automated: from the initial data collection to publishing the analytics on the website, there is no manual step.

More can be found on GitHub.

Requirements:

DockerHub or AWS ECR (Elastic Container Registry)

  1. ECS (Amazon Elastic Container Service) with Fargate launch type

Lambda in a container will be covered in the second section of this article.

As discussed in the previous article (2. Data ETL Part 1), ECS is responsible for Database Init and for Data ETL on large datasets. One reason I can't use a non-containerized Lambda for all computation in this project is that AWS RDS for MySQL does not allow a Lambda function to execute multiple statements at once. After we build MySQL with Terraform (the code will be shared later in a separate article on Terraform), the database is essentially 'empty'. We need to run stored procedures to make it ready for Data ETL in the next step. Since the stored procedures used for Database Init are nested queries, a non-containerized Lambda function is unable to apply them to the database.

One of the few solutions is to use the MySQL command line to execute a whole .sql file in one go. ECS or containerized Lambda come to the rescue, because a container lets us run the command line. We can choose between ECS and containerized Lambda based on cost effectiveness; I used both for two different tasks in this project, simply for practice: ECS for Database Init, and containerized Lambda for data transfer.
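Before diving into the full ECS task code below, here is a minimal sketch of the core trick: piping a whole .sql file through the MySQL command line from Python. The endpoint, credentials and file name below are placeholders, not values from this project.

import subprocess

# Sketch only: run an entire .sql file (stored procedures included) in one call
# via the mysql client. Host, user, password, database and file are placeholders.
command = "mysql -h my-rds-endpoint -u admin -pMySecret -D mydb < /db_init.sql"
completed = subprocess.run(command, shell=True, capture_output=True, text=True)
print(completed.returncode, completed.stderr)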

Preparation: Docker + MySQL Client + Python

a) Docker File

Base Image: python:3.10-slim-buster

I tried other base images, such as ubuntu, and found that the python image is the easiest way to install the MySQL client for this project.

b) MySQL Client

To keep the Docker image as slim as possible, the container does not install a full MySQL server. Only the MySQL client is installed, and ECS connects to the external MySQL database in the same VPC in the cloud via the MySQL command line.

c) Python

Python is the working language for the ECS tasks.
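As an optional sanity check inside the container, rds_init.py (or a quick test run) can confirm that the client installed by apt-get is actually on the PATH; a minimal sketch:

import subprocess

# Optional check: print the version of the mysql client baked into the image.
version = subprocess.run(["mysql", "--version"], capture_output=True, text=True)
print(version.stdout.strip() or version.stderr.strip())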

=========================>Docker File

FROM python:3.10-slim-buster

COPY /mysql/requirements.txt requirements.txt
# Install required library libmysqlclient (and build-essential for building mysqlclient python extension)
WORKDIR /
RUN set -eux && \
export DEBIAN_FRONTEND=noninteractive && \
apt-get update && \
apt-get install -y default-libmysqlclient-dev build-essential && \
apt-get install -y default-mysql-client

RUN pip install --upgrade pip
RUN pip install -r requirements.txt
RUN rm -rf /var/lib/apt/lists/*
# Note: if you only need the mysql client (as in this project, which just
# connects to an external RDS for MySQL) and you use a python base image,
# install libmysqlclient and the mysql client together with apt-get.
# If you install libmysqlclient with apt-get first and then install
# mysqlclient with 'pip install', the container won't find the mysql path.
COPY . .
# the ecs task will be triggered by lambda in this project,
# below command will be passed in by lambda
# so i commented it out
#CMD [ "python", "/rds_init.py" ]

========================> requirements.txt

pymysql==1.0.3
cryptography
boto3

=========================> rds_init.py


import os
import subprocess
import tempfile
import time
import boto3
import botocore
s3_client=boto3.client('s3')
s3_resource = boto3.resource('s3')

import pymysql
import json

task=os.environ['task']
rds_endpoint = os.environ['rds_endpoint']
secret_string=os.environ['secret_string']
secret_string_db_maintain=os.environ['secret_string_db_maintain']
aws_region=os.environ['aws_region']
mysql_database=os.environ['mysql_database']

# parse the secret strings (JSON from Secrets Manager) passed in as environment variables
secret_dict=json.loads(secret_string)
secret_dict_db_maintain=json.loads(secret_string_db_maintain)
username = secret_dict['username']
username_db_maintain = secret_dict_db_maintain['username']
password = secret_dict['password']
password_db_maintain = secret_dict_db_maintain['password']
mysql_host_name=rds_endpoint[0:-5]

file_name=os.environ['file_name']
s3_bucket=os.environ['s3_bucket']
s3_key=os.environ['s3_key']
s3_key_withoutextension=os.environ['s3_key_withoutextension']

def run_sql(conn, sql_query, result=False):
    with conn.cursor() as cur:
        #print("Executing: {}".format(sql_query))
        cur.execute(sql_query)
        conn.commit()
        if result==True:
            rows = cur.fetchall()
            json_result = json.dumps(rows)
            return json_result
        else:
            return

if task=='rds_init':
    print('the task is rds_init')
    try:
        stage=0
        conn = pymysql.connect(host=mysql_host_name, user=username, password=password, db=mysql_database, connect_timeout=10)
        print('connection to {} is successful'.format(mysql_host_name))

        # search for users
        sql_query='SELECT user FROM mysql.user;'
        json_users=run_sql(conn, sql_query, True)
        print("users are {}".format(json_users))
        # if the maintenance user already exists, drop it
        if username_db_maintain in json_users:
            sql_query="DROP USER '{}'@'%';".format(username_db_maintain)
            run_sql(conn, sql_query)

        # create a new temp user with mysql_native_password
        sql_query="CREATE USER IF NOT EXISTS '{}'@'%' IDENTIFIED WITH mysql_native_password BY '{}';".format(username_db_maintain, password_db_maintain)
        run_sql(conn, sql_query)
        # grant the least privilege to the user according to the task content
        sql_query="GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, ALTER, CREATE TEMPORARY TABLES, EXECUTE, CREATE ROUTINE, ALTER ROUTINE ON {}.* TO '{}'@'%' WITH GRANT OPTION;".format(mysql_database, username_db_maintain)
        run_sql(conn, sql_query)
        # save the setting
        sql_query="FLUSH PRIVILEGES;"
        run_sql(conn, sql_query)

        # use the new user to log in to mysql
        conn = pymysql.connect(host=mysql_host_name, user=username_db_maintain, password=password_db_maintain, db=mysql_database, connect_timeout=10)
        print('new connection to {} is successful'.format(mysql_host_name))
        stage=1
        # download the s3 file locally
        s3_resource.Bucket(s3_bucket).download_file(s3_key, file_name)
        #print(open(file_name).read())
        file = open(file_name, 'r')
        # read all the content of the file
        lines=file.readlines()
        # count how many stored procedures are included in the file;
        # the number is used later to check whether all procedures are actually saved in the db
        count=0
        for line in lines:
            if 'DELIMITER &&' in line:
                count=count+1
        procedures_num=count
        print('file_name is {}, including {} procedures'.format(file_name, procedures_num))
        stage=2
        # execute the whole file using the MySQL command line
        # first solve the 'mysql not found' problem: locate mysql
        find_sql='which mysql'
        process1=subprocess.run(find_sql, stdout=subprocess.PIPE, shell=True)
        result=process1.stdout.decode('utf-8')
        print("MySQL installed in {}".format(result))
        mysql_path=result[0:-7]
        print('mysql_path is {}'.format(mysql_path))
        # add the MySQL bin folder to PATH
        set_path="export PATH={}:$PATH".format(mysql_path)
        process2=subprocess.Popen(set_path, shell=True)

        # finally execute the .sql file
        mysql_command='mysql -h {} -u {} -p{} -D {} < /{}'.format(mysql_host_name, username_db_maintain, password_db_maintain, mysql_database, file_name)
        print('init_command is {}'.format(mysql_command))
        process=subprocess.Popen(mysql_command, shell=True)

        stage=3
        # verify the result in the db
        sql_query='SELECT count(ROUTINE_NAME) FROM INFORMATION_SCHEMA.ROUTINES WHERE ROUTINE_TYPE="PROCEDURE" AND ROUTINE_SCHEMA="{}" AND ROUTINE_NAME LIKE "sp_{}%";'.format(mysql_database, s3_key_withoutextension)

        # the mysql command line above may take time to complete,
        # so poll in a loop for the final result
        attempts=0
        MAX_WAIT_CYCLES=20
        print("Executing {}".format(sql_query))
        record=0
        while attempts < MAX_WAIT_CYCLES:
            attempts += 1
            time.sleep(3)
            with conn.cursor() as cur:
                cur.execute(sql_query)
                record=cur.fetchone()[0]
                if record==procedures_num:
                    break
            conn.commit()
        if record==procedures_num:
            # all procedures in the file were executed successfully
            print('Result after checking is {}'.format(record))
            print('Database Init completed. {}/{}'.format(s3_bucket, s3_key))
        else:
            print('Result after checking is {}, supposed to be {}'.format(record, procedures_num))
            print('Database Init failed. {}/{}'.format(s3_bucket, s3_key))
    except botocore.exceptions.ClientError as e:
        if stage==1 and e.response['Error']['Code'] == "404":
            print("The object does not exist. {}".format(e))
            exit()
        else:
            print_content="error occurred during stage {} : {}".format(stage, e)
            print(print_content)
            exit()

elif task=='web_request':
    print('the task is web_request')
    # continue with the Data ETL tasks;
    # the scripts follow a similar logic to the Lambda Function in the previous article.
    # Here, ECS for Database Init is the main focus.

Note:

AWS RDS for MySQL (not Aurora MySQL) has the following limitations for data loading:

1) No Data API

We can't execute SQL queries from outside the VPC.

2) No execute_many / multi-statement execution

We can't execute multiple statements in a single call.

Single statements ("SELECT …", "INSERT …") can still be run one by one with cursor.execute(), but a stored procedure that spans multiple statements can't be created that way.

For MySQL DB Init, the solution is to use a Lambda outside of the VPC to trigger an ECS task that runs the image built in ECR. DockerHub is also fine if ECS can reach the internet and pull the image from that repository; since this project blocks internet access for ECS for security reasons, ECR is the only option. You may ask how ECS connects to ECR if it can't reach the internet. The answer is a NAT Gateway or VPC Endpoints, which will be covered in the article on Cloud Security.
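A rough sketch of what that trigger can look like with boto3 (the cluster, task definition, container name, subnet and security group values below are placeholders, not the project's real ones); the command override replaces the CMD that was commented out in the Dockerfile above:

import boto3

ecs = boto3.client("ecs")

# Start the Fargate task and override the container command and environment,
# which stands in for the CMD commented out in the Dockerfile.
response = ecs.run_task(
    cluster="data-cluster",                              # placeholder
    taskDefinition="rds-init-task",                      # placeholder
    launchType="FARGATE",
    networkConfiguration={
        "awsvpcConfiguration": {
            "subnets": ["subnet-0123456789abcdef0"],     # placeholder
            "securityGroups": ["sg-0123456789abcdef0"],  # placeholder
            "assignPublicIp": "DISABLED",
        }
    },
    overrides={
        "containerOverrides": [
            {
                "name": "rds-init",                      # placeholder container name
                "command": ["python", "/rds_init.py"],
                # plus the other environment variables rds_init.py reads
                "environment": [{"name": "task", "value": "rds_init"}],
            }
        ]
    },
)
print(response["tasks"][0]["lastStatus"])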

3) Authentication method

MySQL 8.0 uses caching_sha2_password by default, while earlier versions use mysql_native_password.

The client in this project does not support caching_sha2_password, so the workaround is to install AWS RDS for MySQL 5.7 instead of 8.0.
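If you are not sure which plugin a given user ends up with, it can be checked directly from Python (a sketch with placeholder connection values):

import pymysql

# List the authentication plugin configured for each MySQL user.
conn = pymysql.connect(host="my-rds-endpoint", user="admin", password="MySecret")
with conn.cursor() as cur:
    cur.execute("SELECT user, host, plugin FROM mysql.user;")
    for user, host, plugin in cur.fetchall():
        print(user, host, plugin)
conn.close()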

4) Access denied for user@'ip_address'

The user created when building MySQL in AWS is the master user, which has the privileges listed here:

https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/UsingWithRDS.MasterAccounts.html

It is recommended to create a new user dedicated to the specific task, with the least privilege. Both ECS and Lambda in this project then use this user to connect to MySQL for their jobs.

The user must be created as @'%', not @'endpoint' or @'localhost', and the privileges granted to it should be restricted to what its task needs.

CREATE USER IF NOT EXISTS 'username'@'%' IDENTIFIED WITH mysql_native_password BY 'password';

GRANT SELECT, INSERT /* ...all permissions needed for the daily job... */ ON `specificDB`.* TO 'username'@'%' WITH GRANT OPTION;

Image Building: Docker

After we finish the above 3 files, it’s time to create a Container Image in AWS ECR.

1) Open a terminal on your laptop (macOS or any other OS).

2) Create a new repo in ECR with the command below (replace 'here_is_the_name' and 'ca-central-1' with your own repo name and AWS region; the '$' sign is not part of the command):

$ aws ecr create-repository --repository-name here_is_the_name --region ca-central-1

3) Enable image scanning on push to ECR (replace 'here_is_the_name' with your own repo name):

$ aws ecr put-image-scanning-configuration \
    --repository-name here_is_the_name \
    --image-scanning-configuration scanOnPush=true

4) Log in to the newly created repo (replace 'ca-central-1' and 'account_id' with your own info):

$ aws ecr get-login-password --region ca-central-1 | docker login --username AWS --password-stdin account_id.dkr.ecr.ca-central-1.amazonaws.com

5) Change into the work folder (where the Dockerfile, requirements.txt and the .py file live) on your laptop:

$ cd …/…/…/folder

6) Delete all images of older versions (optional):

$ docker rmi $(docker images -a -q)

7) Build the docker image (replace 'here_is_the_repo_name' with your own repo name; note the space and the dot at the end of the command):

$ docker build -t here_is_the_repo_name .

8) Tag the image (replace 'the_repo_name', 'account_id', 'ca-central-1' with your own info):

$ docker tag the_repo_name:latest account_id.dkr.ecr.ca-central-1.amazonaws.com/the_repo_name

9) Push the image to ECR (replace 'the_repo_name', 'account_id', 'ca-central-1' with your own info):

$ docker push account_id.dkr.ecr.ca-central-1.amazonaws.com/the_repo_name

10) Pull the image from ECR (replace 'the_repo_name', 'account_id', 'ca-central-1' with your own info) (optional; image pulling is configured later when the ECS container is defined):

$ docker pull account_id.dkr.ecr.ca-central-1.amazonaws.com/the_repo_name:latest

Pulling the image from the repository

The image has a unique ID (digest) in the repo, and this ID will be used later when we create the ECS container in Terraform.
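If you prefer to confirm that ID from code rather than the console, the ECR API exposes it; a sketch (the repository name and region are placeholders):

import boto3

# Look up the digest of the image tagged 'latest' in the ECR repo;
# Terraform can reference the image either by tag or by this digest.
ecr = boto3.client("ecr", region_name="ca-central-1")
details = ecr.describe_images(
    repositoryName="here_is_the_name",        # placeholder repo name
    imageIds=[{"imageTag": "latest"}],
)
print(details["imageDetails"][0]["imageDigest"])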

2. Lambda in the container

The containerized Lambda in this project is the child Lambda for reporting. It is responsible for exporting data from AWS RDS for MySQL to S3 (as a .csv file in the code below). Compared to a non-containerized Lambda, the setup for a Lambda in a container is more complicated.
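For context, the parent (leader/loading) Lambda invokes this child with a payload carrying the two fields the handler below reads, and gets back the reporting status. A sketch of that call (the function name, table name and S3 prefix are placeholders):

import json
import boto3

lambda_client = boto3.client("lambda")

# Invoke the reporting (child) Lambda with the fields its handler expects.
payload = {
    "report_source_data": "sales_summary",         # placeholder table name
    "report_source_data_folder": "reports/sales",  # placeholder S3 prefix
}
response = lambda_client.invoke(
    FunctionName="reporting-lambda",               # placeholder function name
    InvocationType="RequestResponse",
    Payload=json.dumps(payload).encode("utf-8"),
)
result = json.loads(response["Payload"].read())
print(result["reporting_status"], result["error"])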

Preparation: Docker + MySQL Client + AWS CLI + Lambda + Python

a) Docker File

Base Image: python:3.10-slim-buster

b) MySQL Client and AWS CLI

The MySQL client (mysql) and the AWS CLI are installed in the image. They work with Lambda to export data from the MySQL database to an AWS S3 bucket using the MySQL command line.

c) Python

Python is the working language for the containerized Lambda tasks.

=========================>Docker File

ARG FUNCTION_DIR="/home/app/"

FROM python:3.10-slim-buster

ARG FUNCTION_DIR
RUN mkdir -p ${FUNCTION_DIR}
COPY *.py ${FUNCTION_DIR}

# Installing Lambda image dependencies
RUN apt-get update \
&& apt-get install -y \
g++ \
make \
cmake \
unzip \
libcurl4-openssl-dev
RUN python3 -m pip install awslambdaric

# Installing AWS CLI to upload to S3 bucket
RUN pip3 install \
awscli

# Install the function's dependencies using file requirements.txt
# from your project folder
COPY requirements.txt .
RUN pip3 install -r requirements.txt

# Installing mysqldump and cleaning apt cache
RUN apt update && apt install -y mariadb-client && \
apt-get clean autoclean && \
apt-get autoremove --yes && \
rm -rf /var/lib/{apt,dpkg,cache,log}/

WORKDIR ${FUNCTION_DIR}
ENTRYPOINT [ "/usr/local/bin/python3", "-m", "awslambdaric" ]
CMD [ "reporting.handler" ]

====================> requirements.txt

boto3

====================> reporting.py

import subprocess
import os
import time

import json
import boto3
import botocore
s3_client=boto3.client('s3')
s3_resource = boto3.resource('s3')

# get environment variables

aws_region=os.environ['aws_region']
mysql_database = os.environ['mysql_database']
mysql_host=os.environ['mysql_host']
mysql_host_name=mysql_host[0:-5]
backup_bucket=os.environ['backup_bucket']
timestamp = time.strftime('%Y-%m-%d-%I:%M')
print('current time is : {}'.format(timestamp))

topic_arn_on_failure=os.environ['topic_arn_on_failure']
topic_arn_on_success=os.environ['topic_arn_on_success']
topic_name_on_failure=os.environ['topic_name_on_failure']
topic_name_on_success=os.environ['topic_name_on_success']

# get username/password from secrets manager
secret_name=os.environ['secret_name']
session=boto3.session.Session()
session_client=session.client(
    service_name='secretsmanager',
    region_name=aws_region
)
secret_response=session_client.get_secret_value(
    SecretId=secret_name
)
secret_arn=secret_response['ARN']
secretstring=secret_response['SecretString']
secret_json=json.loads(secretstring)
user_name=secret_json['username']
pass_word=secret_json['password']

def handler(event, context):
    # get variables from the parent lambda
    report_source_data=event['report_source_data']
    report_source_data_folder=event['report_source_data_folder']
    reporting_status=0
    try:
        # after the reporting lambda is invoked by the loading or leader lambda,
        # it exports data from rds mysql to s3 using the CLI,
        # because mysql can't export data to s3 directly

        # Attention: if we use 'mysqldump' to backup/export data to s3,
        # only metadata will be saved in s3. Please don't ask me why data in the table
        # can't be saved. I don't know the reason. But I know the solution...

        # the method below gets the data instead of the metadata of the MySQL table
        save_as = "{}_{}.csv".format(report_source_data, timestamp)
        command='mysql -h %s -u %s -p%s -D %s --batch --quick -e "select * from %s" > /tmp/%s' % (
            mysql_host_name, user_name, pass_word, mysql_database, report_source_data, save_as
        )
        subprocess.Popen(command, shell=True).wait()
        # upload the .csv file from /tmp/ to s3
        command="aws s3 cp /tmp/{} s3://{}/{}/{}".format(save_as, backup_bucket, report_source_data_folder, save_as)
        subprocess.Popen(command, shell=True).wait()
        # clear the lambda tmp folder
        command="rm /tmp/{}".format(save_as)
        subprocess.Popen(command, shell=True).wait()

        print("MySQL backup finished")

    except Exception as e:
        print_content='error when exporting data from MySQL to S3, description: {}'.format(e)
        print(print_content)
        reporting_status=0
        return {
            "reporting_status": reporting_status,
            "error": print_content
        }

    print('Exporting from MySQL to S3 completed!')
    reporting_status=1
    return {
        "reporting_status": reporting_status,
        "error": ''
    }

Image Building: Docker

After we finish the above three files, it's time to create a container image in AWS ECR. The steps are exactly the same as in the ECS section above: create an ECR repo, enable scan-on-push, log in to the repo, then build, tag and push the image (and optionally pull it back to verify).

Pulling the image from the repository

The image has a unique ID (digest) in the repo, and this ID will be used later when we create the containerized Lambda in Terraform.
