Deepthi Nalla
Aug 24 · 6 min read

Log forwarding from S3 to ES on AWS using Lambda & Serverless Framework

Credits — miztiik/serverless-s3-to-elasticsearch-ingester
I wanted a workflow that would:
  1. Automate AWS resource lifecycle management.
  2. Automate the deployment of Lambda functions, to quickly iterate on changes to the function definition.
  3. Simplify testing the Lambda function, which requires simulating S3 events.
  4. Simplify debugging issues identified during the test cycle.

Serverless Framework to the rescue

Inspired by Zane Williamson's work on silvermullet/hyper-kube-config, I decided to try the Serverless Framework (serverless.com) to address those improvements, and it quickly became clear that it covers all four. First, set up the tooling and a project skeleton (assuming Homebrew on macOS):

brew install serverless
serverless -v
brew install virtualenv
virtualenv --version
virtualenv s3-lambda-serverless
source s3-lambda-serverless/bin/activate
mkdir app
cd app
sls create --template aws-python3 --name s3-to-es
This creates three files:

handler.py
serverless.yml
.gitignore

Replace the generated handler.py with the function below, which parses each log line with a few regular expressions and indexes the result into Elasticsearch:
import os
import re

import boto3
import requests
from requests_aws4auth import AWS4Auth


def pushToElasticSearch(event, context):
    region = 'us-west-2'  # AWS region of the ES domain
    service = 'es'
    # Make sure these env variables are set in the Lambda console
    awsauth = AWS4Auth(os.environ['SERVERLESS_AWS_ACCESS_KEY_ID'],
                       os.environ['SERVERLESS_AWS_SECRET_ACCESS_KEY'],
                       region, service)
    host = ''  # the Amazon ES domain, including https://
    index = 'lambda-s3-index-2'
    doc_type = 'lambda-type'  # renamed from `type` to avoid shadowing the builtin
    url = host + '/' + index + '/' + doc_type

    # Regular expressions used to parse some simple log lines
    ip_pattern = re.compile(r'(\d+\.\d+\.\d+\.\d+)')
    time_pattern = re.compile(r'\[(\d+/\w\w\w/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]')
    message_pattern = re.compile(r'"(.+)"')

    s3 = boto3.client('s3')
    headers = {"Content-Type": "application/json"}

    try:
        for record in event['Records']:
            # Get the bucket name and key for the new file
            bucket = record['s3']['bucket']['name']
            key = record['s3']['object']['key']

            # Get, read, and split the file into lines
            obj = s3.get_object(Bucket=bucket, Key=key)
            body = obj['Body'].read()
            lines = body.splitlines()

            # Match the regular expressions to each line and index the JSON
            ip = timestamp = message = None  # defaults in case a line doesn't match
            for line in lines:
                line = line.decode("utf-8")
                if ip_pattern.search(line):
                    ip = ip_pattern.search(line).group(1)
                if time_pattern.search(line):
                    timestamp = time_pattern.search(line).group(1)
                if message_pattern.search(line):
                    message = message_pattern.search(line).group(1)
                document = {"ip": ip, "time_stamp": timestamp, "message": message}
                r = requests.post(url, auth=awsauth, json=document, headers=headers)
    except Exception as e:
        print("Exception occurred:", e)
  • Create a package.json file to save your Node dependencies, then install the serverless-python-requirements plugin:
npm init
npm install --save serverless-python-requirements
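
The plugin bundles whatever is listed in a requirements.txt placed next to serverless.yml into the deployment package. For this handler the file would look something like the sketch below; boto3 can be omitted because the Lambda Python runtime already provides it.

# requirements.txt
requests
requests_aws4auth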
Next, describe the whole stack in serverless.yml:

service: s3-to-es # service name used while creating the template

provider:
  name: aws
  runtime: python3.7
  profile: serverless # user profile used for sls deploy
  versionFunctions: false
  region: us-west-2
  iamRoleStatements: # IAM role permissions associated with the Lambda fn
    - Effect: "Allow"
      Action:
        - s3:GetObject
      Resource: "*"
    - Effect: "Allow"
      Action:
        - es:ESHttpPost
        - es:ESHttpPut
        - es:ESHttpGet
      Resource: "arn:aws:es:us-west-2:<aws-account-id>:domain/my-test-domain"

functions:
  pushToElasticSearch: # Lambda function name
    handler: handler.pushToElasticSearch # Lambda function code
    description: pushes logs to ES on S3 object create event
    events:
      - s3:
          bucket: log-dest # bucket name
          event: s3:ObjectCreated:*
          rules:
            - prefix: logs/ # folder inside the bucket
          existing: true # mark as true if the bucket already exists

plugins:
  - serverless-python-requirements

resources:
  Resources:
    elasticSearch:
      Type: AWS::Elasticsearch::Domain
      Properties:
        DomainName: "my-test-domain"
        AccessPolicies:
          Version: '2012-10-17'
          Statement:
            - Effect: "Allow"
              Principal:
                AWS:
                  - "arn:aws:iam::<aws-account-id>:user/serverless-es"
              Action:
                - "es:*"
              Resource: "arn:aws:es:us-west-2:<aws-account-id>:domain/my-test-domain/*"
        ElasticsearchVersion: 6.7
        ElasticsearchClusterConfig:
          InstanceType: "t2.small.elasticsearch"
          InstanceCount: 1
          DedicatedMasterEnabled: "false"
          ZoneAwarenessEnabled: "false"
        EBSOptions:
          EBSEnabled: true
          Iops: 0
          VolumeSize: 10
          VolumeType: "gp2"
Deploying this configuration will:

  • Create a Lambda function pushToElasticSearch.
  • Add a trigger to the Lambda function on object-create events in S3.
  • Create an Elasticsearch domain with the configuration in the resources section.

To test the function locally, simulate an S3 ObjectCreated:Put event by saving the following sample payload as event.json:
{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-west-2",
      "eventTime": "1970-01-01T00:00:00.000Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "EXAMPLE"
      },
      "requestParameters": {
        "sourceIPAddress": "127.0.0.1"
      },
      "responseElements": {
        "x-amz-request-id": "EXAMPLE123456789",
        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "testConfigRule",
        "bucket": {
          "name": "log-dest",
          "ownerIdentity": {
            "principalId": "EXAMPLE"
          },
          "arn": "arn:aws:s3:::log-dest"
        },
        "object": {
          "key": "logs/access-log.log",
          "size": 1024,
          "eTag": "0123456789abcdef0123456789abcdef",
          "sequencer": "0A1B2C3D4E5F678901"
        }
      }
    }
  ]
}
Test the function locally first, then deploy the stack and invoke the live version:

sls invoke local --function pushToElasticSearch -p ./event.json --log
sls deploy
sls invoke --function pushToElasticSearch -p ./event.json --log
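
As a sanity check (my own addition, not part of the original walkthrough), you can search the index the handler writes to. A minimal sketch, assuming the same credentials are in the environment and your ES endpoint is filled in:

import os

import requests
from requests_aws4auth import AWS4Auth

# Same signing setup the handler uses
awsauth = AWS4Auth(os.environ['SERVERLESS_AWS_ACCESS_KEY_ID'],
                   os.environ['SERVERLESS_AWS_SECRET_ACCESS_KEY'],
                   'us-west-2', 'es')
host = ''  # your Amazon ES domain endpoint, including https://
r = requests.get(host + '/lambda-s3-index-2/_search',
                 auth=awsauth, params={'q': 'ip:127.0.0.1'})
print(r.json())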
