Resolving Bottlenecks of Lambda Triggered By Kinesis — Part 1: Data Stream Capacity

Sarah Hamilton
Engineers @ The LEGO Group
10 min readMay 16, 2022
Photo by Kwame Anim on Unsplash

When it comes to scaling Lambda Functions triggered by Kinesis Data Streams it can be tricky to optimise your infrastructure for your given scenario. Limitations of Lambda functions triggered by Kinesis can result in two bottlenecks — the capacity of the data stream and the ability for the Lambda function to process its computations to keep up with the incoming data.

Here, in Part 1, we discuss how to use features of Kinesis to solve the bottleneck that occurs when we have a lack of capacity in our data stream. In Part 2, we’ll tie the architecture together by resolving the bottleneck that occurs when Lambda Functions can’t keep up with the amount of data funnelling through the Data Stream.

The Bottlenecks to Scaling

Without the appropriate monitoring and reactions, the Lambda triggered by Kinesis architecture can throttle, or even incur a data loss, without us being aware of it (until it’s too late). These failures can arise for a variety of reasons. Exceeding quotas for reads and writes to the data stream is common and causes throttling. In addition, if Lambda does not process the data records quickly enough, such that records end up remaining in the stream for longer than the stream’s retention period, data loss will occur!

The two bottlenecks can be summarised as the following:

  • The capacity of the Kinesis Data Stream (the focus of Part 1)
  • The ability for the Lambda function to compute quickly enough to keep up with the data records coming in from Kinesis (the focus of Part 2)
Kinesis Data Streams will throttle when the write or read throughput is exceeded — the first bottleneck

How do I monitor my system to discover my bottleneck?

The key takeaway for this section is to monitor Kinesis and Lambda using CloudWatch alarms. We’ll discuss in more detail, which metrics to watch out for and ensure you are monitoring with ease using CloudWatch alarms.

Kinesis Data Streams

CloudWatch is your best friend for discovering what is going on within your Kinesis data stream.

Basic stream level monitoring is available out of the box for Kinesis and metrics are sent every minute to CloudWatch. Understanding these metrics and being able to react to them is really important when you have a Kinesis stream handling data in production.

In each section of this series, we describe which metrics to monitor and how to react to them. I would strongly recommend that you set up CloudWatch alarms for some of the metrics so that you can be notified and take action when necessary.

If your stream has more than one shard, you should enable enhanced shard-level metrics which will allow you to keep track of hot/cold shards and take appropriate actions to prevent throttling and reduce overall costs. Enhanced shard-level metrics have an additional associated cost, but if you are using the metrics efficiently, monitoring should save you money in the long term.

Monitoring Lambda Functions

Lambda sends metrics about your Lambda function invocations to CloudWatch out of the box. Like Kinesis, you will want to set up CloudWatch alarms so that you will be alerted of any issues that you need to act upon.

Kinesis Shards

The simplest way to increase throughput in your Kinesis data stream is to increase the number of shards available for data to flow through. The quotas for a single shard are as follows:

  • Writes — 1MB/s OR 1000 records/s
  • Reads — 2MB/s OR 5 reads/s with a maximum of 1000 records in the read

For more details on Kinesis quotas and limits click here.

AWS provides two modes for Kinesis Data Streams — on-demand mode and provisioned. On-demand mode allows AWS to do the scaling automatically with resolves some scaling issues. I discuss the pros and cons of using on-demand mode later in the article. For now, we focus on provisioned mode as it is the mode that often requires troubleshooting.

To increase throughput you’ll need to provision more shards within your data stream. This is simple to do but does require some planning and monitoring to determine what action to take.

For an individual shard, the ordering of data records is maintained by a sequence number that AWS apply to the data record. However, ordering is not maintained across shards — so if you need to maintain the ordering of data records, this is going to require the use of well-thought-out partition keys.

Let’s take the example of a frontend funnelling clickstream data through a Kinesis stream. If one shard is enough to handle the amount of clickstream data, then you don’t need to worry about provisioning extra shards or ordering data records.

A Kinesis data stream with one shard will order the data records as they enter the stream

Which metrics to look at?

ReadProvisionedThroughputExceeded — If your consumers exceed your provisioned throughput they will throttle and you will no longer be able to read from the stream. Monitor this average statistic on this metric and try to keep it as close to 0 as possible. If this metric is above 0 you will need to add more shards to avoid data loss and throttling.

