Configure index rollover for AWS Elasticsearch indexes using a Lambda function (with EFK)

Introduction

AWS Elasticsearch does not expose the index lifecycle management (ILM) API in the AWS cloud environment (Elasticsearch exposes it in the standalone 7.x distribution), so you cannot automate index rollover with ILM on AWS Elasticsearch; this is a limitation of the managed service. However, you can achieve the same result by combining AWS Lambda and Amazon CloudWatch with ES. This article illustrates how to configure automatic index rollover for the indexes created per namespace in a Kubernetes cluster. Note that, although the steps are written around a Kubernetes (EKS) cluster, you can apply the same approach to an ES-only deployment in AWS where EKS is not involved.

Note: this is a continuation of the following story:
Deploy EFK (Elasticsearch, Fluentd and Kibana) to AWS Kubernetes cluster

Configuration Steps

Note: To configure the index rollover, we have to change the default Lambda function created while configuring EFK as described in the https://eksworkshop.com/logging/ document.


Note: Use the Kibana “Dev tools” console or the ES REST API to run the following commands.

  • Step 1: Create an index for each namespace in the environment, following the naming convention <namespace_name>_logs-000001. Repeat for every namespace.
    e.g.: namespace_logs-000001
    Command:
PUT /namespace_logs-000001
  • Step 2: Create an index alias for each namespace, following the naming convention <namespace_name>_logs_write. Repeat for every namespace. (A scripted alternative that covers Steps 1 and 2 together is sketched after the command below.)
    e.g.: namespace_logs_write
    Command:
POST /_aliases
{
  "actions" : [
    { "add" : { "index" : "namespace_logs-000001", "alias" : "namespace_logs_write" } }
  ]
}
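
If there are many namespaces, repeating Steps 1 and 2 by hand gets tedious. As an optional alternative, the following minimal Node.js sketch creates the first index and its write alias for a list of namespaces in one request each (Elasticsearch allows an "aliases" block in the create-index request). The ES_ENDPOINT value and the namespace list are placeholders, and the sketch assumes the domain's access policy allows unsigned requests from wherever you run it; otherwise the requests would need SigV4 signing, as in the full Lambda code further below.

var https = require('https');

var ES_ENDPOINT = 'your-es-domain.eu-west-1.es.amazonaws.com'; // placeholder endpoint
var namespaces = ['tenant1', 'tenant2'];                       // placeholder namespace list

function esRequest(method, path, body, done) {
    var payload = JSON.stringify(body);
    var req = https.request({
        host: ES_ENDPOINT,
        method: method,
        path: path,
        headers: {
            'Content-Type': 'application/json',
            'Content-Length': Buffer.byteLength(payload)
        }
    }, function(res) {
        var out = '';
        res.on('data', function(chunk) { out += chunk; });
        res.on('end', function() { done(res.statusCode, out); });
    });
    req.on('error', function(e) { done(0, e.message); });
    req.end(payload);
}

namespaces.forEach(function(ns) {
    // Creates <ns>_logs-000001 and attaches the <ns>_logs_write alias in a single call
    var aliases = {};
    aliases[ns + '_logs_write'] = {};
    esRequest('PUT', '/' + ns + '_logs-000001', { aliases: aliases }, function(status, out) {
        console.log(ns + ': ' + status + ' ' + out);
    });
});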
  • Step 3: Navigate to the Lambda function that was created for log streaming during the initial configuration.
    e.g.: LogsToElasticsearch_<name>
  • Step 4: Replace the index creation code in the Lambda Node.js function with the code below. You can replace either just the code snippet or the full index.js function. Make sure to change the endpoint if you use the attached script. (An illustration of the index name the snippet derives follows it.)
    ** Code snippet:
var jsonMessage = JSON.parse(logEvent.message);
var namespace = jsonMessage['kubernetes']['namespace_name'];
var indexName = [
    namespace + '_logs_write'
].join('.');
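
For illustration, here is what the snippet derives for a typical log record shipped by fluentd with Kubernetes metadata. The event shape below is an assumption for the example; the code only relies on kubernetes.namespace_name being present in the JSON message.

// Hypothetical CloudWatch Logs event carrying a fluentd record (values are placeholders)
var logEvent = {
    id: '000000000000000000000001',
    timestamp: 1568718000000,
    message: JSON.stringify({
        log: 'GET /healthz 200',
        kubernetes: { namespace_name: 'tenant1', pod_name: 'api-7d9f6c' }
    })
};

var jsonMessage = JSON.parse(logEvent.message);
var indexName = [jsonMessage['kubernetes']['namespace_name'] + '_logs_write'].join('.');
console.log(indexName); // "tenant1_logs_write" -> the write alias from Step 2,
                        // currently pointing at tenant1_logs-000001

Note that a log line that is not valid JSON (or lacks the kubernetes block) would make JSON.parse or the property lookup throw and fail the whole batch, so this change assumes fluentd always ships structured JSON records.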

** Full index.js Code:

// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');

var endpoint = 'endpoint.com';

// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;

exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = Buffer.from(input.awslogs.data, 'base64');

    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }

        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));

        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);

        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }

        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));

            if (error) {
                logFailure(error, failedItems);
                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};

function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var bulkRequestBody = '';

    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);

        var jsonMessage = JSON.parse(logEvent.message);
        var namespace = jsonMessage['kubernetes']['namespace_name'];

        var indexName = [
            namespace + '_logs_write'
        ].join('.');

        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('\n') + '\n';
    });
    return bulkRequestBody;
}

function buildSource(message, extractedFields) {
    if (extractedFields) {
        var source = {};

        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];

                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }

                var jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }

                source[key] = value;
            }
        }
        return source;
    }

    var jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }

    return {};
}

function extractJson(message) {
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}

function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}

function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}

function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });

        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;
            var error;

            if (response.statusCode >= 200 && response.statusCode < 299) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });

                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            if (response.statusCode !== 200 || info.errors === true) {
                // prevents logging of failed entries, but allows logging
                // of other errors such as access restrictions
                delete info.items;
                error = {
                    statusCode: response.statusCode,
                    responseBody: info
                };
            }

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}

function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };

    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');

    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');

    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ].join('\n');

    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
}

function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}

function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}

function logFailure(error, failedItems) {
    if (logFailedResponses) {
        console.log('Error: ' + JSON.stringify(error, null, 2));

        if (failedItems && failedItems.length > 0) {
            console.log("Failed Items: " +
                JSON.stringify(failedItems, null, 2));
        }
    }
}

Finally save the function.
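
Before relying on live traffic, you can exercise the modified function with a console test event built in the same shape CloudWatch Logs delivers: a gzipped, base64-encoded JSON document under awslogs.data. The following is a minimal sketch for generating such a test event; the account id, log group and message values are placeholders.

var zlib = require('zlib');

var sample = {
    messageType: 'DATA_MESSAGE',
    owner: '111111111111',                        // placeholder account id
    logGroup: '/eks/fluentd-cloudwatch',          // placeholder log group
    logStream: 'fluentd-sample',
    subscriptionFilters: ['LogsToElasticsearch'],
    logEvents: [{
        id: '000000000000000000000001',
        timestamp: Date.now(),
        message: JSON.stringify({
            log: 'GET /healthz 200',
            kubernetes: { namespace_name: 'tenant1' }
        })
    }]
};

// CloudWatch Logs delivers the payload gzipped and base64-encoded under awslogs.data
zlib.gzip(Buffer.from(JSON.stringify(sample)), function(err, gz) {
    if (err) throw err;
    console.log(JSON.stringify({ awslogs: { data: gz.toString('base64') } }, null, 2));
});

Paste the printed JSON into the Lambda console as a test event; a successful invocation should log the bulk response status and index the document into tenant1_logs-000001 through the tenant1_logs_write alias.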

  • Step 5: New indexes will now be created with the namespace name as the prefix and the “-000001” suffix, so you can delete the previously created indexes using the Kibana Dev tools if needed.
  • Step 6: Now it is time to configure the automatic rollover.
    Navigate to AWS Lambda and create a Node.js function.
    e.g.: ES_Index_Rollback_tenant1_NS
  • Step 7: Copy and paste the following code into the function that you created in the previous step.
    Make sure to change the <host> to your ES endpoint and the <namespace> in the path variable to the appropriate namespace.
    You can also change “max_age” to a suitable value if needed. The default used here is “1d” (1 day, so the indexes will be rolled over daily).
var querystring = require('querystring');
var http = require('https');

exports.handler = function(event, context) {

    var obj = {
        "conditions": {
            "max_age": "1d"
        }
    };

    console.log(JSON.stringify(obj));

    // Post configuration
    var post_options = {
        host: 'endpoint.host.com',
        port: 443,
        path: '/tenant1_logs_write/_rollover',
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'Content-Length': Buffer.byteLength(JSON.stringify(obj))
        }
    };

    console.log('post_options' + post_options);

    // Set up the request
    var post_req = http.request(post_options, function(res) {
        res.setEncoding('utf8');
        res.on('data', function(chunk) {
            console.log('Response: ' + chunk);
            context.succeed();
        });
        res.on('error', function(e) {
            console.log("Got error: " + e.message);
            context.done(null, 'FAILURE');
        });
    });
    console.log('post_req' + post_req);

    // post the data
    post_req.write(JSON.stringify(obj));
    post_req.end();

};

Note: Repeat the same process for all namespaces in the environment.

TODO: Modify the script to fire the rollover API for all aliases at once by parameterizing/looping over the namespace names, so that you do not have to create a separate Lambda function for each namespace; a sketch of this follows.
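
Below is a minimal sketch of that idea, assuming a hard-coded namespace list (the ES_HOST value and the namespace names are placeholders); each write alias gets its own _rollover call, and the function succeeds once all responses have arrived.

var https = require('https');

var ES_HOST = 'endpoint.host.com';          // placeholder ES endpoint
var namespaces = ['tenant1', 'tenant2'];    // placeholder namespace list
var body = JSON.stringify({ conditions: { max_age: '1d' } });

exports.handler = function(event, context) {
    var pending = namespaces.length;

    namespaces.forEach(function(ns) {
        var req = https.request({
            host: ES_HOST,
            port: 443,
            path: '/' + ns + '_logs_write/_rollover',
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'Content-Length': Buffer.byteLength(body)
            }
        }, function(res) {
            res.setEncoding('utf8');
            var out = '';
            res.on('data', function(chunk) { out += chunk; });
            res.on('end', function() {
                console.log(ns + ' rollover response: ' + out);
                if (--pending === 0) { context.succeed(); }
            });
        });

        req.on('error', function(e) {
            console.log(ns + ' rollover failed: ' + e.message);
            if (--pending === 0) { context.succeed(); }
        });

        req.write(body);
        req.end();
    });
};

With this in place, one Lambda function and one CloudWatch rule can cover every namespace.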

  • Step 8: Automate the triggering of the Lambda function created above.
    Navigate to CloudWatch Rules in the AWS console and click the “Create rule” button.
  • Step 9: Select the “Schedule” option and configure the rule to run every 6 hours (360 minutes) using the “Fixed rate of” option.
    Then click the “Add Target” button on the right of the screen and select the Lambda function that you created in the previous step.
    Finally, click the “Add Target” button.
    Note: Repeat the same rule creation process for all namespaces in the environment (or script it with the AWS SDK, as sketched below).
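
If you prefer to script Steps 8 and 9 instead of clicking through the console, the following sketch uses the AWS SDK for Node.js to create the scheduled rule, grant CloudWatch Events permission to invoke the function, and attach the function as a target. The region, ARNs and rule name are placeholders; this is only an equivalent illustration of the manual steps, not part of the original setup.

var AWS = require('aws-sdk');

var region = 'eu-west-1';                                            // placeholder region
var functionName = 'ES_Index_Rollback_tenant1_NS';                   // the Lambda created in Step 6
var functionArn = 'arn:aws:lambda:eu-west-1:111111111111:function:' + functionName; // placeholder
var ruleName = 'es-index-rollover-every-6-hours';

var events = new AWS.CloudWatchEvents({ region: region });
var lambda = new AWS.Lambda({ region: region });

// 1. Create the schedule (equivalent of "Fixed rate of 6 hours")
events.putRule({
    Name: ruleName,
    ScheduleExpression: 'rate(6 hours)',
    State: 'ENABLED'
}, function(err, rule) {
    if (err) { return console.log('putRule failed: ' + err.message); }

    // 2. Allow CloudWatch Events to invoke the Lambda function
    lambda.addPermission({
        FunctionName: functionName,
        StatementId: ruleName + '-invoke',
        Action: 'lambda:InvokeFunction',
        Principal: 'events.amazonaws.com',
        SourceArn: rule.RuleArn
    }, function(permErr) {
        if (permErr) { console.log('addPermission: ' + permErr.message); } // may already exist

        // 3. Attach the Lambda function as the rule target (equivalent of "Add Target")
        events.putTargets({
            Rule: ruleName,
            Targets: [{ Id: '1', Arn: functionArn }]
        }, function(targetErr) {
            console.log(targetErr ? 'putTargets failed: ' + targetErr.message : 'Rule configured');
        });
    });
});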

Now the index rollover mechanism will run automatically every 6 hours.
