AWS Kinesis Data Firehose with CDK — Dynamic Partitioning

Abhishant Prakash
5 min read · Jul 13, 2024


Introduction

The ever-changing landscape of the data world keeps me curious. While batch processing of big data is still a hot topic in the tech community, with Spark, CUDA and other big data frameworks, I believe that real-time or near-real-time streaming will be the future of the majority of data processing.

We have already looked at real-time processing of data through IaC in our Kinesis Data Streams series. In this part of the series we will look at how to create an architecture and pattern for near-real-time processing using Kinesis Data Firehose. We will use the existing Kinesis data stream as the producer/source for our Firehose delivery stream and build the end-to-end architecture with CDK, with minimal or no console involvement. If you haven't looked at creating the Kinesis stream and pushing data into it, please check out that part of the series.

We will also look at a pattern for dynamically partitioning the data through Firehose, which allows effective and efficient queries to be performed on the data in Athena.

Kinesis Data Firehose

Let's look at the architecture that we will be building in this article.

AWS Kinesis Firehose delivery stream.

Kinesis Data Firehose supports the following sources at the time of writing this article (this may evolve going forward):

  1. Kinesis Data Streams
  2. Amazon Managed Streaming for Apache Kafka (MSK)
  3. Direct PUT using the API (see the sketch below)
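
For context, Direct PUT simply means that a producer calls the Firehose API itself. In this article we use a Kinesis data stream as the source instead, so the snippet below is only a sketch of what that option looks like; the stream name and record are placeholders.

import json
import boto3

# Hypothetical producer writing a record straight to a Firehose stream (Direct PUT).
firehose_client = boto3.client("firehose")

record = {"id": "abc-123", "timestamp": "2024-07-13T10:15:30Z", "value": 42}

firehose_client.put_record(
    DeliveryStreamName="my_direct_put_stream",  # placeholder name
    Record={"Data": (json.dumps(record) + "\n").encode("utf-8")},
)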

One of the main advantages of using Firehose, apart from it being a managed service, is that it lets you buffer incoming data based on properties you control. For example, you can configure a buffering size, and Firehose will only write data to the destination once that buffer size is reached; you can also configure a time interval after which Firehose writes whatever has accumulated to the destination, whichever condition is met first.

These are some of the properties that make Firehose a good fit for near-real-time data ingestion patterns.
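
As a quick preview of how those two knobs are expressed in CDK (the same property appears again in the full delivery stream definition later in this article):

# Firehose flushes to the destination when either threshold is reached, whichever comes first.
buffering_hints = kinesis_firehose.CfnDeliveryStream.BufferingHintsProperty(
    interval_in_seconds=100,  # flush at least every 100 seconds...
    size_in_m_bs=64,  # ...or as soon as roughly 64 MB has accumulated
)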

Let's get started with the interesting part of coding.

Photo by Catherine Heath on Unsplash

Coding

  1. Let's import the necessary CDK constructs that we will be using in our code.
from aws_cdk import (
    Stack,
    Duration,
    RemovalPolicy,
    Aws,
    aws_kinesis as kinesis,
    aws_lambda,
    aws_lambda_event_sources,
    aws_sqs as sqs,
    aws_s3 as s3,
    aws_kinesisfirehose as kinesis_firehose,
    aws_iam as iam,
)
from constructs import Construct
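
All of the snippets below live inside a CDK Stack. Here is a rough sketch of how I assume they fit together; the stack name is arbitrary, and kinesis_stream stands for the data stream built in the earlier part of this series (imported by ARN here purely as an example):

class FirehoseStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # The Kinesis data stream created in the earlier part of the series.
        # "development_stream" is a placeholder name.
        kinesis_stream = kinesis.Stream.from_stream_arn(
            self,
            "source_stream",
            stream_arn=f"arn:aws:kinesis:{Aws.REGION}:{Aws.ACCOUNT_ID}:stream/development_stream",
        )

        # ... the bucket, IAM role and Firehose delivery stream from the
        # sections below go here ...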

2. Let's create the S3 bucket that we will use to store the data delivered by the Firehose stream.

### create the bucket first
self.delivery_bucket = s3.Bucket(
    self,
    "firehose_bucket",
    bucket_name=f"aws-firehosebucket-{Aws.ACCOUNT_ID}",
    access_control=s3.BucketAccessControl.BUCKET_OWNER_FULL_CONTROL,
    encryption=s3.BucketEncryption.S3_MANAGED,
    block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
    versioned=True,
)
self.delivery_bucket.apply_removal_policy(RemovalPolicy.DESTROY)

3. Create an IAM role that the Firehose service will assume to process and write the data to the S3 bucket.

firehose_role = iam.Role(
    self,
    "firehose_delivery_role",
    assumed_by=iam.ServicePrincipal("firehose.amazonaws.com"),
    managed_policies=[
        iam.ManagedPolicy.from_aws_managed_policy_name(
            "AmazonKinesisReadOnlyAccess"
        ),
    ],
)

4. Grant this role access to read from the Kinesis stream, and read/write access to the destination S3 bucket.

## allow the firehose role to read from the data stream
kinesis_stream.grant_read(firehose_role)
## allow the role to read from and write to the s3 bucket
self.delivery_bucket.grant_read_write(firehose_role)
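
For reference, the two grant_* calls above translate, roughly, into policy statements like the following being attached to the role (a sketch of the effect; the exact actions CDK synthesizes may differ slightly):

# Approximately what kinesis_stream.grant_read(firehose_role) adds:
firehose_role.add_to_policy(
    iam.PolicyStatement(
        actions=[
            "kinesis:DescribeStream",
            "kinesis:GetShardIterator",
            "kinesis:GetRecords",
            "kinesis:ListShards",
        ],
        resources=[kinesis_stream.stream_arn],
    )
)

# Approximately what self.delivery_bucket.grant_read_write(firehose_role) adds:
firehose_role.add_to_policy(
    iam.PolicyStatement(
        actions=["s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:PutObject", "s3:Abort*", "s3:DeleteObject*"],
        resources=[
            self.delivery_bucket.bucket_arn,
            f"{self.delivery_bucket.bucket_arn}/*",
        ],
    )
)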

5. Let's create the Firehose delivery stream; we will explain each portion of the configuration in detail afterwards.

firehose = kinesis_firehose.CfnDeliveryStream(
    self,
    "firehose_stream",
    delivery_stream_name="firehose_development_stream",
    delivery_stream_type="KinesisStreamAsSource",
    kinesis_stream_source_configuration=kinesis_firehose.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
        kinesis_stream_arn=kinesis_stream.stream_arn,
        role_arn=firehose_role.role_arn,
    ),
    extended_s3_destination_configuration=kinesis_firehose.CfnDeliveryStream.ExtendedS3DestinationConfigurationProperty(
        bucket_arn=self.delivery_bucket.bucket_arn,
        role_arn=firehose_role.role_arn,
        prefix="data/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/original_id=!{partitionKeyFromQuery:id}/",
        error_output_prefix="errors/error_type=!{firehose:error-output-type}/",
        ### a minimum buffer size of 64 MB is required when dynamic partitioning is enabled
        buffering_hints=kinesis_firehose.CfnDeliveryStream.BufferingHintsProperty(
            interval_in_seconds=100, size_in_m_bs=64
        ),
        ### inline parsing for JSON is configured here
        processing_configuration=kinesis_firehose.CfnDeliveryStream.ProcessingConfigurationProperty(
            enabled=True,
            processors=[
                kinesis_firehose.CfnDeliveryStream.ProcessorProperty(
                    type="AppendDelimiterToRecord"
                ),
                kinesis_firehose.CfnDeliveryStream.ProcessorProperty(
                    type="MetadataExtraction",
                    parameters=[
                        kinesis_firehose.CfnDeliveryStream.ProcessorParameterProperty(
                            parameter_name="JsonParsingEngine",
                            parameter_value="JQ-1.6",
                        ),
                        ## extract year, month, day, id
                        kinesis_firehose.CfnDeliveryStream.ProcessorParameterProperty(
                            parameter_name="MetadataExtractionQuery",
                            parameter_value="{year:.timestamp[0:4],month:.timestamp[5:7],day:.timestamp[8:10],id:.id}",
                        ),
                    ],
                ),
            ],
        ),
        dynamic_partitioning_configuration=kinesis_firehose.CfnDeliveryStream.DynamicPartitioningConfigurationProperty(
            enabled=True,
            #### total amount of time Firehose spends on retries ####
            retry_options=kinesis_firehose.CfnDeliveryStream.RetryOptionsProperty(
                duration_in_seconds=300
            ),
        ),
        cloud_watch_logging_options=kinesis_firehose.CfnDeliveryStream.CloudWatchLoggingOptionsProperty(
            enabled=True,
            log_group_name="test_firehose_group",
            log_stream_name="test_firehose_stream",
        ),
    ),
)

I understand the final portion was quite long, so let's break down each part in more detail, along with screenshots from the Firehose console.

Break Down

  1. Kinesis Data Streams source configuration

delivery_stream_name="firehose_development_stream",
delivery_stream_type="KinesisStreamAsSource",
kinesis_stream_source_configuration=kinesis_firehose.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
    kinesis_stream_arn=kinesis_stream.stream_arn,
    role_arn=firehose_role.role_arn,
)

delivery_stream_type indicates the source type for your Firehose delivery stream, delivery_stream_name sets the name of the delivery stream, and kinesis_stream_source_configuration lets you pass the role that the stream will assume along with the ARN of your Kinesis data stream.

2. Let's look at the dynamic partitioning configuration first.

dynamic_partitioning_configuration=kinesis_firehose.CfnDeliveryStream.DynamicPartitioningConfigurationProperty(
    enabled=True,
    #### total amount of time Firehose spends on retries ####
    retry_options=kinesis_firehose.CfnDeliveryStream.RetryOptionsProperty(
        duration_in_seconds=300
    ),
),
Enabling dynamic partitioning
Retry duration

This part of the code enables dynamic partitioning and sets the retry duration. The screenshots above show the equivalent settings in the console.

3. Let's look into the processing configuration

processing_configuration=kinesis_firehose.CfnDeliveryStream.ProcessingConfigurationProperty(
    enabled=True,
    processors=[
        kinesis_firehose.CfnDeliveryStream.ProcessorProperty(
            type="AppendDelimiterToRecord"
        ),
        kinesis_firehose.CfnDeliveryStream.ProcessorProperty(
            type="MetadataExtraction",
            parameters=[
                kinesis_firehose.CfnDeliveryStream.ProcessorParameterProperty(
                    parameter_name="JsonParsingEngine",
                    parameter_value="JQ-1.6",
                ),
                ## extract year, month, day, id
                kinesis_firehose.CfnDeliveryStream.ProcessorParameterProperty(
                    parameter_name="MetadataExtractionQuery",
                    parameter_value="{year:.timestamp[0:4],month:.timestamp[5:7],day:.timestamp[8:10],id:.id}",
                ),
            ],
        ),
    ],
),

Here we configure the different processors. AppendDelimiterToRecord adds a newline between records so that, when the data is queried later, individual records are properly separated.

Delimiter to records
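
To make that concrete, here is what the contents of a delivered S3 object would look like without and with the delimiter processor (the records are made-up samples):

Without AppendDelimiterToRecord, records are concatenated on a single line:
{"id":"a1","timestamp":"2024-07-13T10:15:30Z"}{"id":"a2","timestamp":"2024-07-13T10:16:02Z"}

With AppendDelimiterToRecord, each record sits on its own line (newline-delimited JSON):
{"id":"a1","timestamp":"2024-07-13T10:15:30Z"}
{"id":"a2","timestamp":"2024-07-13T10:16:02Z"}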

MetadataExtraction allows you to extract the keys from your records.

Records Deaggregation

For record de-aggregation, Firehose uses JQ expressions. If you look at this particular part of the code, "timestamp" and "id" are keys in my records coming from the Kinesis data stream, and we are mapping them to the partition keys year, month, day and id. These keys will make sense in the next explanation.

kinesis_firehose.CfnDeliveryStream.ProcessorParameterProperty(
    parameter_name="MetadataExtractionQuery",
    parameter_value="{year:.timestamp[0:4],month:.timestamp[5:7],day:.timestamp[8:10],id:.id}",
)
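
As a worked example, assume an incoming record like the made-up sample below. The JQ query slices the ISO timestamp string by character position and picks out the id:

Incoming record (sample):
{"id": "abc-123", "timestamp": "2024-07-13T10:15:30Z", "value": 42}

Result of the MetadataExtractionQuery:
{"year": "2024", "month": "07", "day": "13", "id": "abc-123"}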

4. Writing the de-aggregated records to the S3 bucket in a partitioned manner

bucket_arn=self.delivery_bucket.bucket_arn,
role_arn=firehose_role.role_arn,
prefix="data/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/original_id=!{partitionKeyFromQuery:id}/",
error_output_prefix="errors/error_type=!{firehose:error-output-type}/",
### a minimum buffer size of 64 MB is required when dynamic partitioning is enabled
buffering_hints=kinesis_firehose.CfnDeliveryStream.BufferingHintsProperty(
    interval_in_seconds=100, size_in_m_bs=64
),

Look at the prefix where the data is written: it uses the keys we extracted from the records, so the data is organised in S3 in a partitioned layout.
Also look at the error output prefix, which incorporates the error type, so we know which kind of error caused a record's ingestion to fail.
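
With the sample record from the previous section, delivered objects would land under keys shaped roughly like this (the object name suffix is illustrative; Firehose generates it):

data/year=2024/month=07/day=13/original_id=abc-123/firehose_development_stream-1-2024-07-13-10-20-00-example
errors/error_type=<firehose error output type>/... (only written when records fail processing or delivery)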

5. The other thing we configured was CloudWatch logging.

cloud_watch_logging_options=kinesis_firehose.CfnDeliveryStream.CloudWatchLoggingOptionsProperty(
    enabled=True,
    log_group_name="test_firehose_group",
    log_stream_name="test_firehose_stream",
),
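
One caveat: when the delivery stream is defined through CDK/CloudFormation like this, the log group and log stream referenced by name are, to my knowledge, not created automatically, and the role also needs permission to write to them. Here is a sketch of creating them yourself (this assumes aws_logs is added to the imports at the top):

from aws_cdk import aws_logs as logs

# Create the log group and stream that the Firehose config references by name.
firehose_log_group = logs.LogGroup(
    self,
    "firehose_log_group",
    log_group_name="test_firehose_group",
    removal_policy=RemovalPolicy.DESTROY,
)
logs.LogStream(
    self,
    "firehose_log_stream",
    log_group=firehose_log_group,
    log_stream_name="test_firehose_stream",
)

# Allow the Firehose role to write error logs to the group.
firehose_log_group.grant_write(firehose_role)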

Conclusion

We looked at creating a Firehose delivery stream with dynamic partitioning, targeting S3 as the destination, and used CDK with Python to build the architecture. In the next part we will look at querying this data with Athena.
