AWS Cloud Data Engineering End-to-End Project — Trigger Amazon Lambda with S3 and Upload Data to RDS

Dogukan Ulu
11 min readAug 4, 2023

--

Tech Stack

  • Amazon EC2
  • Amazon S3
  • Amazon Lambda
  • Amazon RDS
  • Amazon ECR
  • Docker
  • Python
  • Shell Scripting
  • DBeaver

Overview

We are going to create a data pipeline in this article. We are going to upload a remote CSV file into the S3 bucket automatically with Python and Shell scripts running in an EC2 instance. Then, we are going to create a Lambda function using a container image. Once the CSV file is uploaded to the S3 bucket, the Lambda function will be triggered. The main target of the Lambda function will be modifying the CSV data and fixing the errors. We are going to create an RDS database as well and will upload the modified data into a table inside this database. Once the upload is completed, we will be able to monitor the data using DBeaver and we will connect to RDS using an SSH tunnel via EC2 user.

We will be doing all the steps inside a private VPC so that our data is protected. To conclude, this project will help us to transfer a remote CSV file to the S3 bucket, and in the end, we will be able to see the data inside the private RDS database. So let’s start!

IAM Role

The first thing is creating a suitable IAM role for the Lambda function. The Lambda function will need access to S3, RDS, and CloudWatch. It will also need Glue and EC2 permissions in the background. We should first create a new policy and name it s3-lambda-rds-full-policy. After creating the policy, we should create a new role as s3-lambda-rds-full-role and should choose the dedicated service as Lambda. We can use the below JSON as our policy.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:PutLogEvents",
"logs:CreateLogGroup",
"logs:CreateLogStream"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": [
"s3:*"
],
"Resource": [
"arn:aws:s3:::csv-to-s3-project-dogukan-ulu/*",
"arn:aws:s3:::csv-to-s3-project-dogukan-ulu"
]
},
{
"Effect": "Allow",
"Action": [
"glue:*"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"rds:*"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"ec2:DescribeNetworkInterfaces",
"ec2:CreateNetworkInterface",
"ec2:DeleteNetworkInterface",
"ec2:DescribeInstances",
"ec2:AttachNetworkInterface"
],
"Resource": "*"
}
]
}

This policy will handle all the necessary permissions. We should also create a new role ec2-s3-full-access with AmazonS3FullAccess since we are going to upload our CSV file into the S3 bucket from the inside of the EC2 instance and we can choose the service as EC2.

We have our necessary roles ready for EC2 and Lambda. We won’t need any other IAM roles for any other service.

Amazon VPC and Security Groups

Since we want all the services to connect to each other and be safe in the cloud, we should create a dedicated VPC to be used for all the services. RDS, EC2, and Lambda will be located in a private subnet, and S3 will access the VPC via an endpoint that we will create. We are also going to create dedicated security groups for EC2, RDS, and Lambda so that we will allow traffics coming from EC2 and Lambda to RDS as well as the SSH connection for RDS.

  • Private subnet -> We want to keep our database and other services as safe as we can if no external connection is necessary. That’s why we will keep Lambda, EC2, and RDS inside the private subnet
  • Endpoint -> The best way of accessing the VPC from an S3 bucket is creating an endpoint (S3 gateway). That will allow Lambda to be able to retrieve data from the S3 bucket.

You may see the below article for all the details about how to “Establishing a VPC for Amazon S3, Lambda, RDS and EC2”.

EC2 Instance

We should create a Key Pair before creating the EC2 instance and install .pem file onto our local machine.

We already have our VPC and dedicated security group ready for the EC2 instance. Once the key pair is created, we can now create the EC2 instance. We can click on Launch instance and set the configurations as below:

  • Application and OS Images → Amazon Linux (Free tier eligible)
  • Instance type → t2.micro (free tier eligible)
  • Key pair → We can choose the key pair we created and install .pem file to our local machine
  • Network settings → This is the most crucial part. We are going to select the VPC we recently created as well as the related security group. Our subnet should be located in the same AZ as the RDS database just in case. You may find more details in the above article.
  • We should also attach the previously created IAM role ec2-s3-full-access to our instance.
  • We can leave other fields as default and launch the instance.

After clicking on Create instance, we have our EC2 instance up and running now. It will be needed both for uploading the file into the S3 bucket and accessing RDS for monitoring.

Amazon S3

We should create a dedicated S3 bucket. We are going to upload our CSV file into this bucket. It will also be used to trigger the Lambda function. We should choose a unique name according to the naming rules. I will use csv-to-s3-project-dogukan-ulu for this article.