WriteProvisionedThroughputExceeded — If your producers are writing more data to the data stream than its provisioned capacity, the Kinesis stream will throttle. Monitor the average statistic for this metric and increase shards when you are being alerted.

⬆️ Increasing the Number of Shards

By increasing the number of shards, you will be increasing the overall capacity of your Kinesis Data Stream proportionally.

However, you need to be smart with how you partition the data that goes to your shards for the following reasons:

  • If you need to maintain the ordering of certain types of events, then you need the same type of event to go through the same shard
  • A similar number of data records should go to each shard to avoid “hot”/“cold” shards

Maintaining Ordering

Remember that ordering is not maintained across shards, only within the individual shard.

If you don’t care about ordering your data records, a good way to partition your data would be on a recordId, such that the data will be “randomly” assigned to a shard and therefore the data will be spread evenly across shards.

If you need to maintain the ordering of certain types of data records (such as data for a particular user, userId) then you’ll need to use the userId as the partition key.

Take three types of clickstream data, A, B & C, from the client. Assuming that all events are to be ordered per type, then the partition key will need to be the event type.

Top: Ineffective partitioning where event type A is split across shards and ordering is not maintained. Bottom: Effective partitioning where the ordering of event type is maintained.

Avoiding hot and cold shards

In an ideal situation, data is evenly distributed across shards to make the most of your shard capacity. A shard is considered to be “hot” when the quotas on a shard are exceeded. On the other hand, a shard that has little data flowing through (such that its capacity isn’t being utilised efficiently), is regarded as “cold”. See the schematic below for a visual representation of hot and cold shards. These shards are considered to be problematic because hot shards will throttle and cold shards are an inefficient use of capacity.

In the case of a poorly chosen partition key, you may get hot and cold shards. The impact of this will be throttling and underutilised shard capacity. Take four types of clickstream data, A, B, C & D, where event types A and C are far more common that event types B and D.

By using appropriate partition keys, data is evenly distributed across each shard, avoiding “hot” and “cold” shards

Wise partitioning will lead to a more even distribution across shards. In the above example, it makes sense for A & B to go into one shard and C & D the other, to ensure even distribution.

Which metrics to look at?

You can monitor your shards by enabling enhanced shard-level metrics which provide details on the data flowing through the individual shards and allows you to identify hot and cold shards.

ReadProvisionedThroughputExceeded — Identify if consumers of the individual shard are exceeding the read limits on the shard

WriteProvisionedThroughputExceeded — Identify if producers that write to the shard are exceeding the ingest limits on the shard

What to do when you identify hot/cold shards?

When the limits of an individual shard are exceeded, the data stream set-up will need adjusting.

In the case of two shards, with one reaching its limits but the other far below capacity, it may be unnecessary to introduce another shard. Firstly, think about the partition key and decide if it can be changed to allow the data to flow more evenly across shards.

If there is no capacity remaining in the other shards, or the partition key cannot be changed, it will be necessary to split the hot shard. Splitting involves dividing one shard into two shards, creating more capacity overall.

Splitting a Hot Shard

Splitting a hot shard is a process that can be carried out through the terminal using the aws-cli. For brevity, I show a simple process of splitting a shard into two, but for further information visit the AWS docs on splitting shards.

After splitting a hot shard, 2 OPEN child shards exist while the parent shard exists in the CLOSED state, identifiable by the presence of an EndingSequenceNumber

Firstly, we need to discover the start and ending hash key of the shard in question.

aws kinesis describe-stream --stream-name MyKinesisStream \
--profile <my-profile> --region <my-region>
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000", // OPEN
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49629319735256307061546026074312453541434821520675307522"
}
}
]
}
}

For this example, we want to split the shard into two, with half of the data records flowing through the first stream and half through the second — so we identify the hash key midway between the StartingHashKey and the EndingHashKey of the shard in question.

( 340282366920938463463374607431768211455 - 0 ) / 2 = 170141183460469231731687303715884105727

Using the midway hash key we split the shard.

