ETL using Cloudwatch Logs and AWS Lambda

Jay Jain
CodeX
Published in
6 min readMar 17, 2023
source: https://www.thelambdablog.com/filtering-cloudwatch-logs-by-loggroups-and-logstreams-and-reading-them-using-python-and-the-boto3-sdk/

Introduction

In today’s data-driven world, organisations generate vast amounts of information, and ETL pipelines are essential in transforming raw data into valuable insights. This article will explore how AWS Lambda and CloudWatch Logs can be leveraged to extract valuable insights. Additionally, we will explore the concept of partitioning and how to store data in S3 using the same. By examining code snippets, we can better understand the above objectives.

Overview of Problem Statement

As we all are apprehensive, one of the most used logging services in AWS is Amazon CloudWatch. It offers in-the-moment comprehensions into operations, structure, and operations. Yet, decoding CloudWatch logs can be time-consuming and laborious, particularly if you need to prize certain information from numerous log aqueducts. ETL( Extract, transform, load) results can automatically prize the necessary data, transfigure it into a usable format, and load it into a target store like Amazon S3 to speed up this process.

In this composition, we’ll look at how to automate the CloudWatch log ETL process using Amazon Lambda, CloudWatch Logs, and S3. We’ll produce a Lambda function for pulling logs from CloudWatch and storing them in S3 in Parquet format with a timestamp- grounded partitioning. This fashion can help enterprises save time and costs by automating the process of log operation and making data analysis more effective

Important Concepts in ETL

  • Partitioning Concepts: Partitioning is a data organisation technique that divides large datasets into smaller, more manageable parts. Partitioning makes it easier to manage and process large datasets.
  • Data Integration: ETL pipelines enable organisations to integrate data from multiple sources into a single database. This helps to create a unified view of the organisation’s data.
  • Data Quality: ETL pipelines can be used to clean and normalise data, ensuring that the data is accurate and consistent.
  • Business intelligence: ETL is a critical component of business intelligence (BI) systems, which allow businesses to make data-driven decisions based on insights derived from their data.
  • Data warehousing: ETL is a key process in data warehousing, which involves storing and managing large volumes of data for analysis and reporting purposes. ETL allows businesses to extract data from various sources and load it into a data warehouse for analysis.

AWS Services Used

  • AWS Cloudwatch: It is a logging service that enables you to monitor, store, and access log files across different AWS services such as EC2, Glue, etc.
  • AWS Lambda: It is a serverless compute service that runs your code in response to events and automatically manages the compute resources for you, making it ideal for running simple scripts and automating tasks like ETL.
  • AWS S3: AWS S3 is an object storage service provided by Amazon Web Services. It is designed to store and retrieve any amount of data from anywhere on the internet
  • AWS SES: Amazon SES is a cloud email service provider that helps us to send emails in the form of attachments as well

Steps for ETL Pipeline

Step 1: Set up CloudWatch Logs

The first step in building an ETL pipeline is to set up CloudWatch Logs. You need to create a log group and log stream to store the log data. You can use the AWS Management Console to create a new log group and log stream.

Step 2: Create an AWS Lambda function

The next step is to create an AWS Lambda function to process the log data. You can use the AWS Management Console to create a new Lambda function.

Step 3: Process the log data

The Lambda function needs to be designed in a way that allows it to process log data in near real-time. It should be able to extract the necessary data from log files, transform it into a structured format, and then load it into a target database. Additionally, the Lambda function can be scheduled to run at regular intervals, such as hourly, daily, or weekly, to ensure the timely processing of log data.

Step 4: Store the data in S3

After the data has been transformed into a usable format, it is stored in Parquet format in an S3 bucket. For this use case, the data is stored in a partitioned format, with each partition representing a specific Year-Month-Day-Hour combination. This partitioning approach is necessary as CloudWatch logs are near real-time and data needs to be available in an hourly format. However, the code can be modified to suit specific requirements

Code Snippet

import os
import boto3
import pandas as pd
from datetime import datetime, timedelta

s3_client = boto3.client('s3')
cw_client = boto3.client('logs')

