AWS Kinesis Data Streams with CDK Python — Part 1

Abhishant Prakash
Jun 17, 2024


Introduction

This is part of a learning series exploring real-time data ingestion. With data being generated, consumed and utilised in so many ways, this is my attempt to look into some of the common ingestion patterns, service orchestration, and the different services AWS offers for it. My goal is to do everything with IaC (Infrastructure as Code) and make little or no use of the AWS console.

Streaming has become the industry norm wherever data needs to be ingested in real time or near real time so that its value can be harnessed almost immediately. Batch ingestion certainly has its own set of advantages, but today we will look at how to create a Kinesis data stream and how to produce to and consume from it using Lambda. In the next part of the series we will also look at the enhanced fan-out pattern with consumers like Lambda. So let's get started.

Kinesis Data Streams

Kinesis architecture for simplicity

At the very core of Kinesis Data Streams is the concept of shards, which allow data to be processed at high velocity. A shard is the fundamental unit of throughput in the Kinesis world. As the image above shows, Kinesis supports many different producers, for example the Kinesis SDK, the Kinesis Producer Library, the Kinesis Agent, Spark, Flume, Kafka Connect, and Apache NiFi.

On the write side, each shard can accept up to 1,000 records per second or 1 MB/sec of data. Crossing that limit causes throttling for your stream and producers.

On the consumption side, each shard has a read capacity of 2 MB/sec shared across all consumers. If you use the enhanced fan-out pattern (which in my understanding was a game changer), you get 2 MB/sec per shard per consumer instead.
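As a quick back-of-the-envelope check (just a sketch based on the write limits above, with made-up workload figures, not an official sizing tool), you could estimate how many shards a workload needs like this:

import math

# assumed workload figures, purely illustrative
records_per_second = 2500
avg_record_size_kb = 1

write_mb_per_second = records_per_second * avg_record_size_kb / 1024

# each shard accepts 1,000 records/sec or 1 MB/sec on the write side
shards_needed = max(
    math.ceil(records_per_second / 1000),
    math.ceil(write_mb_per_second),
)
print(shards_needed)  # 3 shards for this example workload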

Simple architecture flow

This is the simple setup we will build first. It is assumed that you already know how to create a CDK app. Let's get into the coding portion now, which is the fun part.

Simple workflow of what we will be building

Code

Let's import the necessary libraries first.

from aws_cdk import (
    Stack,
    aws_kinesis as kinesis,
    aws_lambda,
    Duration,
    RemovalPolicy,
    aws_lambda_event_sources,
    aws_sqs as sqs,
    aws_iam as iam,
)
from constructs import Construct

Now we will create our data stream

### create the data stream
self.data_stream = kinesis.Stream(
    self,
    "data_stream",
    stream_name="development_stream",
    shard_count=3,
    stream_mode=kinesis.StreamMode.PROVISIONED,
    retention_period=Duration.hours(24),
)
self.data_stream.apply_removal_policy(RemovalPolicy.DESTROY)

Kinesis data streams can be created in two modes:

  • On-demand: useful when you do not know your data flow in advance or it is unpredictable. This mode auto-scales capacity and removes a lot of maintenance for you, but it also means you have little control over the scaling behaviour, and cost can get out of hand pretty quickly.
  • Provisioned: use this mode when you know your throughput requirements. In our case we are creating a stream with just three shards.

Take note of the retention period of data within Kinesis: by default it is twenty-four hours and it can go up to 365 days. Data inserted into Kinesis is immutable and cannot be changed. With this configuration we have twenty-four hours to replay the data after it is inserted.
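For comparison, here is a minimal sketch of what the same stream could look like in on-demand mode with a longer retention period (the construct id and stream name below are placeholders, not part of the stack we deploy):

### hypothetical on-demand variant, shown only for comparison
on_demand_stream = kinesis.Stream(
    self,
    "on_demand_stream",
    stream_name="development_stream_on_demand",
    # no shard_count here: capacity scales automatically in on-demand mode
    stream_mode=kinesis.StreamMode.ON_DEMAND,
    # retention can be raised up to 365 days (extra cost applies beyond 24 hours)
    retention_period=Duration.days(7),
)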

Producer code using lambda

We will create a Lambda function that uses the boto3 SDK to insert records into the data stream. We will use put_record (one record at a time) to write the data.

### create the lambda function for putting records into kinesis data streams
kinesis_lambda_writer = aws_lambda.Function(
    self,
    "lamda-kinesis-writer",
    function_name="kinesis_writer_test",
    runtime=aws_lambda.Runtime.PYTHON_3_9,
    code=aws_lambda.Code.from_asset("./kinesis_lambda"),
    handler="kinesis_lambda_writer.handler",
    timeout=Duration.minutes(5),
    environment={"stream_name": self.data_stream.stream_name},
)

### grant data stream write permission to lambda
self.data_stream.grant_write(kinesis_lambda_writer)

Note that at the end we also grant the Lambda function permission to write to the data stream.

This is what the Lambda function looks like:

import os
import boto3
import json
from datetime import datetime
import time
import random
import uuid


def handler(event, context):
    stream_name = os.environ["stream_name"]
    kinesis = boto3.client("kinesis", region_name="ap-southeast-2")

    ### now put the records into the stream
    for index in range(20):
        partition_key = index % 3
        value = random.randint(10, 100)
        data_timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        payload = {"value": value, "id": partition_key, "timestamp": data_timestamp}

        put_response = kinesis.put_record(
            StreamName=stream_name,
            Data=json.dumps(payload),
            PartitionKey=str(uuid.uuid4())[0:8],
        )
        time.sleep(5)

As you can see, we fabricate some data in a loop and write each record using the put_record method.
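If you want to write more than one record per API call, a batched variant using put_records (a real Kinesis API, though not the one used in this stack) could look roughly like this sketch; it reuses the kinesis client, stream_name, json, random and uuid from the handler above:

### hypothetical batched variant using put_records instead of put_record
records = [
    {
        "Data": json.dumps({"value": random.randint(10, 100), "id": i % 3}),
        "PartitionKey": str(uuid.uuid4())[0:8],
    }
    for i in range(20)
]
response = kinesis.put_records(StreamName=stream_name, Records=records)
# throttled records are reported here and should be retried by the caller
print("Failed record count:", response["FailedRecordCount"])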

It is important to look at the partition key very carefully. When we put data into Kinesis, we provide a partition key. Kinesis takes an MD5 hash of that key, and each shard owns a range of the resulting hash key space, so a record lands on whichever shard owns the hash of its partition key. An effective, non-throttling stream therefore depends on how evenly the MD5 hashes of your partition keys are distributed; if they are skewed, you can end up with hot shards.
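To make this concrete, here is a small sketch (using the real boto3 list_shards call and the stream name from earlier) that hashes a partition key the same way Kinesis does and checks which shard's hash key range it falls into:

import hashlib

import boto3

kinesis = boto3.client("kinesis", region_name="ap-southeast-2")


def shard_for_partition_key(stream_name: str, partition_key: str) -> str:
    """Return the shard id whose hash key range contains this partition key."""
    # Kinesis maps a record to a shard via the MD5 hash of its partition key,
    # interpreted as a 128-bit integer
    hash_key = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    shards = kinesis.list_shards(StreamName=stream_name)["Shards"]
    for shard in shards:
        key_range = shard["HashKeyRange"]
        if int(key_range["StartingHashKey"]) <= hash_key <= int(key_range["EndingHashKey"]):
            return shard["ShardId"]
    raise ValueError("No open shard found for this hash key")


print(shard_for_partition_key("development_stream", "my-key"))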

Simple consumer code using lambda

Let's define our consumer lambda function

## lambda reader
kinesis_lambda_reader = aws_lambda.Function(
    self,
    "lamda-kinesis-reader",
    function_name="kinesis_reader_test",
    runtime=aws_lambda.Runtime.PYTHON_3_9,
    code=aws_lambda.Code.from_asset("./kinesis_lambda"),
    handler="kinesis_lambda_reader.handler",
    timeout=Duration.minutes(5),
    environment={"stream_name": self.data_stream.stream_name},
)

This is what the Lambda function looks like (nothing fancy):

import base64


def handler(event, context):
    for record in event["Records"]:
        # Kinesis delivers the record payload base64-encoded in the Lambda event
        data = base64.b64decode(record["kinesis"]["data"])
        print("Data record is ")
        print(data)

It is important to configure something that captures records the Lambda function fails to process. We use a dead-letter queue, which logs the failure metadata so the data can be re-ingested later.

Let's see how we can provision the dead-letter queue.

######### create the sqs queue ##########
failure_notifier_queue = sqs.Queue(
    self,
    "lambda_kinesis_failure",
    visibility_timeout=Duration.seconds(300),
    queue_name="lambda_kinesis_failure_test",
)
failure_notifier_queue.apply_removal_policy(RemovalPolicy.DESTROY)

Once this is all done, we can create an event source mapping for the Lambda function. The mapping is what lets Lambda poll records from the stream over HTTP and invoke our function with batches of records.

## add event source for lambda to kinesis
kinesis_lambda_reader.add_event_source(
    aws_lambda_event_sources.KinesisEventSource(
        self.data_stream,
        batch_size=10,
        starting_position=aws_lambda.StartingPosition.TRIM_HORIZON,
        retry_attempts=0,
        ### important: values above 1 let lambda process multiple batches per shard in parallel
        parallelization_factor=1,
        on_failure=aws_lambda_event_sources.SqsDlq(failure_notifier_queue),
    )
)

self.data_stream.grant_read(kinesis_lambda_reader)

It is important to understand how Lambda processes data from shards. With a standard event source mapping, Lambda shares the shard's read throughput with any other consumers. Here we set a batch size of 10 so up to 10 records are sent per invocation. You can also batch by time using “max_batching_window”, or discard records by age using “max_record_age”. More on this can be found in the AWS documentation.
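For example, a tuned variant of the same event source with those batching knobs dialed in could look roughly like this sketch (the values are arbitrary, not the ones deployed in this stack):

### hypothetical tuning of the event source (not the values used in this stack)
kinesis_lambda_reader.add_event_source(
    aws_lambda_event_sources.KinesisEventSource(
        self.data_stream,
        batch_size=100,
        # wait up to 30 seconds to fill a batch before invoking the function
        max_batching_window=Duration.seconds(30),
        # give up on records older than one hour instead of retrying them
        max_record_age=Duration.hours(1),
        starting_position=aws_lambda.StartingPosition.LATEST,
        retry_attempts=2,
        parallelization_factor=2,
        on_failure=aws_lambda_event_sources.SqsDlq(failure_notifier_queue),
    )
)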

Deploy and test

It's time to deploy our CDK app and test things out. Run the producer Lambda function; to check whether data is coming in, you can look at the data stream in the Kinesis console.
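If, like me, you prefer to stay off the console, here is a quick boto3 sketch (using the function and stream names defined above) that triggers the producer and then peeks at one shard to confirm records are arriving. Remember the producer sleeps between puts, so you may need to poll again:

import boto3

region = "ap-southeast-2"
lambda_client = boto3.client("lambda", region_name=region)
kinesis = boto3.client("kinesis", region_name=region)

# trigger the producer lambda asynchronously
lambda_client.invoke(FunctionName="kinesis_writer_test", InvocationType="Event")

# peek at one shard to confirm records are arriving
shard_id = kinesis.list_shards(StreamName="development_stream")["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName="development_stream",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]
records = kinesis.get_records(ShardIterator=iterator, Limit=10)["Records"]
for record in records:
    print(record["Data"])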

You can also check the consumer Lambda's CloudWatch logs; the data will show up there too.

Deliberately failing lambda consumer and seeing message in sqs

I deliberately added a failure condition to my consumer Lambda function to see what would show up in SQS. This is what came up:

{"requestContext":{"requestId":"979f091c-074b-4923-88c1-bc1223cb3b12",
"functionArn":"arn:aws:lambda:ap-southeast-2:{my-aws-account-id}:function:kinesis_reader_test",
"condition":"RetryAttemptsExhausted","approximateInvokeCount":1},
"responseContext":{"statusCode":200,"executedVersion":"$LATEST","functionError":"Unhandled"},"version":"1.0",
"timestamp":"2024-06-15T02:32:59.278Z","KinesisBatchInfo":
{"shardId":"shardId-000000000002",
"startSequenceNumber":"49653008501002934958011330402984580562791687484050767906",
"endSequenceNumber":"49653008501002934958011330402984580562791687484050767906",
"approximateArrivalOfFirstRecord":"2024-06-15T02:32:58.814Z",
"approximateArrivalOfLastRecord":"2024-06-15T02:32:58.814Z",
"batchSize":1,
"streamArn":"arn:aws:kinesis:ap-southeast-2:{my-aws-account-id}:stream/development_stream"}}

As you can see, this does not contain the actual record data, just metadata such as the shard id and the start and end sequence numbers, which you can use in your retry mechanism. But beware: you have to reprocess the failed records before they expire out of the Kinesis stream (twenty-four hours in our case).
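Here is a rough sketch of how that metadata could drive a replay, using the real get_shard_iterator and get_records APIs; the dlq_message variable is a stand-in for a parsed SQS message body like the one above:

import json

import boto3

kinesis = boto3.client("kinesis", region_name="ap-southeast-2")

# dlq_message is assumed to be the parsed body of an SQS message like the one above
batch_info = dlq_message["KinesisBatchInfo"]

# start reading at the first failed record's sequence number
iterator = kinesis.get_shard_iterator(
    StreamName="development_stream",
    ShardId=batch_info["shardId"],
    ShardIteratorType="AT_SEQUENCE_NUMBER",
    StartingSequenceNumber=batch_info["startSequenceNumber"],
)["ShardIterator"]

response = kinesis.get_records(ShardIterator=iterator, Limit=batch_info["batchSize"])
for record in response["Records"]:
    payload = json.loads(record["Data"])  # Data comes back as raw bytes here
    print("Replaying failed record:", payload)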

Conclusion

We explored the fundamentals of Kinesis Data Streams along with IaC using CDK. In my view, understanding the infrastructure-as-code side of development gives any developer a lot more depth. We covered the basic data stream ingestion pattern in this part and will explore more fun and useful things in the next parts of the series. Until then, keep learning.