Amazon RDS MySQL

The main thing while creating the RDS database is choosing the VPC and security group accordingly.

All the configuration needs are listed below:

  • DB Instance Identifier → It is just an identifier so that we can detect that this is the desired server. Not database name, etc. s3-lambda-rds-ec2-database (example)
  • Choose a database creation method → Standard create
  • Engine options → MySQL
  • Engine Version → MySQL 8.0.32
  • Templates → Free tier
  • Master username → <user_name>
  • Master password → <password>
  • DB instance class → db.t3.micro
  • Storage → Default settings
  • Connectivity → We don’t have to connect to an EC2 compute resource. We should definitely not allow public access to our database, it should be private. We should choose the VPC and security group accordingly. All the steps are explained in the above-mentioned article.
  • Database authentication → Password authentication
  • Additional configuration -> initial database name: rds_lambda_database (example)

It will take a bit to create the database. Once it is created, we should get the information of the database username, database name, database password, and database endpoint. We already defined the database password, database name, and database username. But we should also get the endpoint since we will need it while configuring the environmental variables.

Lambda Function

We are going to create a Lambda function using a container image. We have two main options as deployment packages while creating the Lambda function: zip file and Docker image. Using a Docker image is a better option since the zip file has a size limit and we might face some dependency issues on our local machine. You can see all the details about “How to Create Amazon Lambda Function with the Container Image (Dockerfile)” in the below article.

Since the creation process is explained above, we will only go through Dockerfile, requirements, and Python script. We are going to use dirty_store_transactions.csv as our main data source. How to upload the remote CSV file will be explained in the next sections. We will simply retrieve the data from the S3 bucket, modify it and upload to the RDS MySQL database. You can see all the files in the repo mentioned above.

You may see the Dockerfile below:

# python3.9 lambda base image. 
# Python version can be changed depending on your own version
FROM public.ecr.aws/lambda/python:3.9

# copy requirements.txt to container root directory
COPY requirements.txt ./

# installing dependencies from the requirements under the root directory
RUN pip3 install -r ./requirements.txt

# Copy function code to container
COPY lambda_function.py ./

# setting the CMD to your handler file_name.function_name
CMD [ "lambda_function.lambda_handler" ]

The requirements.txt file is as below:

pandas
boto3
pyarrow
sqlalchemy
pymysql

When it comes to the Python script, we should first define the environmental variables (we will define them later while configuring the Lambda function), S3 client, and database connection string.

database_name = os.getenv('database_name')
database_username = os.getenv('database_username')
database_password = os.getenv('database_password')
database_endpoint = os.getenv('database_endpoint')
database_port = 3306
s3_client = boto3.client('s3')
database_uri = f"mysql+pymysql://{database_username}:{database_password}@{database_endpoint}:{database_port}/{database_name}"

Then, we are going to create a class that includes the necessary methods to be used for data modification.

class ModifyColumns:
@staticmethod
def extract_city_name(string):
cleaned_string = re.sub(r'[^\w\s]', '', string)
city_name = cleaned_string.strip()
return city_name

@staticmethod
def extract_only_numbers(string):
numbers = re.findall(r'\d+', string)
return ''.join(numbers)

@staticmethod
def extract_floats_without_sign(string):
string_without_dollar = string.replace('$', '')
return float(string_without_dollar)

The first step is retrieving the data from the S3 bucket. We will define a specific bucket and key, and retrieve the data located only at that location.

def load_df_from_s3(bucket_name, key):
"""
Read a CSV from a S3 bucket & load into pandas dataframe
"""
s3 = GlobalVariables.s3_client
logger.info("Starting S3 object retrieval process...")
try:
get_response = s3.get_object(Bucket=bucket_name, Key=key)
logger.info("Object retrieved from S3 bucket successfully")
except ClientError as e:
logger.error(f"S3 object cannot be retrieved: {e}")

file_content = get_response['Body'].read()
df = pd.read_csv(io.BytesIO(file_content)) # necessary transformation from S3 to pandas

return df

Using a logger will help us see all the logs better visually. The second part is modifying the data.

def data_cleaner(df):
"""
Clean the data and return the cleaned dataframe
"""
df['STORE_LOCATION'] = df['STORE_LOCATION'].map(ModifyColumns.extract_city_name)
df['PRODUCT_ID'] = df['PRODUCT_ID'].map(ModifyColumns.extract_only_numbers)