def get_cloudwatch_logs(log_group_name, log_stream_name, start_time, end_time):
kwargs = {
'logGroupName': log_group_name,
'logStreamName': log_stream_name,
'startTime': int(start_time.timestamp() * 1000),
'endTime': int(end_time.timestamp() * 1000),
'startFromHead': True,
}
response = cw_client.get_log_events(**kwargs)
events = response['events']
while response.get('nextForwardToken'):
kwargs.update({'nextToken': response['nextForwardToken']})
response = cw_client.get_log_events(**kwargs)
events.extend(response['events'])
return events

def transform_logs_to_dataframe(events):
log_data = []
for event in events:
log_data.append(event['message'])
df = pd.DataFrame(log_data, columns=['message'])
return df

def dump_dataframe_to_s3(df, bucket_name, partition_path):
file_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
file_path = f"{partition_path}/{file_name}.parquet"
s3_resource = boto3.resource('s3')
s3_object = s3_resource.Object(bucket_name, file_path)
df.to_parquet(s3_object)
print(f"Data dumped to s3://{bucket_name}/{file_path}")

def lambda_handler(event, context):
log_group_name = "your-log-group-name"
log_stream_name = "your-log-stream-name"
bucket_name = "your-bucket-name"
partition_path = datetime.now().strftime("%Y/%m/%d/%H")
start_time = datetime.utcnow() - timedelta(hours=1)
end_time = datetime.utcnow()
events = get_cloudwatch_logs(log_group_name, log_stream_name, start_time, end_time)
df = transform_logs_to_dataframe(events)
dump_dataframe_to_s3(df, bucket_name, partition_path)

Explanation

The above code is using Boto3, pandas and the DateTime library. This code saves the data in parquet format rather than CSV, which is a more effective and optimised file format for large-scale data processing. Additionally, it partitions the data by year, month, day, and hour to make it easier to query and dissect the data in the future

1. As seen in the code we are passing variables to fetch logs details using the boto3 library by Kwargs in the “get_cloudwatch_logs” function
2. In the “transform_logs_to_dataframe” function, one can write their transformation logic if any
3. In the “dump_dataframe_to_s3” function, logs are getting loaded in s3 in parquet format, can also load that into any database or redshift, and change the code accordingly
4. Lambda_handler is the main function that starts at the first at the time of execution of lambda

Additional Steps:

  1. Include error-handling scenario by addition of try-except block.
  2. Include logging mechanism
  3. Utilise this enriched file for analysis either by running a crawler in Athena or loading this file into redshift and eventually into any visualisation tool
  4. One can even include SNS Notification on successful completion of Lambda execution or can use SES and python MIME library for email
import boto3
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart

def send_mail_with_attachment(ses_client: boto3.client,
to_address: List[str],
from_address: str,
subject: str,
file_name: str,
file_path: str,
email_body: str,
email_body_type='html') -> bool:


message_part = MIMEMultipart()
message_part['Subject'] = subject
message_part['From'] = from_address
message_part['To'] = ', '.join(to_address)
# message body
mime_part = MIMEText(email_body, email_body_type)
message_part.attach(mime_part)
mime_part = MIMEApplication(open(file_path, 'rb').read())
mime_part.add_header('Content-Disposition', 'attachment', filename=file_name)
message_part.attach(mime_part)
response = ses_client.send_raw_email(
Source=message_part['From'],
Destinations=to_address,
RawMessage={
'Data': message_part.as_string()
}
)

With the help of above code, one can share output file as attachment in the form of email to their users

Conclusion

I hope through this article; you were able to get an understanding of how AWS Lambda can be utilised to automate the process of extracting logs from CloudWatch and transforming them into a structured format, followed by a brief introduction to the partitioning concept in S3 and sending an email notification to users

Key Take Aways

  1. ETL pipelines are essential for managing and processing large amounts of data, and AWS Lambda can be used to automate the ETL process for CloudWatch logs.
  2. Partitioning can improve query performance and reduce costs by dividing large datasets into smaller, more manageable parts.
  3. Storing data in parquet format with partitioning based on the timestamp can make it easily accessible and queryable while also allowing for efficient data analysis.

If you’ve liked this article please follow me for more such interesting articles and do share, clap and leave comment for any feedback

--

--

Jay Jain
CodeX
Writer for

Senior Data Engineer at Exponentia AI | AWS Certified Solution Architect | BI Tool | ETL | Spark | AWS Glue | Data Warehouse | Big Data