Amazon Kinesis Data Streams: Auto-scaling the number of shards

Brandon Stanley
Slalom Data & AI
Nov 27, 2019

Many enterprises are evaluating options for real-time data streaming to the cloud using different technologies. Amazon Web Services (AWS) provides various services to assist in ingesting data, such as Kinesis Data Streams, Kinesis Video Streams, Kinesis Data Firehose, Amazon Managed Streaming for Apache Kafka (MSK), and Amazon MQ. Our team at Slalom has worked on projects to set up, design, and architect solutions using these streaming services in AWS.

One of our solutions enhanced the capability of Kinesis Data Streams by dynamically scaling the number of shards in a stream to address producers with unpredictable or varying throughputs. Moreover, the deployment of the solution was automated using CloudFormation. The following AWS services were used: CloudFormation, SNS, Lambda, and CloudWatch Alarms.

This article discusses why and how the solution was developed.

Introduction

Who is this for?

This solution is for anyone looking to extend the capabilities of Kinesis Data Streams by dynamically scaling the number of shards.

It is particularly useful for systems in which:

  1. The value of the data diminishes over time (explained further in the Issue section)
  2. The throughput of producers is unpredictable or varies

What is Amazon Kinesis Data Streams?

Amazon Kinesis Data Streams is a highly scalable and durable managed streaming service. Use cases range from aggregating data from financial markets to drive real-time trading decisions, to analyzing sensor data at a warehouse to determine when a machine requires servicing or has broken down.

Refer to the Amazon Kinesis Data Streams documentation for more details.

What are Shards?

An Amazon Kinesis data stream is made up of shards. A shard represents a sequence of data records in a stream and is also the stream's base throughput unit.

One shard can:

  1. Ingest 1 MiB/second or 1,000 records/second.
  2. Support up to a maximum total data read rate of 2 MiB/second via the GetRecords API.

Refer to the Kinesis Data Streams documentation on shards for more details.

What is Auto-scaling?

Auto-scaling is a feature that allows components of an application to dynamically scale up or down depending on the load. Specifically, within the context of this post, auto-scaling refers to the ability of the stream to automatically increase or decrease its number of shards based on the throughput of its producers.

Issue

Unlike some other AWS services, Kinesis does not provide a native auto-scaling solution like DynamoDB On-Demand or EC2 Auto Scaling. Therefore, the right number of shards must be calculated for every stream based on the expected number and/or size of records. This can lead to over- or under-provisioning of shards within a stream, resulting in higher costs and/or throttled data ingestion.

Implications

Over/under-provisioning

The obvious implication of over-provisioning is overpaying. Under-provisioning results in throttled data ingestion, which leads to the next point.

Throttled data ingestion

The capacity of a stream is determined by the number of shards in the stream. Each shard can handle up to 1,000 records/second or 1 MiB/second. Attempting to write more than the provisioned capacity will result in throttling.

During a talk at re:Invent 2018, Allan MacInnis presented a slide about the value of data over time, which was sourced from Mike Gualtieri’s report, Perishable Insights — Stop Wasting Money On Unactionable Analytics.

The value of data over time

The slide above was sourced from the following video:

AWS re:Invent 2018: High Performance Data Streaming with Amazon Kinesis: Best Practices (ANT322-R1)

The insights in your data are perishable: over time the data decays, and the value you can extract from it diminishes.

The key takeaway is that the value of data decays over time. Not being able to react quickly to certain data renders it no longer useful. Kinesis excels at being able to quickly ingest vast amounts of data. However, if the data loses its value over time and the stream is not configured with the appropriate number of shards, the derived value from the stream diminishes.

What’s available out of the box?

Amazon makes it simple to scale a stream up or down using the UpdateShardCount, MergeShards, and SplitShard APIs. However, in the absence of an intelligent mechanism for scaling the stream, the responsibility of calling these APIs to increase or decrease the number of shards is left to the producer(s). The goals of the solution are to remove the need for producers to scale the stream and to provide a single solution for all Kinesis data streams within an account that require auto-scaling.

Solution

Considerations

Before discussing the solution, it is important to consider the following:

  1. Metrics: CloudWatch metrics can be captured at the stream level or the shard level. The metrics of interest are IncomingBytes, IncomingRecords, and WriteProvisionedThroughputExceeded, all of which are available for both streams and shards.
  2. Scaling frequency: By default, the UpdateShardCount API can only be called a certain number of times per rolling 24-hour period. Refer to the documentation for the exact limit.
  3. Max and Min Shard Counts: During scale-out and scale-in operations, the number of shards can be increased to a maximum of 2x and decreased to a minimum of 0.5x, where x is the current number of open shards in the stream. Refer to the UpdateShardCount API documentation for further details.
  4. Shard Count Increments: When using the UpdateShardCount API, AWS recommends a target shard count that is a multiple of 25% (e.g., 25%, 50%, 75%, 100%) of the current open shard count. Any target value within the shard limits can be specified. However, if a target is specified that is not a multiple of 25%, the scaling action might take longer to complete. Refer to the UpdateShardCount API documentation for further details.

Solution Overview

Auto-scaling solution diagram

The auto-scaling solution uses the following services:

  1. CloudWatch Alarms
  2. SNS
  3. Lambda
  4. CloudFormation

CloudWatch Alarms

The following metrics were used to determine when to scale:

IncomingBytes: The number of bytes successfully put to the Kinesis stream over the specified time period. This metric includes bytes from PutRecord and PutRecords operations. Minimum, Maximum, and Average statistics represent the bytes in a single put operation for the stream in the specified time period.

IncomingRecords: The number of records successfully put to the Kinesis stream over the specified time period. This metric includes record counts from PutRecord and PutRecords operations. Minimum, Maximum, and Average statistics represent the records in a single put operation for the stream in the specified time period.

Using the metrics above, we created alarms that would be triggered when either the IncomingRecords or IncomingBytes metric breached a specified threshold based on the provisioned capacity. Our initial approach was to set off an alarm when the threshold was breached at the one-second level (i.e., trigger the alarm if a single data point exceeded 800 records/second). However, the metrics published to CloudWatch were not this granular.

For example, when writing 900 records/second to the stream, the Maximum statistic produced data points with a value of 500 regardless of the period. The PutRecords API documentation revealed that each request can contain up to 500 records, and it was this per-request count that was being published to CloudWatch, even though 900 records/second were being written.

The workaround was to use the Sum statistic over a period of 60 seconds. In other words, if the IncomingRecords or IncomingBytes metric exceeded 80% of the provisioned capacity for an entire minute, the alarm would be triggered.

With these concepts established, the scale-up and scale-down alarms and their specific metrics can be discussed.

Scale-up

The scale-up alarm is triggered when either the IncomingRecords or IncomingBytes metrics are greater than 80% of the provisioned capacity for a minute.

CloudWatch Alarm:

Scale-up alarm metrics
  • m1: Capture the IncomingRecords (actual)
  • m2: Capture the IncomingBytes (actual)
  • e1: Fill missing data points for IncomingRecords with 0s
  • e2: Fill missing data points for IncomingBytes with 0s
  • e3: Calculate actual capacity/provisioned capacity (IncomingRecords), where provisioned capacity=num_open_shards*1000*60
  • e4: Calculate actual capacity/provisioned capacity (IncomingBytes), where provisioned capacity=num_open_shards*1048576*60
  • e5: Determine the maximum between e3 and e4

Threshold: > 0.8

Period: 1 minute

Datapoints to alarm: 1 of 1

Missing data treatment: Treat missing data as ignore (maintain the alarm state) — This is negated since missing values are filled with 0s
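The alarms in our solution are created and updated via CloudFormation and the Lambda function, but the same definition expressed with boto3 illustrates the metric math above. The following is a minimal sketch, in which the utilization_metrics helper, the alarm naming convention, and the placeholder stream/topic values are illustrative only:

```python
import boto3

cloudwatch = boto3.client("cloudwatch")


def utilization_metrics(stream_name, shard_count):
    """Metric math queries expressing throughput as a fraction of provisioned capacity."""
    def raw_metric(metric_id, metric_name):
        # m1/m2: Sum of IncomingRecords/IncomingBytes over a 60-second period
        return {
            "Id": metric_id,
            "ReturnData": False,
            "MetricStat": {
                "Metric": {
                    "Namespace": "AWS/Kinesis",
                    "MetricName": metric_name,
                    "Dimensions": [{"Name": "StreamName", "Value": stream_name}],
                },
                "Period": 60,
                "Stat": "Sum",
            },
        }

    return [
        raw_metric("m1", "IncomingRecords"),
        raw_metric("m2", "IncomingBytes"),
        # e1/e2: fill missing data points with 0s
        {"Id": "e1", "Expression": "FILL(m1, 0)", "ReturnData": False},
        {"Id": "e2", "Expression": "FILL(m2, 0)", "ReturnData": False},
        # e3/e4: actual capacity / provisioned capacity
        {"Id": "e3", "Expression": f"e1/({shard_count}*1000*60)", "ReturnData": False},
        {"Id": "e4", "Expression": f"e2/({shard_count}*1048576*60)", "ReturnData": False},
        # e5: the maximum of e3 and e4 is what the alarm evaluates
        {"Id": "e5", "Expression": "MAX([e3, e4])", "Label": "MaxUtilization", "ReturnData": True},
    ]


# Placeholder values for illustration
stream_name = "example-stream"
num_open_shards = 1
scaling_topic_arn = "arn:aws:sns:us-east-1:123456789012:kinesis-scaling"

cloudwatch.put_metric_alarm(
    AlarmName=f"{stream_name}-scale-up",       # assumed naming convention
    AlarmActions=[scaling_topic_arn],
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Threshold=0.8,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="ignore",
    Metrics=utilization_metrics(stream_name, num_open_shards),
)
```

The same helper reappears in the Lambda snippets later in this article when the alarms are refreshed with new shard counts.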

Scale-down

The scale-down alarm is triggered when both the IncomingRecords and IncomingBytes metrics are below 70% of the specified capacity for 5 consecutive minutes. The scale-down alarm uses a lower number of shards than the scale-up alarm to determine whether scaling down is appropriate. The idea is to scale down only if the current throughput, given a lower number of shards, would remain under the desired percentage (70%).

Since the solution was deployed via CloudFormation, we needed to ensure that the alarm would be deleted once the stack is deleted, so the scale-down alarm must be created as part of the stack. Its alarm action is therefore initially disabled and its threshold is initially set to -1 (so that it does not appear to be in alarm while the stream is at the minimum shard count). The Lambda function also dynamically disables the alarm when the number of shards reaches the minimum.

CloudWatch Alarm:

Scale-down alarm metrics
  • m1: Capture the IncomingRecords (actual)
  • m2: Capture the IncomingBytes (actual)
  • e1: Fill missing data points for IncomingRecords with 0s
  • e2: Fill missing data points for IncomingBytes with 0s
  • e3: Calculate actual capacity/specified capacity (IncomingRecords), where specified capacity=specified_num_shards*1000*60
  • e4: Calculate actual capacity/specified capacity (IncomingBytes), where specified capacity=specified_num_shards*1048576*60
  • e5: Determine the maximum between e3 and e4

Threshold: < 0.7

Period: 1 minute

Datapoints to alarm: 5 of 5

Missing data treatment: Treat missing data as ignore (maintain the alarm state) — This is negated since missing values are filled with 0s

SNS

The SNS topic is used to connect CloudWatch and Lambda. When an alarm reaches the “IN ALARM” state, it publishes to the SNS topic, which has the Lambda function as a subscriber.

In addition, SNS can be used to notify when the stream successfully updates or if exceptions/errors (permissions, API exceptions, etc.) occur during an attempt to update.

Lambda Function

The SNS topics invoke the Lambda function when the alarms are triggered. The Lambda function parses the event and scales the stream up or down. Upon successful scaling, the Lambda function updates the CloudWatch Alarms to reflect the new thresholds that have been determined.

This section provides code snippets for the Lambda function on how to scale the stream and update the CloudWatch Alarms.

Additional Note(s):

  1. The API calls to AWS services are wrapped in try/except blocks to handle any errors (for example, a LimitExceededException when calling the UpdateShardCount API beyond the allowed limit).

Required Python imports:

Required Python imports
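A minimal sketch of the imports the snippets below rely on (the exact set may differ):

```python
import json   # parse the CloudWatch alarm payload delivered via SNS
import math   # round shard counts when halving
import os     # read configuration from environment variables
import time   # poll the stream status until it becomes ACTIVE

import boto3
from botocore.exceptions import ClientError  # catch errors such as LimitExceededException
```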

Boto3 initializations:

Boto3 initializations
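A sketch of the client initializations, placed outside the handler so they are reused across warm invocations; the environment variable names are hypothetical:

```python
kinesis_client = boto3.client("kinesis")
cloudwatch_client = boto3.client("cloudwatch")
sns_client = boto3.client("sns")

# Hypothetical configuration read from the function's environment
SCALING_TOPIC_ARN = os.environ.get("SCALING_TOPIC_ARN", "")            # alarm target, subscribed by this function
NOTIFICATION_TOPIC_ARN = os.environ.get("NOTIFICATION_TOPIC_ARN", "")  # success/error notifications
```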

Retrieving the Stream name from the alarm metrics:

Retrieving the Stream name from the alarm metrics
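A sketch of how the stream name can be extracted from the alarm notification: the SNS message body is a JSON document whose Trigger contains the metric math queries, and the StreamName dimension on the raw metrics (m1/m2) identifies the stream. Key casing in the notification payload can vary, so the sketch checks both forms:

```python
def parse_alarm_event(event):
    """Return the alarm name and Kinesis stream name from the SNS-delivered alarm."""
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    alarm_name = message["AlarmName"]

    for query in message["Trigger"]["Metrics"]:
        metric_stat = query.get("MetricStat") or query.get("metricStat")
        if not metric_stat:
            continue  # skip the expression queries (e1-e5)
        metric = metric_stat.get("Metric") or metric_stat.get("metric")
        for dimension in metric.get("Dimensions") or metric.get("dimensions") or []:
            name = dimension.get("Name") or dimension.get("name")
            if name == "StreamName":
                return alarm_name, dimension.get("Value") or dimension.get("value")

    raise ValueError(f"No StreamName dimension found in alarm {alarm_name}")
```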

Determining the new shard count (sample numbers used for scale-up and scale-down percentages):

Determining the new shard count (sample numbers used for scale-up and scale-down percentages)
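A sketch of the calculation using sample factors of doubling on scale-up and halving on scale-down; both are multiples of 25% of the current open shard count and stay within the 2x/0.5x limits of UpdateShardCount (the factors and the alarm-name check are assumptions):

```python
def get_new_shard_count(stream_name, alarm_name):
    """Determine the target shard count based on which alarm fired."""
    summary = kinesis_client.describe_stream_summary(StreamName=stream_name)
    open_shards = summary["StreamDescriptionSummary"]["OpenShardCount"]

    if "scale-up" in alarm_name.lower():        # assumed alarm naming convention
        return open_shards * 2                  # sample scale-up factor: 200%
    return max(1, math.ceil(open_shards / 2))   # sample scale-down factor: 50%, never below 1
```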

Updating the shard count and waiting until the stream is active:

Updating the shard count and waiting until the stream is active
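A sketch of the update call followed by a simple polling loop. Per note 1 above, the UpdateShardCount call is wrapped in try/except so that failures (for example, LimitExceededException) are reported on the notification topic:

```python
def update_shard_count(stream_name, target_shard_count):
    """Update the stream's shard count and wait until the stream is ACTIVE again."""
    try:
        kinesis_client.update_shard_count(
            StreamName=stream_name,
            TargetShardCount=target_shard_count,
            ScalingType="UNIFORM_SCALING",
        )
    except ClientError as error:
        sns_client.publish(
            TopicArn=NOTIFICATION_TOPIC_ARN,
            Subject=f"Failed to scale stream {stream_name}",
            Message=str(error),
        )
        raise

    # Poll until the stream transitions from UPDATING back to ACTIVE
    while True:
        status = kinesis_client.describe_stream_summary(StreamName=stream_name)[
            "StreamDescriptionSummary"]["StreamStatus"]
        if status == "ACTIVE":
            return
        time.sleep(10)
```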

Determining whether the scale-down alarm should be disabled (this can be altered to a specific minimum value instead of 1 by adding a tag to the stream and comparing the value of the new shard count to the tag value):

Determining whether to disable the scale-down alarm
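A sketch of the check, with the minimum optionally read from a tag on the stream as described above; the MinimumShardCount tag key and the alarm naming convention are assumptions:

```python
def get_min_shard_count(stream_name, default=1):
    """Read an optional minimum shard count from a tag on the stream."""
    tags = kinesis_client.list_tags_for_stream(StreamName=stream_name)["Tags"]
    for tag in tags:
        if tag["Key"] == "MinimumShardCount":    # hypothetical tag key
            return int(tag["Value"])
    return default


def manage_scale_down_alarm_actions(stream_name, new_shard_count):
    """Disable the scale-down alarm's actions at the minimum shard count, re-enable otherwise."""
    alarm_name = f"{stream_name}-scale-down"     # assumed naming convention
    if new_shard_count <= get_min_shard_count(stream_name):
        cloudwatch_client.disable_alarm_actions(AlarmNames=[alarm_name])
    else:
        cloudwatch_client.enable_alarm_actions(AlarmNames=[alarm_name])
```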

Updating the scale-up alarm:

Updating the scale-up alarm
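A sketch that recreates the scale-up alarm with the new open shard count baked into the e3/e4 expressions, reusing the utilization_metrics helper from the alarm sketch earlier in this article (put_metric_alarm overwrites an existing alarm with the same name):

```python
def update_scale_up_alarm(stream_name, new_shard_count):
    """Refresh the scale-up alarm so it evaluates utilization against the new shard count."""
    cloudwatch_client.put_metric_alarm(
        AlarmName=f"{stream_name}-scale-up",     # assumed naming convention
        AlarmActions=[SCALING_TOPIC_ARN],
        EvaluationPeriods=1,
        DatapointsToAlarm=1,
        Threshold=0.8,
        ComparisonOperator="GreaterThanThreshold",
        TreatMissingData="ignore",
        Metrics=utilization_metrics(stream_name, new_shard_count),
    )
```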

Calculating the new scale-down alarm shard count:

The scale-down alarm will use a different number of shards to determine whether scaling down is appropriate. Refer to the CloudWatch Alarms section for further details.

Calculating the new scale-down alarm shard count
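A sketch of that calculation, using the same sample scale-down factor of 50%:

```python
def get_scale_down_alarm_shard_count(new_shard_count, min_shard_count=1):
    """Shard count the stream would have after the next scale-down.

    The scale-down alarm evaluates utilization against this lower count, so it only
    fires when the reduced stream could still absorb the current throughput.
    """
    return max(min_shard_count, math.ceil(new_shard_count / 2))
```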

Updating the scale-down alarm:

Updating the scale-down alarm
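A sketch mirroring the scale-up case, with the scale-down threshold and evaluation periods, and the prospective (lower) shard count from the previous step:

```python
def update_scale_down_alarm(stream_name, scale_down_shard_count):
    """Refresh the scale-down alarm against the prospective (lower) shard count."""
    cloudwatch_client.put_metric_alarm(
        AlarmName=f"{stream_name}-scale-down",   # assumed naming convention
        AlarmActions=[SCALING_TOPIC_ARN],
        EvaluationPeriods=5,
        DatapointsToAlarm=5,
        Threshold=0.7,
        ComparisonOperator="LessThanThreshold",
        TreatMissingData="ignore",
        Metrics=utilization_metrics(stream_name, scale_down_shard_count),
    )
```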

CloudFormation

The solution can be deployed via CloudFormation and requires two stacks:

  1. Auto-scaling stack: Lambda Function, SNS topic, IAM Role, & IAM Policy
  2. Kinesis Stream CloudWatch Alarms stack: Scale-up and scale-down alarms for each stream

Once the Lambda function template is deployed, the alarms for invoking the Lambda function can be created.

Each stream will have two alarms created: a scale-up alarm and a scale-down alarm. Both alarms have an SNS topic as a target, which forwards the message (alarm) to the Lambda function. The Lambda function then parses the event and scales the stream up or down accordingly.

Testing/Results

Allan MacInnis developed an incredibly useful data generating tool for testing out Kinesis Streams. This tool was used to test the auto-scaling functionality.

Link to the AWS Blog discussing the tool: Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator

Link to the tool: Kinesis Data Generator

Prerequisites for testing

Before testing the solution, the following resources were created (via CloudFormation):

  1. Kinesis Data Stream (with 1 shard)
  2. SNS Topic
  3. Lambda Auto-scaling Function (including IAM Role and Policy)
  4. Scale-up and Scale-down CloudWatch alarms

Setting up the Kinesis Data Generator

The help page contains the template for creating the required resources for the Kinesis Data Generator:

Creating the stack for the Kinesis Data Generator

The link from the help page launched the CloudFormation console (in us-west-2) with the template location pre-populated in the Amazon S3 URL field:

Kinesis Data Generator CloudFormation Stack

On the stack details page, we provided the credentials that would be used to sign in to the Kinesis Data Generator:

Parameters for Kinesis Data Generator

Once the stack was launched, the Outputs tab provided the URL to access the Kinesis Data Generator for our AWS environment:

Kinesis Data Generator URL

We opened the URL above and entered the credentials that we provided when creating the Kinesis Data Generator stack:

Signing in to the Kinesis Data Generator

Sending records to the Stream

After signing in to the Kinesis Data Generator, we sent records to the stream.

Sending 900 records/second from the Kinesis Data Generator:

Sending 900 records/second from Kinesis Data Generator

The Scale-up alarm in CloudWatch displayed details about the alarm. We were able to view the metrics in near real-time via the “View in metrics” button:

Scale-up CloudWatch Alarm

Real-Time View (Ensure that the auto-refresh feature is turned on):

Scale Up Alarm in Metrics (Real-time view)

As mentioned in the CloudWatch Alarms section, the alarm reached the “IN ALARM” state after the sum of IncomingRecords or IncomingBytes exceeded 80% of the provisioned capacity for 60 seconds. Navigating back to the scale-up alarm confirmed that it had reached the “IN ALARM” state:

CloudWatch Scale-up Alarm

The Kinesis Data Streams console indicated that the stream was updating:

Kinesis Data Stream Updating (Scale-up)

After updating, we were able to verify the details of the stream:

Kinesis Data Stream Details (after scaling-up)

We then stopped the Kinesis Data Generator:

Stopping the Kinesis Data Generator

After 5 minutes, the updated scale-down alarm went into the “IN ALARM” state:

Scale-down Alarm in “IN ALARM” state

This led to the stream updating once again:

Kinesis Stream Updating (Scale-down)
Kinesis Data Stream Details (after scaling-down)

Conclusion

This solution extends the capability of Kinesis Data Streams by allowing workloads with unpredictable or varying throughputs to be ingested. The event-driven approach (via CloudWatch Alarms) to updating the number of shards decreases the possibility of losing insights because data did not make it to the stream in a timely manner. In turn, greater insights can be drawn from the stream, increasing its usefulness. In addition, the costs incurred by the stream are reduced, since the stream contains only the required number of shards and updates based on increases or decreases in throughput.

Team Members: Hashir Ahmed, Anne Nasato, Wail Omar, Doorlabh Panjwani, Brandon Stanley

While working on this solution, our team also developed a new solution for dynamically scaling the Retention Period for Kinesis Data Streams. You can read an article about this solution here.