aws kinesis split-shard --stream-name MyKinesisStream \
--shard-to-split shardId-000000000000 \
--new-starting-hash-key 170141183460469231731687303715884105727 \
--profile <my-profile> --region <my-region>
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000", // CLOSED
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49629319735256307061546026074312453541434821520675307522",
"EndingSequenceNumber": "49629319735267457434145291385882012474751635113541369858"
}
},
{
"ShardId": "shardId-000000000001", // OPEN
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "170141183460469231731687303715884105726"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49629320275848671419126861648279800188703856897153826834"
}
},
{
"ShardId": "shardId-000000000002", // OPEN
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "170141183460469231731687303715884105727",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49629320275870972164325392271421335906976505258659807266"
}
}
]
}
}

Note that the parent shard is now in the closed state, identifiable by the presence of its EndingSequenceNumber. See that two new shards have appeared in the stream with the parent’s hash range split into two and the shards are referencing the parent shard.

Merging Cold Shards

If you identify cold shards, you may consider merging them into one shard to reduce your costs.

Open shards always span the MD5 hash range and only adjacent shards can be merged. For example:

  • shard-1 with hash range 200–450 CAN be merged with shard-2 with hash range 451–700 ✅
  • shard-1 with hash range 200–450 CAN NOT be merged with shard-2 with hash range 785–999 ❌
After merging two cold shards the Parent Shard and Adjacent Parent Shard exist in the CLOSED state and the child shard exists in the OPEN state with the full hash range inherited

In our example above, the shards are adjacent so we can go ahead and merge them.

aws kinesis merge-shards --stream-name MyKinesisStream \
--shard-to-merge shardId-000000000001 \
--adjacent-shard-to-merge shardId-000000000002 \
--profile <my-profile> --region <my-region>
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000", // CLOSED
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49629319735256307061546026074312453541434821520675307522",
"EndingSequenceNumber": "49629319735267457434145291385882012474751635113541369858"
}
},
{
"ShardId": "shardId-000000000001", // CLOSED
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "170141183460469231731687303715884105726"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49629320275848671419126861648279800188703856897153826834",
"EndingSequenceNumber": "49629320275859821791726126959849359122020662655999541266"
}
},
{
"ShardId": "shardId-000000000002", // CLOSED
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "170141183460469231731687303715884105727",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49629320275870972164325392271421335906976505258659807266",
"EndingSequenceNumber": "49629320275882122536924657582990894840293311017505521698"
}
},
{
"ShardId": "shardId-000000000003", // OPEN
"ParentShardId": "shardId-000000000001",
"AdjacentParentShardId": "shardId-000000000002",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49629320775786777279786371235227532424934921414209699890"
}
}
]
}
}

Notice that the two open shards were closed and shardId-000000000003 exists in the open state and references the ParentShardId and AdjacentParentShardId. For more details on merging shards reference the AWS docs.

Autoscaling with CloudWatch and Lambda

If throughput is highly variable and you want to automate your Kinesis Data Stream scaling, you may consider implementing an autoscaling solution to prevent you from needing to frequently add and remove shards manually. I point you to a great article on AWS’s Big Data Blog which shows the architecture used to achieve this.

Kinesis On Demand

In 2021 AWS announced “Kinesis On Demand” where the Kinesis stream scales automatically depending on your needs. This may sound like an awesome solution to the problem described above, but there are a few points to consider before diving into On-Demand.

  • The cost of a stream with one shard in on-demand mode is over 2.5x costlier than a provisioned shard at $0.048 vs. $0.0179 per hour in eu-west-2
  • Chosen partition keys can still lead to writes exceeding a shard’s limits. So you’ll still need to have monitoring and react to this
  • Could be a great option for spiky production workloads where the partition key leads to even distribution.
  • Useful for Kinesis Data Stream users that don’t want to invest the time in handling capacity themselves

You CAN switch between On-Demand and Provisioned use, so you aren’t limited to either, even when your infrastructure is all set up!

Wrapping up!

It’s not always easy to efficiently scale Kinesis Data Streams, as we need to monitor various metrics and diagnose the problem before taking the appropriate action. In Part 1 we’ve discussed the art of scaling Kinesis Data Streams efficiently.

In Part 2, we’ll be looking at the second bottleneck in the Kinesis Data Stream plus Lambda Consumer architecture, where the consumer cannot keep up with the number of data records coming from the Kinesis Data Stream. We’ll discuss batching data records, increasing memory on the Lambda Functions, Parallelisation factors and much more — stay tuned!

--

--

Sarah Hamilton
Engineers @ The LEGO Group

Application Engineer at The LEGO Group and Serverless Enthusiast