Athena with Kinesis Data Firehose using CDK

Abhishant Prakash
3 min read · Jul 13, 2024


Introduction

In the last part of this series we looked at ingesting data from Kinesis Data Streams into Firehose and using the dynamic partitioning and record de-aggregation features to write the data to an S3 bucket. If you haven't read that part yet, I recommend giving it a look.

This was the architecture we created in the previous part:

Firehose to S3

In this part of the series we will look at querying the data stored in S3 using Athena, and we will create the necessary infrastructure using CDK.

Why Athena?

Athena is a serverless service from AWS that lets you query data in S3 with standard SQL, and through federated queries it can also reach sources like RDS, DynamoDB and Redshift.

The real benefit of Athena is that you pay only for the amount of data your queries scan, and you can reduce costs further by partitioning your data and enabling query result caching.

Structure of our data in S3

After the Firehose processing, the data in the bucket looks something like this:

S3 data structure

To query this data in Athena, we first need to catalog it in the Glue Data Catalog. For this we will use a Glue crawler: it crawls the data, infers the schema and partitions, and creates a table in the database we specify.
We will configure the crawler to be incremental, so it scans only new partitions on each run.

Let's start creating the resources using CDK.
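
The snippets below live inside a CDK stack's __init__ method (hence the self references). For context, these are roughly the imports they assume (aws-cdk-lib v2 style):

import json

from aws_cdk import Aws, aws_glue as glue, aws_iam as iam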

1. Create the Glue database

database_name = "firehose_test"

### create the database first
database = glue.CfnDatabase(
    self,
    "glue_database",
    catalog_id=Aws.ACCOUNT_ID,
    database_input=glue.CfnDatabase.DatabaseInputProperty(name=database_name),
)

2. Create the Glue role that the crawler will assume

# Role the crawler assumes; AWSGlueServiceRole provides the baseline Glue permissions
glue_role = iam.Role(
    self,
    "glue-test-role-firehose",
    role_name="glue-test-firehose-role",
    assumed_by=iam.ServicePrincipal("glue.amazonaws.com"),
    managed_policies=[
        iam.ManagedPolicy.from_aws_managed_policy_name(
            "service-role/AWSGlueServiceRole"
        ),
    ],
)

3. Grant the role the ability to read the bucket where the data is stored. Note that lake_bucket here references the delivery bucket from the previous part through a cross-stack reference.

lake_bucket.grant_read(glue_role)
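
In case you are curious how that cross-stack reference can be wired up, here is a minimal sketch at the app level. The class and attribute names (DeliveryStack, delivery_bucket, CatalogStack) are illustrative, not the actual names from the previous part:

import aws_cdk as cdk
from aws_cdk import Stack, aws_s3 as s3
from constructs import Construct

class DeliveryStack(Stack):
    # Illustrative stand-in for the previous part's Firehose delivery stack
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        self.delivery_bucket = s3.Bucket(self, "delivery-bucket")

class CatalogStack(Stack):
    # Stack that holds the Glue database, role and crawler from this post
    def __init__(self, scope: Construct, construct_id: str, *, lake_bucket: s3.IBucket, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        self.lake_bucket = lake_bucket

app = cdk.App()
delivery = DeliveryStack(app, "delivery-stack")
# Passing the bucket object into the second stack makes CDK create the
# CloudFormation export/import (the cross-stack reference) automatically.
CatalogStack(app, "catalog-stack", lake_bucket=delivery.delivery_bucket)
app.synth()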

4. Create the Glue crawler

crawler = glue.CfnCrawler(
    self,
    "testcrawler",
    role=glue_role.role_arn,
    targets=glue.CfnCrawler.TargetsProperty(
        s3_targets=[
            glue.CfnCrawler.S3TargetProperty(
                path=f"s3://{lake_bucket.bucket_name}/data"
            )
        ]
    ),
    database_name=database_name,
    name="firehose-output-test",
    # Incremental crawling: only folders (partitions) added since the last run are scanned
    recrawl_policy=glue.CfnCrawler.RecrawlPolicyProperty(
        recrawl_behavior="CRAWL_NEW_FOLDERS_ONLY"
    ),
    configuration=json.dumps(
        {
            "Version": 1.0,
            "CrawlerOutput": {
                # New partitions inherit the schema of the parent table
                "Partitions": {"AddOrUpdateBehavior": "InheritFromTable"},
                # New columns are merged into the existing table definition
                "Tables": {"AddOrUpdateBehavior": "MergeNewColumns"},
            },
            # Group folders with compatible schemas into a single table
            "Grouping": {"TableGroupingPolicy": "CombineCompatibleSchemas"},
        }
    ),
)

5. Deploy the stack and run the crawler. Then go to Athena and you should be able to query your data.

Query data using Athena.
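
If you would rather verify the result programmatically than in the console, here is a minimal sketch using boto3. The table name (data) and the results bucket are assumptions; the crawler usually names the table after the crawled folder, so check what it actually created:

import time
import boto3

athena = boto3.client("athena")

# Table name "data" and the results bucket are illustrative assumptions
response = athena.start_query_execution(
    QueryString="SELECT * FROM data LIMIT 10",
    QueryExecutionContext={"Database": "firehose_test"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-query-results-bucket/"},
)
execution_id = response["QueryExecutionId"]

# Poll until the query finishes
while True:
    status = athena.get_query_execution(QueryExecutionId=execution_id)
    state = status["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    results = athena.get_query_results(QueryExecutionId=execution_id)
    for row in results["ResultSet"]["Rows"]:
        print(row["Data"])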

Thoughts

We have created a crawler that needs to be run manually, but you can also create a schedule-based crawler (a minimal sketch is at the end of this post) or trigger it with EventBridge rules when new data arrives in the S3 bucket. The event-driven setup is a topic for another time. For the time being, peace out !!
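
A minimal sketch of the scheduled variant, reusing the role, database and bucket from above (the cron expression is just an example):

scheduled_crawler = glue.CfnCrawler(
    self,
    "scheduled-crawler",
    role=glue_role.role_arn,
    database_name=database_name,
    targets=glue.CfnCrawler.TargetsProperty(
        s3_targets=[
            glue.CfnCrawler.S3TargetProperty(
                path=f"s3://{lake_bucket.bucket_name}/data"
            )
        ]
    ),
    # Run every 6 hours (the cron expression is illustrative)
    schedule=glue.CfnCrawler.ScheduleProperty(
        schedule_expression="cron(0 */6 * * ? *)"
    ),
)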
