Amazon Kinesis Data Streams: Auto-Scaling Data Retention Period

Anne Nasato
Slalom Data & AI
Published in
11 min readNov 27, 2019

Streaming data is becoming increasingly important in big data applications, from gaming activity to trading floors to internet of things (IoT). Critical to the success of these use cases is processing the data in-sequence for downstream applications; any data loss can have varying degrees of negative consequences.

In order to address this issue, my team and I recently implemented a solution to auto-scale the data retention period of Kinesis Data Streams. This solution is discussed in-detail below.

Who is this for?

This solution is for anyone using Kinesis Data Streams to feed their downstream applications (“consumers”).

It is particularly useful for systems in which:

  1. There are a variety of downstream consumers with different consumption rates, or consumers whose consumption rates are unknown.
  2. All data is critical for downstream consumers (no data loss can occur) but KDS owners only want to pay for what they actually use.

What is Amazon Kinesis Data Streams?

Amazon Kinesis is a fully-managed, scalable streaming data service on the AWS platform which can ingest various forms of streaming data for analytics (including real-time) and applications. Kinesis is applied across a variety of use cases, including AI applications, clickstream data collection and analytics, and event monitoring for application performance (Netflix uses Kinesis in this way).

Kinesis Data Streams (KDS) is a component of Kinesis. KDS is a service used to ingest and temporarily store data streams for processing by downstream consumers. Downstream consumers “get” data from streams (it is not pushed to applications by KDS).

For more information on KDS, please see the following link:

What is data retention period?

Streams created in KDS have a component called “data retention period”. This value indicates the maximum amount of for which time a record can exist in a stream.

Ideally, records are “picked up” by consumers in less time than the specified data retention period. However, if consumers do not get records from the stream in under this time, the data is lost from the stream.

What’s available out-of-the-box?

The data retention period can be specified by the user after they have created the stream. It can take on a value between 24 and 168 hours, with a default value of 24 hours.

Users can view and edit their data retention period by viewing the details of their stream in the AWS Console.

KDS Data Retention Period setting in AWS Console

Alternatively, the AWS CLI can be used to increase stream retention period or decrease stream retention period.

Issue

With the data retention period being a static value, it cannot dynamically adjust to the behaviour of downstream consumers.

Furthermore, if the owner of the stream is not fully aware of the consumption rates of the downstream consumers, they may have to initially estimate the data retention period and manually update the data retention period.

This becomes even more complicated if the stream owner must work with multiple consumer owners in order to determine whether or not the data retention period is set appropriately.

Implications

If the stream owner estimates a data retention period which is too low, they risk data loss until they manually adjust the retention period.

If the stream owner estimates a data retention period which is higher than necessary, they will be paying for resources they aren’t using.

Costs vary by AWS region, but a sample cost calculation for extended data retention period in Ohio region (us-east-2) is described below. Each additional shard hour is $0.02 USD (two cents) per shard. For a single stream with four shards, the formula is as follows:

Formula for calculating monthly data retention period for a stream with four shards

This monthly cost value is shown for varying days of extended data retention period in the table below:

Monthly cost comparison for extended data retention period days

Solution

In order to address the issue and implications mentioned above, our team created a solution to dynamically auto-scale the data retention period of streams in KDS. The CloudWatch alarms and SNS topic were created in a CloudFormation template.

Considerations

Minimum data retention period: The minimum data retention period has a default value of 24 hours. However, we realized that the stream owner may want to set a minimum value for this. We did this by adding an optional tag on which users could specify this value, if desired.

Scaling increment: The increment by which we scaled up and down each time was 12 hours. We decided on this as we determined that it may be enough for healthy slow consumers to catch up while not adding too much cost to the KDS owner.

High alarm threshold: The threshold of the alarm which signals to increase the data retention period is greater than or equal to 50% of the current data retention period. Our team decided on this threshold as it serves as a reasonable indicator that consumers are slow; less than 50% may prematurely increase the data retention period, incurring unnecessary costs. The high alarm is enabled automatically and increases the data retention period until the maximum value (168 hours) is reached.

Low alarm threshold: The threshold of the alarm which signals to decrease the data retention period is 12 hours less than the high alarm threshold for a period of 6 hours. This 12-hour interval between high and low shifts as the thresholds change but maintains a static value. The low alarm is disabled automatically and is enabled upon the first increase in data retention period. If the data retention period is scaled back down to its minimum value (24 hours or other user-indicated value via stream tags), the low alarm is deleted; it is to be activated again upon an increase in data retention period.

Architecture Overview

In order to dynamically scale the KDS data retention period, the following services were used:

  1. CloudWatch Alarms (using CloudWatch metrics)
  2. Simple Notification Service (SNS)
  3. Lambda

Overall

Complete KDS Data Retention Period Autoscaling Architecture

The complete solution combines both the scale up and scale down architectures. The direction of the scaling action depends on the name of the alarm which invokes the Lambda function, as described below.

CloudWatch