for to_clean in ['MRP', 'CP', 'DISCOUNT', 'SP']:
df[to_clean] = df[to_clean].map(ModifyColumns.extract_only_numbers)

return df

After cleaning the data, we now have the main cleaned pandas data frame and this is the data that we will upload to RDS. We have to connect to the database first, create the table, upload the data into that table, and see the results from the database.

def upload_dataframe_into_rds(df):
"""
Connect to RDS, upload the dataframe into the database
"""
table_name = 'clean_transaction'
sql_query = f"SELECT * FROM {table_name}"
database_uri = GlobalVariables.database_uri
logger.info("Starting the RDS Connection process...")
try:
engine = create_engine(database_uri)
logger.info('RDS Database connection successful')
except Exception as e:
logger.error(f'RDS Database connection unsuccessful: {e}')
raise

logger.info("Starting to upload the dataframe into RDS database")
try:
df.to_sql(table_name, con=engine, if_exists='append', index=False)
logger.info(f'Dataframe uploaded into {GlobalVariables.database_name}.{table_name} successfully')

uploaded_df = pd.read_sql(sql_query, engine)
logger.info('\n' + uploaded_df.head(5).to_string(index=False))
except Exception as e:
logger.error(f'Error happened while uploading dataframe into database: {e}')
raise

We have to combine all the methods under the method lambda_handler since we designed our Dockerfile as so.

def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']

logger.info(f"S3 bucket is obtained from the event: {bucket}")
logger.info(f"Object key is obtained from the event: {key}")

df = load_df_from_s3(bucket_name=bucket, key=key)
df_final = data_cleaner(df)
upload_dataframe_into_rds(df_final)

logger.info("Whole process completed.")

We are going to create our Lambda function using an S3 trigger. That’s why we used event in the main method. That will come to the function from that trigger. We are going to do all the processes for that event. You may see a sample event below.

{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
"awsRegion": "eu-central-1",
"eventTime": "2023-08-02T12:34:56.789Z",
"eventName": "ObjectCreated:Put",
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "example-event-config-id",
"bucket": {
"name": "csv-to-s3-project-dogukan-ulu",
"ownerIdentity": {
"principalId": "EXAMPLE"
},
"arn": "arn:aws:s3:::csv-to-s3-project-dogukan-ulu"
},
"object": {
"key": "dirty_store_transactions/dirty_store_transactions.csv",
"size": 1024,
"eTag": "example-object-etag",
"sequencer": "example-sequencer"
}
}
}
]
}

We have our Python script, Dockerfile, and requirements ready. We can now create the image from these using the above-mentioned article. Please keep in mind that we have to use the IAM role we created for the Lambda function.

After creating our function from a container image, we can follow the below steps to get our function configured for all the processes:

  • We should set the timeout as 2–3 minutes from the General Configuration tab.
  • We should define our environmental variables under Environment variables.
  • We should choose the VPC, security group, and subnet as the ones we created in the VPC section. You can see all the details about the VPCs in the article I mentioned in the VPC section.
  • The last part is creating an S3 trigger as below. You can choose the bucket according to your use case.

To conclude, we have created our Lambda function and configured it so that it can retrieve data from S3, and upload data into RDS with suitable parameters.

Upload Data into S3 Bucket

We have created our EC2 instance, RDS MySQL database, Lambda function, and S3 bucket so far. VPC, subnets, and security groups have been created and configured for all the services. IAM roles are also attached to Lambda and EC2. We have configured the Lambda function with an S3 trigger. We have EC2 and RDS MySQL database up and ready.

Here comes the main part to start the whole process. We should connect to the EC2 instance we created using SSH on our local machine. Once we connect, we can upload the remote CSV file into the S3 bucket we created using the below article.

Even though the above script has the ability to create the bucket as well, we should create the S3 bucket from the S3 console. We are going to see that it is owned by us in the logs of the script.

Once the data is uploaded, the Lambda function will be triggered. Data will be modified and uploaded into the RDS table clean_transaction.

Monitor the Data

Since we already uploaded data into the RDS, we can now monitor the data. We are going to first check the CloudWatch logs from the Monitor tab in the Lambda console.

If the whole process seems to be working, we are going to connect to DBeaver using the EC2 user and monitor the data itself in the DBeaver. You may see all the details in the below article

And finally, we can see that the table is created, and data has been cleaned and uploaded into the table correctly.

Hope it helps, thanks for reading :)

You may reach out via Linkedin and Github, all comments are appreciated 🕺

--

--