Delivering data in real-time via auto scaling Kinesis streams

Photo by Hendrik Cornelissen on Unsplash

Abstract

Kinesis is a managed streaming data service offered by Amazon Web Services (AWS). It is used extensively at Disney Streaming Services for real-time and batch analytics, and supports features like personalized views, stream concurrency, and analysis of application domain events. In this blog post, I will be going over how our API Services team at Disney Streaming Services implemented an auto scaling feature to Kinesis, which allows us to gracefully stream data during peak periods of traffic, while remaining cost-efficient.


Problem

What Our Team Does

Here at Disney Streaming Services, we on the API services team (myself included) are responsible for applications that expose our public API to clients — which means we are heavily involved with client communication protocols, scaling to support traffic demands, providing reliability through fallbacks and degradation, and security.

Like most applications deployed using AWS, our applications log events to CloudWatch Logs. Since CloudWatch is also a managed service provided by AWS, it is an easy integration for us to store and query application events. We also emit our application events into a larger data lake platform — which supports a richer analysis and visualization of our application events. This is where Kinesis streams come in.

Motivation

With Kinesis streams chosen as the entry point for our data lake platform, we needed to ensure that data would not be lost or fall behind real-time delivery for extended periods of time.

A naïve solution would be to over-provision the stream. However, this is not cost-effective, since it would waste money during the majority of the day.

We also looked into the Kinesis Scaling Utility , which is an application offered by AWS Labs. It can monitor metrics via CloudWatch and scale Kinesis streams based on configuration. However, it wasn’t the best solution to meet our demands, due to the following reasons:

  1. Scaling up does not occur fast enough.
  2. The application needs to be constantly running, which incurs extra cost.

The two aforementioned points are a result of the application’s approach to monitoring metrics — polling CloudWatch at set intervals. My team needed scaling to occur as soon as possible and be cost-effective, so we moved onto creating our own solution.


Kinesis Basics

To better understand the choices made for our solution, I have included some basics of how Kinesis streams work. For further documentation, please refer to the key concepts page provided by AWS.

Shards

Kinesis streams are allocated a set number of shards at creation. Every shard in the stream has a hash key range, which is a range of valid integer values. At creation, these shards are considered open, meaning they can receive data and incur cost.

For every record added to the stream, a partition key must be defined. The stream hashes this partition key, the result being an integer. The stream determines which hash key range the resulting integer falls in and sends the record to the correct open shard.

An explicit hash key can optionally be defined when adding a record to a stream, which will force the record to be sent to a specific open shard.

Scaling

The process of scaling a Kinesis stream is called resharding. It can be initiated asynchronously by calling UpdateShardCount. A target shard count (how many shards to scale to) must be provided.

Scaling a stream down merges pairs of shards to achieve the desired total. Scaling a stream up splits multiple shards in half to achieve the desired total.

This means that the lowest a stream can be scaled down to is half of its current open shard count. Conversely, this also means that the highest a stream can be scaled up to is double its current open shard count.

For example — a Kinesis stream has 12 open shards. When calling UpdateShardCount on this stream, the target shard count must be within the range of [6, 24]. A value outside of this range will result in an error.

Data Availability

Kinesis streams have a set retention period for data, which is defaulted to 24 hours.

After resharding occurs, shards will be closed, which means they can no longer receive data. They do not incur cost (see “Q: Am I charged for shards in “CLOSED” state?”) and will be retained until the data retention period passes.


Requirements

To accomplish the goal of delivering CloudWatch log data to auto scaling Kinesis streams, several distinct components will need to be created. We will organize these components into two individual stacks to ensure future re-usability.

Auto Scaling Stack

Scale a Kinesis stream and its associated resources up during heavy usage and down during an off-peak hour.

Kinesis Stream
The primary destination for processed data. This data can drive real-time processing or be stored for batch analytics.

This stream can either be created at the same time as its associated scaling components, or already exist within the AWS environment.

Scale Up
A Lambda which scales up the Kinesis stream, the alarm which triggers it based on a calculated throughput for a Kinesis metric and optionally an external Lambda. The alarm which handles triggering the Scale Up Lambda tracks a metric reported by the Kinesis stream.

Scale Up architecture

To enable tracking of when scaling up occurs, the Lambda will report two custom metrics (OpenShards and ConcurrencyLimit) to CloudWatch whenever successfully invoked. These custom metrics will allow us to monitor scaling behavior.

Scale Down
A Lambda which scales down the Kinesis stream, the scale-up alarm and optionally an external Lambda to their original settings.

Once a day during an off-peak hour (after failed logs have been processed), a CloudWatch Rule will trigger the Scale Down Lambda at 10-minute intervals. This is done to counteract the limitation Kinesis has for scaling down (the lowest valid target shard count is half of the current open shard count).