Amazon CloudWatch is a monitoring service which collects data as logs, metrics, and events. CloudWatch metrics and events can be used to create CloudWatch alarms, which we used as part of this solution.

Key Metrics

GetRecords.IteratorAgeMilliseconds: KDS CloudWatch metric indicating the duration between now and when the last record of the GetRecords call was written to the stream. If this value is zero (0), consumers are fully caught up with the stream. If greater than zero, this indicates the consumers are lagging behind the stream.

Alarms

The “high” alarm was created for when the data retention period needed to increase (“scale up”). This alarm was triggered when GetRecords.IteratorAgeMilliseconds was greater than or equal to 50% of the current data retention period for a period of 60 seconds. Once this threshold was met, this alarm was triggered. The high alarm was automatically enabled as the stream starts at its minimum data retention period and therefore can initially only increase.

The “low” alarm was created for when the data retention period needed to decrease (“scale down”). This alarm was triggered when GetRecords.IteratorAgeMilliseconds was 12 hours less than the current high alarm threshold. The low alarm was automatically disabled as the stream starts at its minimum data retention period and therefore can initially only increase. Upon the first scale up action, this alarm is enabled. However, if the alarm scales up and then scales down to the minimum data retention period, the low alarm is deleted/disabled until scale up actions occur.

The CloudWatch alarms were used to invoke an SNS topic, which is described in the next section.

SNS

Amazon Simple Notification Service (SNS) is a messaging service with push-based topics which publish to subscribers of various protocols.

The SNS topic created as part of our solution was invoked by the CloudWatch alarms described above. The subscriber to this topic was the Lambda function in which the scale up/scale down actions occurred.

Events from the CloudWatch Alarms with which SNS invoked the Lambda function were in JSON format and contained all of the necessary information for performing scale up/scale down actions, including:

  • SNS topic ARN
  • Alarm name
  • Alarm state (“ALARM”, “INSUFFICIENT DATA”, “OK”)
  • Metric name (“GetRecords.IteratorAgeMilliseconds”)
  • KDS stream name and identifier (“StreamName”)
Sample SNS message for KDS data retention period alarm

The SNS topic created in this solution was used to invoke a Lambda function, described below.

Lambda

AWS Lambda is a serverless compute platform in which users build functions which can be triggered by a variety of sources, and can also access a variety of services.

The Lambda function we created was responsible for performing the automatic scale up and scale down actions based on information provided in the invoking SNS notification.

In order to make our function work, we utilized the Boto3 libraries for Kinesis and CloudWatch. This enabled us to parse the SNS JSON for necessary information, outlined above.

Threshold Calculation

The threshold calculation was performed as follows:

1. Get KDS stream name from SNS JSON and get stream info using this value, including current data retention period.

records = event.get('Records')
trigger = message_json.get('Trigger')
dimensions = trigger.get('Dimensions')
stream_name = dimensions[0].get('value')
stream_info = kinesis.describe_stream(StreamName=stream_name)
stream_description = stream_info.get('StreamDescription')
retention_period = stream_description.get('RetentionPeriodHours')

2. Check stream tags for non-default minimum data retention period. If present, set this value as the minimum data retention period. If not present, set the minimum data retention period as 24 hours.

stream_tags = kinesis.list_tags_for_stream(StreamName=stream_name)
tags = stream_tags.get('Tags')
if any('minimum_retention_period' in tag for tag in tags):
minimum_retention_period = next(tag['minimum_retention_period'] for tag in tags)
else:
minimum_retention_period = kinesis_min_retention_period

3. Get CloudWatch alarm information from SNS JSON. If status is “ALARM”, dynamic scaling must occur. Otherwise, no action is to occur.

4. Get CloudWatch alarm name from SNS JSON to determine if it was a high alarm (scale up to occur) or low alarm (scale down to occur).

Scale-Up

If “low-alarm” is not present in the alarm name, it is a high alarm.

The Lambda function then checks the current data retention period to ensure it is not already at its maximum possible value (168 hours). If it is, the Lambda function returns a message indicating the data retention period is already at its maximum and no action can occur. If the data retention period can increase, scale up action occurs.

1. A new data retention period is set using the old data retention period, plus the scaling increment.

#Use the retention period from the stream description (step 1 of
#Threshold Calculation) and add the scaling increment to it
#The data retention period unit is HOURS
#In this case, the scaling increment is 12 hours
increased_retention_period = retention_period + 12kinesis.increase_stream_retention_period(StreamName=stream_name,
RetentionPeriodHours=increased_retention_period)

2. A new high alarm threshold is set as 50% of the new data retention period.

