
AWS CloudWatch log ingestion to Microsoft Sentinel

5 min read · Mar 16, 2025

At Q-Solution, a specialist company providing managed services to UK Government, we use Microsoft Sentinel as our cloud-based Security Information and Event Management (SIEM). Our cloud estate primarily uses Amazon Web Services (AWS), where we use CloudWatch for infrastructure and application logs.

Having already used Microsoft Sentinel for AWS CloudTrail, VPC Flow Logs and Amazon GuardDuty, we wished to also stream CloudWatch logs into Sentinel so that we could develop custom alerts and incidents.

Why the provided integration wasn’t practical

However, on testing the integration between AWS CloudWatch and Sentinel available in late 2023, it became apparent that it was based on a single CloudWatch log group, and that a separate Azure Function would be needed for every CloudWatch log group.

As we currently have hundreds of CloudWatch log groups, this approach is unduly complex and simply not practical for our use, so I raised an issue in the open-source Azure Sentinel repository.

After some communication with Microsoft and a call, it became clear that they didn’t regard this as enough of a priority to include on their immediate roadmap; however, they did provide some guidance as to how this could be achieved.

The solution

CloudWatch log ingestion to Microsoft Sentinel

We use CloudWatch log subscription filters to stream logs to S3 via a Kinesis Data Firehose delivery stream for each AWS account. This is a standard AWS architecture for streaming CloudWatch logs to a central S3 bucket, as detailed in this article.
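For illustration, a subscription filter can be attached to a log group and pointed at the Firehose delivery stream with a single API call. A minimal boto3 sketch, using the placeholder names from the example record shown below rather than our real configuration:

import boto3

logs = boto3.client("logs")

# Placeholder names and ARNs for illustration only
LOG_GROUP = "TestLogGroup"
FIREHOSE_ARN = "arn:aws:firehose:eu-west-2:012345678901:deliverystream/app-logs-to-s3-sentinel-dev"
ROLE_ARN = "arn:aws:iam::012345678901:role/cloudwatch-to-firehose"

# Stream every event in the log group (empty filter pattern) to the Firehose delivery stream
logs.put_subscription_filter(
    logGroupName=LOG_GROUP,
    filterName="app-logs-to-s3-sentinel-dev",
    filterPattern="",
    destinationArn=FIREHOSE_ARN,
    roleArn=ROLE_ARN,
)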

An example log record from the Firehose delivery stream is:

{
  "messageType": "DATA_MESSAGE",
  "owner": "012345678901",
  "logGroup": "TestLogGroup",
  "logStream": "TestLogStream",
  "subscriptionFilters": [ "app-logs-to-s3-sentinel-dev" ],
  "logEvents": [{
    "id": "37793097787735394733383965267607845781987661504445874176",
    "timestamp": 1694701116545,
    "message": "{\"cat\":\"client\",\"outcome\":\"accepted\",\"client\":\"10.10.10.10\",\"gid\":\"sg-0c67f0fty50ehh6bn\",\"instance\":\"i-0115ccccf78956ddc\",\"timestamp\":\"2023-09-14T14:18:36.545938648Z\"}"
  }]
}

This log format, produced by Kinesis Data Firehose delivery streams for CloudWatch Log Subscription Filters, is consistent and predictable, making it ideal for mapping values into a standard Sentinel data table.

In our implementation, we use Kinesis Data Firehose delivery streams fed by CloudWatch Log Subscription Filters, with an S3 bucket destination for pre-processed CloudWatch data. Each new object in that bucket triggers a Lambda function, which transforms the data and copies it to another S3 bucket integrated with Sentinel via the AWS S3 CloudWatch connector.
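As a rough sketch of that trigger, the pre-processed bucket can be configured to invoke the transform Lambda on every new object. The bucket name and function ARN below are placeholders; in practice this would typically be defined in CloudFormation or Terraform alongside the Lambda itself, together with the permission allowing S3 to invoke the function:

import boto3

s3 = boto3.client("s3")

# Placeholder names for illustration only
SOURCE_BUCKET = "cloudwatch-firehose-raw"
LAMBDA_ARN = "arn:aws:lambda:eu-west-2:012345678901:function:cloudwatch-transform"

# Invoke the transform Lambda whenever Firehose writes a new object to the pre-processed bucket
s3.put_bucket_notification_configuration(
    Bucket=SOURCE_BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": LAMBDA_ARN,
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    },
)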

An alternative architecture might be to use a data transform Lambda within the Firehose delivery stream itself; however, I haven’t tested that.
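For reference only, a transform Lambda inside the delivery stream would follow the Kinesis Data Firehose data transformation contract: records arrive base64-encoded (and gzip-compressed for CloudWatch Logs subscriptions) and must be returned with a result status. An untested outline, not our production code:

import base64
import gzip
import json


def firehose_transform_handler(event, context):
    """Untested outline of a Firehose data-transformation Lambda (alternative architecture)."""
    output = []
    for record in event["records"]:
        # each record's data is base64-encoded and, for CloudWatch Logs subscriptions, gzip-compressed
        payload = json.loads(gzip.decompress(base64.b64decode(record["data"])))

        # ...transform the CloudWatch logEvents here, as the S3-triggered Lambda below does...

        output.append({
            "recordId": record["recordId"],
            "result": "Ok",  # or "Dropped" / "ProcessingFailed"
            "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}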

The Lambda transform function used in our implementation extracts the fields from the logEvents list to populate the CloudWatch data table.

Our Transform Lambda function

Our Transform Lambda code for CloudWatch logs streamed by Firehose is written in Python, and is partially based on the original Python code for the integration provided by Microsoft for a single CloudWatch log group.

import pandas as pd
import json
import gzip
import botocore
import boto3
import csv
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client("s3")

destination_bucket = os.environ["DESTINATION_BUCKET"]
path = "/tmp/logfile.gz"


def add_context_to_json_message(message, accountid, loggroup, logstream):
    # convert message to JSON
    message_json = json.loads(message)
    # add log context information to message JSON
    message_json["logGroup"] = loggroup
    message_json["logStream"] = logstream
    message_json["accountId"] = accountid

    return json.dumps(message_json)


def add_context_to_string_message(message, accountid, loggroup, logstream):
    # include message string in new JSON object
    message_json = {"data": message, "logGroup": loggroup, "logStream": logstream, "accountId": accountid}

    return json.dumps(message_json)


def transform_log_event(data):
    # based on https://github.com/Azure/Azure-Sentinel/blob/master/DataConnectors/AWS-S3/CloudWatchLanbdaFunction.py

    # convert the logEvents list to a dataframe
    df = pd.DataFrame(data["logEvents"])

    if df.empty:
        print("No events for specified time")
        return None

    # convert unix time to zulu time, for example from 1671086934783 to 2022-12-15T06:48:54.783Z
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    df["timestamp"] = df["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%S.%f").str[:-3] + "Z"

    # remove unsupported column
    df = df.drop(columns=["id"])

    # obtain log context information
    account_id = data["owner"]
    log_group = data["logGroup"]
    log_stream = data["logStream"]

    try:
        # update values in message column in the case that message is valid JSON
        df["message"] = df["message"].apply(  # pylint:disable=unsupported-assignment-operation, unsubscriptable-object
            add_context_to_json_message, accountid=account_id, loggroup=log_group, logstream=log_stream
        )
    except json.decoder.JSONDecodeError:
        # update values in message column in the case that message is a plain string
        df["message"] = df["message"].apply(  # pylint:disable=unsupported-assignment-operation, unsubscriptable-object
            add_context_to_string_message, accountid=account_id, loggroup=log_group, logstream=log_stream
        )

    return df


def lambda_handler(event, context):  # pylint:disable=unused-argument
    """Lambda function triggered by S3 event"""

    key = event["Records"][0]["s3"]["object"]["key"]

    source_bucket = event["Records"][0]["s3"]["bucket"]["name"]
    destination_bucket = os.environ["DESTINATION_BUCKET"]

    logger.info("New files uploaded to %s S3 bucket", source_bucket)

    try:
        # read object
        s3_object = s3.get_object(Bucket=source_bucket, Key=key)

        # decompress data
        data = gzip.decompress(s3_object["Body"].read()).decode("utf-8")

        # at this point the file may contain multiple concatenated JSON objects,
        # so insert a split character between them and split into a list
        data_with_split_character = data.replace(']}{"messageType":', ']}|||{"messageType":')
        log_events_list = data_with_split_character.split("|||")

        logger.info("Processing %s log events", len(log_events_list))

        count = 0
        for log_event in log_events_list:
            # increment count
            count += 1

            # convert string to JSON
            event_json = json.loads(log_event)

            # transform log event
            transformed_event = transform_log_event(event_json)
            if transformed_event is None:
                # nothing to upload for this record
                continue
            pd.set_option("display.max_colwidth", 500)

            # export data to a temporary file in the right format, which will be deleted as soon as the session ends
            transformed_event.to_csv(
                path,
                index=False,
                header=False,
                compression="gzip",
                sep=" ",
                escapechar=" ",
                doublequote=False,
                quoting=csv.QUOTE_NONE,
            )

            # upload data to desired folder in bucket
            s3.upload_file(path, destination_bucket, f"{key}-{count}.gz")

        logger.info("%s events copied to %s S3 bucket", count, destination_bucket)
        print(f"{count} events copied to {destination_bucket} S3 bucket")

    except botocore.exceptions.ClientError as error:
        logger.error("There was an error copying the file to destination bucket")
        print(f"Error Message: {error}")

    except botocore.exceptions.ParamValidationError as error:
        logger.error("Missing required parameters while calling the API.")
        print(f"Error Message: {error}")
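As a quick local check, the transform can be exercised against the example Firehose record shown earlier, run in the same module as the code above (with the DESTINATION_BUCKET environment variable set so the module-level lookup succeeds). The message field should gain logGroup, logStream and accountId alongside its original fields:

# Example Firehose record from earlier in this article (message abbreviated)
sample_record = {
    "messageType": "DATA_MESSAGE",
    "owner": "012345678901",
    "logGroup": "TestLogGroup",
    "logStream": "TestLogStream",
    "subscriptionFilters": ["app-logs-to-s3-sentinel-dev"],
    "logEvents": [{
        "id": "37793097787735394733383965267607845781987661504445874176",
        "timestamp": 1694701116545,
        "message": "{\"cat\": \"client\", \"outcome\": \"accepted\", \"client\": \"10.10.10.10\"}",
    }],
}

df = transform_log_event(sample_record)
print(df["timestamp"].iloc[0])  # 2023-09-14T14:18:36.545Z
print(df["message"].iloc[0])    # original fields plus logGroup, logStream and accountId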

The Lambda function can be implemented using CloudFormation, Terraform or another tool. We needed to build the function’s Python dependencies for the manylinux2014_x86_64 platform to avoid glibc errors due to incompatible versions.

Microsoft solution for multiple CloudWatch log groups

In April 2024, Microsoft issued a blog post outlining an alternative approach to integrating multiple CloudWatch log groups across various AWS accounts into Microsoft Sentinel.

The solution proposed by Microsoft uses a Lambda function which lists all CloudWatch log groups and streams, and periodically pulls logs from CloudWatch.

I haven’t tested this; however, the code appears to be written for a single AWS account rather than multiple accounts, and by default it would ingest all log groups rather than only those selected using a subscription filter.
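For context, the approach described amounts to enumerating log groups and periodically pulling recent events from each. An illustrative, untested boto3 sketch of that pattern (not Microsoft’s actual code) might look like this:

import time
import boto3

logs = boto3.client("logs")


def pull_recent_events(lookback_minutes=5):
    """Illustrative polling loop: list every log group and pull its recent events."""
    start_time = int((time.time() - lookback_minutes * 60) * 1000)  # CloudWatch Logs uses milliseconds

    paginator = logs.get_paginator("describe_log_groups")
    for page in paginator.paginate():
        for group in page["logGroups"]:
            response = logs.filter_log_events(
                logGroupName=group["logGroupName"],
                startTime=start_time,
            )
            for log_event in response.get("events", []):
                # Microsoft's solution would then write these events out for Sentinel ingestion
                print(group["logGroupName"], log_event["message"])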

Nevertheless, it’s good to see that Microsoft has recognised the limitations of the original connector which only supported a single CloudWatch log group.

Written by Paul Schwarzenberger

Paul Schwarzenberger is a cloud security architect and engineer, creator of OWASP Domain Protect, and cloud security trainer.