This Lambda will skip the scale-down process if the stream is currently under heavy use, if it is currently being scaled down or if it has already been scaled down to the default amount of shards.

Scale down architecture

Like the Scale Up Lambda, this Lambda will also report two custom metrics (OpenShards and ConcurrencyLimit) to CloudWatch whenever successfully invoked.

Log Processing Stack

Process events from CloudWatch Logs, sending the results to a Kinesis stream.

Logs Processor
A Lambda which will process events from a chosen log group. Results will be sent to a Kinesis stream.

If any log events within a batch fail to be sent to the Kinesis stream (are returned with an error code), the Logs Processor Lambda will use exponential backoff and jitter for its initial attempts at re-sending failed log events to the Kinesis stream. This enables concurrent Logs Processor invocations to retry sending log events at different times.

Its reserved concurrent executions (how many concurrent Lambdas can run at once) will be equal to the number of shards allocated to the Kinesis stream. This avoids writing more data to the Kinesis stream than it can handle. It also gives us direct control of how fast data flows into the Kinesis stream, which means data will fall behind real-time delivery rather than be completely lost.

Failed Logs Processor
To account for potential failures of the aforementioned Logs Processor, any failed batches of log events (have been retried twice and still failed) will be saved to a Dead Letter Queue (DLQ).

Once a day during an off-peak hour, a CloudWatch Rule will trigger the Failed Logs Processor. This separate Lambda will ask the DLQ for any failed log events and reprocess them via the Logs Processor.

To avoid timeouts and long run times, the Failed Logs Processor will have the ability to re-invoke itself asynchronously for continued reprocessing of failed log events, assuming more failed log events are available.


Architectural Solution Overview

With the components of our architecture planned out, we can move onto how we will leverage them to process log events and automatically scale Kinesis streams.

Key Metrics

As previously mentioned, the Scale Up Lambda will use an alarm to monitor a Kinesis metric to see if it crosses a calculated threshold.

The recommended approach is to measure the sum of IncomingRecords or IncomingBytes from the associated Kinesis stream over 5 minutes. This will give us direct insight to how much data is flowing into the stream and make informed decisions regarding scaling.

Threshold Calculation

After choosing one of the above recommended metrics, we can move onto calculating a threshold we would like to monitor.

For a Kinesis stream with n shards, a Lambda will scale to at most n invocations (as controlled by its reserved concurrent executions).

Each Lambda sends an average of m records to the Kinesis stream per second.
The period by which the alarm monitors the sum of a metric is s seconds.

Thus, the threshold to monitor is n * m * s.

To ensure scaling up occurs before data falls behind, we can instead monitor a percentage of the calculated threshold. Since 80% is considered best practice by AWS, we will monitor that value instead going forward.

Architecture

Since both stacks are independent and generic, they can be deployed alone or in tandem. When both are deployed targeting the same Kinesis stream, the result is a solution to the problem we started with.

Architecture topology

Validating Results

With the architecture deployed for one of our applications, we need to validate that our data is available in real-time and that scaling is happening when required.

First, we can compare the number of log events being forwarded to the Logs Processor Lambda versus the number of records being written to the Kinesis stream using CloudWatch to ensure that data is not falling behind.

Forwarded logs vs. processed logs

The sum of log events forwarded to the Logs Processor is equal to the sum of records being sent to Kinesis for each data point. This means that processed data is available in real-time!

Last, we can use Grafana to visualize the custom metrics we are reporting versus the average amount of concurrent Logs Processor Lambdas.

Custom metrics vs. average concurrency

Scaling up occurs once a set threshold is crossed, while scaling down begins at a set time during an off-peak hour and continues until finished. The average number of concurrent Logs Processor Lambdas never crosses the concurrency limit as well. This confirms that we are scaling Kinesis streams automatically!

Conclusion

We have successfully developed a solution architecture with two reusable CloudFormation templates that can be deployed independently or together.

The log processing template enables us to transform data generically with minimal effort. All of the boilerplate code surrounding CloudWatch Logs and Kinesis is handled behind the scenes. This enables teams to focus exclusively on how they would like to transform their data.

The auto scaling template enables us to define when and how a Kinesis stream is safely scaled up and down. Kinesis streams no longer need to be over-provisioned in order to potentially avoid sudden spikes. This minimizes manual intervention and reduces overall cost.

When both templates are deployed together, we also gain control of the rate at which transformed log events flow into a Kinesis stream. In the event of a sudden spike, data will temporarily fall behind real-time delivery until scaling up is completed. This is highly preferable to retrying failed batches of log events later on, since it minimizes the probability of log events being completely dropped or processed multiple times.

Overall, building this solution architecture was a lot of fun! Although it was initially developed for API Services’ use case, I am glad that we generalized the architecture into two independent stacks. This will enable other teams at Disney Streaming Services to leverage both templates and contribute improvements to the architecture.