#The increased high alarm threshold is 50% of the increased data 
#retention period therefore we multiply by 0.5
#The units for GetRecords.IteratorAgeMilliseconds is MILLISECONDS
#We multiply the increased data retention period by 3,600,000
#to convert from hours to milliseconds
increased_high_threshold = increased_retention_period*3600000*0.5
high_alarm = cloudwatch.put_metric_alarm(
AlarmName=alarm_name,
ActionsEnabled=True,
AlarmActions=[
sns_topic_arn
],
MetricName=metric_name,
Namespace='AWS/Kinesis',
Statistic='Maximum',
Dimensions=[
{
'Name':name,
'Value':stream_name
}
],
Period=60,
Unit='Milliseconds',
EvaluationPeriods=1,
DatapointsToAlarm=1,
Threshold=increased_high_threshold,
ComparisonOperator='GreaterThanOrEqualToThreshold')

3. A new low alarm threshold is set as equal to the high alarm threshold, minus twelve hours.

#The increased low alarm threshold is 12 hours less than
#the increased data high alarm threshold
#The units for GetRecords.IteratorAgeMilliseconds is MILLISECONDS
#We multiply the 12 hours by 3,600,000 to convert from
# hours to milliseconds
increased_low_threshold = increased_high_threshold - (12*3600000)low_alarm = cloudwatch.put_metric_alarm(
AlarmName=low_alarm_name,
ActionsEnabled=True,
AlarmActions=[
sns_topic_arn
],
MetricName=metric_name,
Namespace='AWS/Kinesis',
Statistic='Average',
Dimensions=[
{
'Name':name,
'Value':stream_name
}
],
Period=60,
Unit='Milliseconds',
EvaluationPeriods=30,
DatapointsToAlarm=30,
Threshold=increased_low_threshold,
ComparisonOperator='LessThanThreshold')

The high alarm threshold is never less than 12 hours, as the absolute minimum data retention period is 24 hours.

After scale up action occurs, both CloudWatch alarms are updated with the new increased threshold values and the alarm states are set to “INSUFFICIENT DATA”. For example, for the high alarm:

cloudwatch.set_alarm_state(
AlarmName=alarm_name,
StateValue='INSUFFICIENT_DATA',
StateReason='Threshold value updated')

If this is the first time scale up is occurring, the low alarm switches from disables to enabled, and is now active.

Scale-Down

If “low-alarm” is present in the alarm name, it is a low alarm.

The Lambda function then performs the scale down action. If this results in the data retention period being set to its minimum (default or user-specified) value, this alarm is deleted/disabled until the next scale up action. The function also returns a message indicating that the minimum data retention period has been reached.

1. A new data retention period is set using the old data retention period, minus the scaling increment.

kinesis.decrease_stream_retention_period(StreamName=stream_name,
RetentionPeriodHours=decreased_retention_period)

2. A new high alarm threshold is set as 50% of the new data retention period.

3. A new low alarm threshold is set as equal to the high alarm threshold, minus twelve hours.

If this results in the data retention period being set to its minimum (default or user-specified) value, this alarm is deleted/disabled until the next scale up action.

After scale down action occurs, both CloudWatch alarms are updated with the new decreased threshold values and the alarm states are set to “INSUFFICIENT DATA”. This change in alarm status ensures that the alarms are not again (unnecessarily) set off with “ALARM” status.

Testing

In order to test our solution, we used a sample SNS notification and created two test events:

  1. High alarm (no “low-alarm” in the alarm name)
  2. Low alarm (“low-alarm” in the alarm name)

We used these test events to scale up and scale down the data retention period, ensuring the autoscaling performed as expected/desired, especially at the maximum and minimum (default and user-specified) data retention periods. We relied heavily on our CloudWatch logs and the logging outputs we had encoded throughout the function.

After each test event was run, the data retention period was checked to ensure the new value reflected a successful test result. This can be done via Console or CLI.

aws kinesis describe-stream --stream-name <kds-stream-name>

If using the CLI, the value of interest in the response is “RetentionPeriodHours”, which is an integer value indicating the current data retention period, in hours, of the stream.

While testing, it is important to note that if the test has just been run, the stream will have an “UPDATING” status as it scales the data retention period. This will last for approximately one minute. During this time, no changes can be made to the stream, and any tests will fail. Once the stream status is back to “ACTIVE”, testing can resume.

Conclusion

In addition to autoscaling, further functionality could be added to this to indicate the health of stream consumers. For example, SNS could be utilized to send a notification to concerned parties if the data retention period reaches a certain value. Users could then alert consumer owners that their consumers are slow, and look into reasons why this could be happening, as well as remediation actions.

Beyond financial and data reliability benefits, this solution helps KDS owners as it alleviates some of the responsibility of coordinating with consumers to manually determine the best data retention period for their system. It also eliminates the need for KDS owners to manually update the data retention period.

While the static data retention period in KDS may be fine for some users, many will encounter the issue around wanting to ensure no data is lost while only paying for what they actually use. This solution enables KDS owners to focus on more pressing aspects of their streams beyond manual updating of the data retention period.

Team Members: Hashir Ahmed, Anne Nasato, Wail Omar, Doorlabh Panjwani, Brandon Stanley

While working on auto-scaling the data retention period, we also developed a KDS shard auto-scaling solution, which can be viewed here.

--

--