Using S3 as a caching layer for the ELK stack

Andrew Marwood
Just Eat Takeaway-tech
7 min readDec 6, 2021

Introduction

At Just Eat Takeaway we operate an ELK stack formed of Logstash, Elasticsearch, and Kibana to collect, store, and present log data from most applications in the platform. We index a significant volume of data every day which engineering teams use to monitor the health of their applications, identify trends, and troubleshoot issues their applications may have, all in near real time. Availability of this data is therefore critical to the successful operation of the platform.

If you have operated an Elasticsearch cluster you may have experienced that it can sometimes be temperamental, and one of the frequent issues we have faced is the contention between indexing data, and users searching that data. In extreme scenarios, when the elasticsearch cluster is under heavy indexing load a search can push the cluster just a little bit too far causing nodes to become unresponsive which can cause the cluster to go red if enough nodes and shards fall out of operation.

If the worst happens and the cluster becomes unavailable the last outcome we want is to lose log data. This article will discuss how the addition of an S3 caching layer improved the reliability of our infrastructure, and reduced the Mean Time to Recovery if an outage occurs.

Choosing a caching layer

We wanted the caching layer to act as a buffer in our Logstash pipeline, such that one set of Logstash instances would receive logs and write to the cache. A second set of Logstash instances would then read from the cache and output to Elasticsearch as shown in the diagram below.

With this design, if the Logstash processor instances cannot output to Elasticsearch, we have the opportunity to queue logs in the cache until Elasticsearch is available and we can resume indexing.

Initially we looked at using Kafka or Redis as the caching layer as they are both well supported by Logstash as inputs and outputs. However, operational simplicity was high on our list of requirements which led us to quickly rule out Kafka since at that time we did not want to host and manage a Kafka cluster.

A Proof of Concept using the AWS Elasticache for Redis service was created with a 3-node cluster using the cache.r6g.xlarge instance type. We tested with the Logstash Redis output plugin running on the Logstash receiver instances using the following config:

output {  
redis {
batch => true
data_type => "list"
host => "<elasticache_redis_host>"
key => "logstashreceiver"
timeout => 10
}
}

The Logstash processor instances used the Logstash Redis input plugin with the following config:

input {
redis {
id => "logstashprocessor_redis_input"
host => "<elasticache_redis_host>"
port => 6379
codec => "json"
key => "logstashreceiver"
data_type => "list"
tags => ["logstashreceiver", "redis"]
}
}

With this set up, we ran a load test to see how it performed and at first it looked like it would work really well. It made receiving logs asynchronous from log processing and we could see logs would build up in the Redis cache if there was an issue with the Logstash processor instances writing to Elasticsearch, then when Elasticsearch recovered, the logs from the cache were being successfully indexed.

This was great… until we ramped up the load test more, at which point it all went wrong. Redis stores data in memory, and the Redis instances we were using only have a limited amount of memory per node. This meant that we could only cache a relatively small volume of logs before the cache became full and the Logstash receiver instances would start to fail as they were no longer able to output to the cache. We could have scaled up the Redis cluster, but this is cost prohibitive so it was back to the drawing board.

Using S3 as a cache

Learning from the issues we encountered when using Redis it was clear that we needed a cache that would meet both our storage and performance requirements whilst not being prohibitively expensive. This led us to consider S3 as it has the following characteristics:

As before, we ran a PoC using S3 as the cache layer between our Logstash receiver instances and our Logstash processor instances. Logstash can be configured to output data to S3 using the S3 output plugin, the configuration we used is as follows:

output {
s3 {
region => "<aws_region>"
bucket => "<s3_bucket_name>"
prefix => "<s3_bucket_prefix>"
size_file => 1000000
time_file => 1
canned_acl => "private"
codec => "json_lines"
validate_credentials_on_root_bucket => false
}
}

