Our longing for Centralised Logging

Daniel O'Neill
Published in DAZN Engineering
8 min read · Dec 20, 2019

At DAZN, we recently revamped our application logging workflow. This blog aims to describe our implementation, talk about some of the learnings along the way, and highlight how we got to where we are now. So let’s get right into it!

The Before

The previous implementation of the logging workflow became a nightmare to manage. Many different teams at DAZN work on their own services, and some produced their own logging libraries. With everyone writing logs in their own way, things began to get messy: field types from one library would conflict with those used by other teams at DAZN, the conflicts led to indexing errors, and as a result the logs became useless. This was not sustainable and something had to change.

Request For Comments

At DAZN we use request for comments (RFC) documents to create standards and best practices, and that is exactly what we did to solve our logging problem. These documents create a forum for discussion, where everybody involved can put forward their concerns and suggestions. Our RFCs live in a GitHub repository, which allows a Pull Request to start the RFC and voting to determine the outcome.

Our RFC procedure is as described below:

  1. Create a branch from the RFC repository.
  2. Document the RFC & create a Pull Request.
  3. Review period begins. During this time, amendments and discussions take place for a period of 7 days.
  4. Voting period begins and lasts 14 days.
  5. RFC either passes or fails. If it passes, the new standard is in place.

The New Age

And then it happened. RFC-007 was born. We created the document from many back-and-forth discussions, and it allowed for alignment amongst all the teams. We created a log library using a common schema and started implementing it in services across the business.

The Importance Of Common Schema

Having a log schema that everybody adheres to has a lot of benefits. It makes dashboards easier to template: the fields you are going to look for are the same for every service, so you can create a single variable to switch between services. Operations teams know which log fields are important to them and can be certain that each service will have a common set of these fields, making supportive tasks a lot easier. The list goes on.
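As a purely illustrative sketch (these are not the actual RFC-007 fields), a common schema pins down a core set of fields and their types so that every service logs them the same way:

# Illustrative only - not the real RFC-007 schema.
# Every service emits the same core fields with the same types.
level: info                          # severity, always one of a fixed set of strings
service: playback-api                # hypothetical service name
timestamp: "2019-12-20T12:00:00Z"    # always an ISO 8601 string, never a number
message: "user session started"      # always a plain string, never an object
request_id: "7f3c9a1e"               # correlation id shared across services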

Our Logging Infrastructure

The Ingest

A log message's life starts when an application writes it to a CloudWatch Log Group. We equip every single DAZN AWS account with a logging subscriber AWS Lambda, which streams log messages from the log groups to regional AWS Kinesis ingest streams. We create a subscription based on some pre-defined criteria, both upon the creation of a log group and on a schedule, which also allows us to unsubscribe unwanted logs.
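To illustrate what the subscriber Lambda puts in place, here is a rough CloudFormation-style sketch of a log group subscription to a Kinesis stream; the names, ARNs and filter pattern are made up, and in practice our Lambda creates the equivalent subscription through the CloudWatch Logs API:

# Sketch only - names, ARNs and filter pattern are illustrative.
ServiceLogSubscription:
  Type: AWS::Logs::SubscriptionFilter
  Properties:
    LogGroupName: /aws/lambda/my-service                     # hypothetical log group
    FilterPattern: ""                                        # empty pattern forwards every log event
    DestinationArn: arn:aws:kinesis:eu-west-1:111111111111:stream/log-ingest    # regional ingest stream (placeholder)
    RoleArn: arn:aws:iam::111111111111:role/cwl-to-kinesis   # role CloudWatch Logs assumes to write to Kinesis (placeholder)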

The Processing

AWS Lambda functions consume the ingest Kinesis streams. These Lambda functions run a message processing tool called Benthos, which allows us to define, in YAML, instructions for how to process incoming messages. Benthos comes with many built-in processors covering a range of operations, and it also allows for custom plugins, which let many different custom operations happen on messages. As a result we have created a plugin to support RFC-007 schema validation, as well as a plugin with rate limiting capabilities. Benthos also supports a serverless mode, which allows us to leverage AWS Lambda for the processing.
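As a rough sketch of what this stage's configuration could look like (not our actual config), the snippet below runs a hypothetical schema validation plugin and forwards the results to a made-up processed stream; in the Lambda distribution of Benthos no input section is needed, since messages arrive in the invocation event:

# Sketch of a serverless Benthos config - plugin name and stream are hypothetical.
pipeline:
  processors:
    - rfc007_validate:              # made-up name for our schema validation plugin
        on_invalid: drop            # illustrative behaviour for non-conforming messages
output:
  kinesis:
    stream: processed-logs          # made-up name for the processed stream
    partition_key: ${!metadata:log_group}   # assumes an earlier step set this metadata field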

The Destination

We finally send all processed log messages to another Kinesis stream, which a shipper AWS Lambda function consumes. This Lambda ships the logs to our Centralised Logging tool, Elasticsearch. We also send the logs to Amazon S3 for archiving purposes.
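If the shipper were also expressed as a Benthos config (the post doesn't go into its implementation), the fan-out to both destinations could look something like this; the endpoint, index and bucket are placeholders:

# Hypothetical shipper output - endpoint, index and bucket are placeholders.
output:
  broker:
    pattern: fan_out
    outputs:
      - elasticsearch:
          urls: [ https://logging.example.internal:9200 ]   # placeholder Elasticsearch endpoint
          index: centralised-logs                           # placeholder index
      - s3:
          bucket: example-log-archive                       # placeholder archive bucket
          path: "${!timestamp_unix_nano}.json"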

Kinesis & Lambda

Kinesis triggers for AWS Lambda make log processing very convenient. Each shard of the Kinesis stream has one Lambda invocation consuming it. In our case we always read the Kinesis stream using the TRIM_HORIZON trigger setting, which means we process the oldest record in the Kinesis stream first. We also configure the BATCH_SIZE of the trigger, which sets the number of Kinesis records to process per Lambda invocation.

Note: the Kinesis record batch size does not state how many log messages will get retrieved, since a single Kinesis record can contain many log messages. This number can vary, which sometimes caused timeouts for our log processing Lambdas.
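For reference, the trigger settings above correspond to an event source mapping along these lines (a CloudFormation-style sketch with made-up names):

# Sketch - stream ARN and function name are made up.
ProcessorTrigger:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    EventSourceArn: arn:aws:kinesis:eu-west-1:111111111111:stream/log-ingest   # ingest stream (placeholder)
    FunctionName: log-processor         # placeholder processing function
    StartingPosition: TRIM_HORIZON      # process the oldest available records first
    BatchSize: 100                      # Kinesis records (not log messages) per invocation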

One important metric that gives insight into the processing performance is the IteratorAge. This is the age in milliseconds of the oldest unprocessed record in the stream, and it should stay as close to zero as possible. If this number rises, the processing cannot keep up: either the number of messages being sent to the stream is too large, or there is some error with processing.

Note: AWS Lambda does not advance the iterator if a batch fails to process. By default (more on this later) the batch will keep being retried, until processing succeeds or until the records expire due to the stream's retention period.
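One way to keep an eye on this is a CloudWatch alarm on the processing function's IteratorAge; a minimal sketch, with an illustrative threshold and placeholder names:

# Sketch - function name, threshold and alarm topic are illustrative.
IteratorAgeAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    Namespace: AWS/Lambda
    MetricName: IteratorAge
    Dimensions:
      - Name: FunctionName
        Value: log-processor                 # placeholder processing function
    Statistic: Maximum
    Period: 60                               # seconds
    EvaluationPeriods: 5
    Threshold: 60000                         # one minute of lag, in milliseconds
    ComparisonOperator: GreaterThanThreshold
    AlarmActions:
      - arn:aws:sns:eu-west-1:111111111111:logging-alerts   # placeholder alert topic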

Benthos

Benthos is a very flexible tool written in Go. It supports many inputs, processors and outputs, and it allows anyone to create a message processing pipeline without having to code it themselves. Take a look at the example configuration below.

input:
  kafka_balanced:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup
    batching:
      count: 10
      period: '100ms'

pipeline:
  threads: 4
  processors:
    - jmespath:
        query: '{ message: @, meta: { link_count: length(links) } }'

output:
  s3:
    bucket: TODO
    path: "${!metadata:kafka_topic}/${!json_field:message.id}.json"

This example, taken from GitHub, showcases the basic premise of Benthos. A Benthos configuration defines three main sections. First you define an input, the source of your messages; in this example it is a Kafka stream. Secondly you define the processors that will work on all messages sent by the input. You can create really complex workflows here; in this example a simple JMESPath processor is used to query parts of the data. Finally, an output is configured as the destination for the processed messages; in this example that is an S3 bucket.

Benthos additionally supports things like caching, batching, metrics, multiple outputs and much more. This makes the tool all the more suitable for log processing at DAZN. One of the main reasons why we chose this tool is because we can run it as a service or we can run it in a serverless fashion. This was important to us as we wanted to ensure that we could scale this solution.

The Problems

We faced some problems with this implementation, and whilst running it in production we also experienced some operational issues.

Spikes Of High Load

Under spikes of load it's possible that the iterator age begins to rise. Generally, if the processing is working and the Lambda is not failing or timing out, the processing will catch up; in the meantime there will be a delay in logs being indexed in Centralised Logging. Kinesis streams do allow the number of shards to be increased, but there are limitations: you can only double the number of shards at a time, and new shards only accept new data. So if the existing Kinesis shards are already overloaded, this will not help much.

Uncontrollable Number of Messages per Batch

Due to the nature of log group subscriptions, each Kinesis record batch can contain a variable number of log messages. Sometimes it takes too long to process all the messages in a batch and the Lambda times out. Increasing the timeout generally fixes the issue, yet a batch that started retrying under the previous timeout configuration often gets stuck in a retry loop until it is fixed.

Centralised Logging Index Limits

Occasionally one of our teams runs a load test against one of their services with verbose logging enabled, which can generate a huge amount of data. This puts pressure on the Centralised Logging infrastructure and can eat up its daily index limit. Once the limit for the day is used up, Centralised Logging will no longer index new messages, and other services relying on it end up operating with reduced visibility.

Failing Batches

Sometimes bad data that should never have made it to the ingest stream gets ingested, and this can start causing Lambda errors. By default the batch will keep failing, keeping the iterator for that shard stuck at that point until the batch becomes older than the retention period of the stream and the records expire.

The Solutions

By leveraging the flexibility of Benthos and some of the more recently released features in AWS, we were able to fix the problems we faced.

Parallelisation Factor

AWS very recently released a feature for Kinesis triggers for Lambda: the parallelisation factor. It allows us to scale processing without the need to increase the shards in a stream, by controlling how many concurrent Lambda invocations process each shard. More Lambdas end up processing the same number of shards, so the rate of processing should increase and the iterator age should drop. The parallelisation factor can also be auto-scaled, based on iterator age for example.
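On the event source mapping this is a single property; a sketch of what enabling it might look like on the trigger from earlier:

# Sketch - up to 10 concurrent invocations per shard are allowed.
ProcessorTrigger:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    EventSourceArn: arn:aws:kinesis:eu-west-1:111111111111:stream/log-ingest   # placeholder ARN
    FunctionName: log-processor         # placeholder processing function
    StartingPosition: TRIM_HORIZON
    ParallelizationFactor: 4            # four concurrent Lambda invocations per shard (illustrative)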

Bisecting On Failure

Too many messages in a batch cause Lambda timeouts. AWS released a feature for Kinesis triggers that splits the batch in half when an error occurs, which is perfect for cases where too many messages are retrieved for processing. By splitting the batch, the processing should complete and the iterator on the shard is unblocked.
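This is again a setting on the same event source mapping; a sketch of the relevant property:

# Sketch - on error, retry the batch as two halves rather than as a whole.
BisectBatchOnFunctionError: true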

Rate Limiting

One of our biggest pain points with our logging solution in the past was using up the index limit; we'd lose visibility of our logging from the moment of lockout. Thanks to the flexibility of Benthos, we were able to leverage AWS ElastiCache Redis and a custom Benthos processor to create a rate limiting solution. We limit based on the amount of logs ingested per log group, averting over-indexing by allowing each log group a quota. This is still an early adoption for us, but we have plans for extra features in the future to improve it.
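To give a feel for the idea (the plugin is our own and the configuration below is purely hypothetical), the processor could be wired up along these lines, with Redis keeping the per-log-group counters:

# Hypothetical sketch - plugin name, fields and endpoint are all illustrative.
pipeline:
  processors:
    - log_group_rate_limit:                                     # made-up name for our custom plugin
        redis_url: redis://logging-cache.example.internal:6379  # ElastiCache Redis endpoint (placeholder)
        key: ${!metadata:log_group}                             # count ingested messages per log group
        quota: 50000                                            # allowed messages per window (illustrative)
        window: 1h
        on_exceeded: drop                                       # shed excess logs instead of eating the index limit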

Dead Letter Queue

Bisect on error works well when batch processing failures are due to a timeout. However, it does not handle bad data, which will keep failing regardless of the size of the batch. For errors caused by unhandled processing failures, we created a dead letter queue (DLQ): after a certain number of retries, the failing batch is sent to an SNS topic, where we can investigate and process it at a later stage. Taking failing batches out is important as it allows processing to continue, and shards no longer get stuck.
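In trigger terms this is a retry cap plus an on-failure destination; a sketch of the relevant event source mapping properties, with a placeholder SNS topic:

# Sketch - properties on the same event source mapping; the topic ARN is a placeholder.
MaximumRetryAttempts: 5                 # give up on a batch after five retries
BisectBatchOnFunctionError: true        # still split oversized batches first
DestinationConfig:
  OnFailure:
    Destination: arn:aws:sns:eu-west-1:111111111111:log-processing-dlq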

Conclusions

Centralised Logging at scale is difficult. It requires collaboration from everyone involved. But luckily, if you combine the amazing features that are available to you in AWS with excellent open source tools, such as Benthos, you are on the right track to success.

Good luck on your logging adventures!