With this config logstash will write to the S3 bucket specified by <s3_bucket_name> with objects written under the prefix <s3_bucket_prefix> (more on why the prefix is important later). The file will be written to S3 when it reaches 1 million bytes in size, or the accumulated file is 1 minute old. We ensure the written objects are private in s3, and the objects are written using the json_lines codec. Finally we turn off the s3 access validation that occurs when Logstash starts.

Reading from S3 with Logstash is a little less straightforward since the Logstash S3 input plugin has a few issues which made it unsuitable for our needs. The main issue was that only one instance can read from a prefix, if multiple instances read the same prefix, then each instance will index the data into Elasticsearch. We could have created a convoluted solution where the same number of instances write to and read from the S3 bucket using matching prefixes for the writer and reader, but this would have been error prone. Another issue was that if the Logstash instance was terminated, it would be difficult to configure a new instance to pick up from where the old instance had got to.

So how could we reliably get our data out of S3 and into Elasticsearch?

Amazon S3 Event Notifications to the rescue! S3 has the ability to send notifications to SNS, SQS, or Lambda for certain bucket events, including when an object has been created. There is also an S3 SNS / SQS input plugin for Logstash, we can use this to allow Logstash to subscribe to an SQS queue to be notified when an object is created in S3, it will then read this object and process it as normal.

The configuration we used for the input is as follows:

input {
s3snssqs {
id => "s3snssqs_input"
region => "<aws_region>"
s3_default_options => { "endpoint_discovery" => true }
queue => "<sqs_queue_name>"
delete_on_success => false
sqs_skip_delete => false
from_sns => true
visibility_timeout => 60
codec => "json_lines"
tags => ["s3sqs"]
}
}

With this configuration, the Logstash receiver instances receive logs and PUT them in S3. This triggers an S3:ObjectCreated event which is sent to an SNS topic. An SQS queue subscribes to the SNS topic, with the Logstash processor instances configured to receive from the SQS queue. When an SQS message is received, Logstash GETs the specified object from S3, processes it, and outputs the logs to Elasticsearch, this is outlined in the diagram below:

Additionally the SQS queue is configured with a dead letter queue to handle scenarios where the Logstash processor instances cannot process messages from SQS. Messages in the dead letter queue can either be reprocessed if an underlying issue is resolved, or purged from the queue.

It was mentioned earlier that the prefix in the S3 bucket where objects are written to and read from is important. This is because the S3 limits for PUT and GET operations are per prefix, therefore we can horizontally scale the S3 cache by writing to and reading from more prefixes. We can also adjust the size of the objects being written to S3 to reduce the number of PUT operations. Other limits worth noting:

Did it work?

The answer is a resounding yes! Under normal operation logs flow as expected through the S3 cache with very little latency added between when the log is generated and when it is indexed in Elasticsearch, typically this is less than 60 seconds. This is measured by calculating the difference between a log message timestamp, and the time just before the Logstash processor instances output the log to Elasticsearch.

During a recent outage in our Elasticsearch cluster, 3 nodes became unresponsive resulting in one write index being in a red state, causing the indexing rate to drop to almost 0 across the cluster. Whilst engineers worked to recover the cluster to a healthy state, the S3 cache worked exactly as expected. The Logstash processor instances were unable to successfully receive and process messages from SQS, resulting in messages building up on the SQS queue as shown below, with nearly 30k messages available to process.

Once the Elasticsearch cluster was recovered to a healthy state, the Logstash processor instances started to receive messages from the SQS queue and process logs from S3 as expected. This met our original aim of being able to recover from an Elasticsearch outage with no data loss.

Conclusion

The S3 caching layer in our logging pipeline has already proven its worth, the solution is simple to maintain due to leveraging AWS services and built in functionality in Logstash. It also opens up other opportunities for extending and improving the logging pipeline by allowing other systems to subscribe to the SNS topic. This could be used in the future to add extra log processing or to ship the logs to different destinations.

Just Eat Takeaway.com is hiring! Want to come work with us? Apply today.

--

--