Using an Amazon Kinesis Data Stream to ingest real-time data from AWS Cost Explorer

Awadhesh Gupta
2 min read · Mar 21, 2024


1. Amazon Kinesis Data Stream Setup:

First, let’s set up an Amazon Kinesis Data Stream using the AWS Management Console:

  1. Open the Amazon Kinesis console from the AWS Management Console.
  2. Click on “Create data stream”.
  3. Provide a name for your stream and choose the number of shards. Shards control the ingestion and processing capacity of your stream.
  4. Click “Create data stream”.
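The console steps above can also be scripted with boto3. Below is a minimal sketch; the stream name and shard count are placeholders, and the waiter blocks until provisioning finishes:

```python
def stream_params(name, shard_count):
    """Build the create_stream request; a stream needs at least one shard."""
    if shard_count < 1:
        raise ValueError("A stream needs at least one shard")
    return {"StreamName": name, "ShardCount": shard_count}

def create_stream(name, shard_count=1):
    # boto3 imported here so the request-building helper above
    # can be exercised without the SDK or AWS credentials.
    import boto3
    kinesis = boto3.client("kinesis")
    kinesis.create_stream(**stream_params(name, shard_count))
    # Block until the stream reaches ACTIVE status.
    kinesis.get_waiter("stream_exists").wait(StreamName=name)
```

A single shard is enough for a daily cost-export workload; add shards only if you push more than 1 MB/s or 1,000 records/s.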

2. AWS Lambda Function Setup:

Now, let’s create an AWS Lambda function to process the data from the Kinesis stream:

  1. Open the AWS Lambda console from the AWS Management Console.
  2. Click on “Create function”.
  3. Choose “Author from scratch”.
  4. Provide a name for your function, select Python as the runtime, and choose an execution role with permissions to access Amazon Kinesis and AWS Cost Explorer.
  5. Click “Create function”.

Now, let’s write the Lambda function code:

import base64
import json

def lambda_handler(event, context):
    # Kinesis delivers each record's payload base64-encoded.
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        data = json.loads(payload)
        print("Received data:", data)
        # Process the data here as per your requirements.

    return {
        'statusCode': 200,
        'body': json.dumps('Data processed successfully!')
    }
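Because Kinesis base64-encodes record payloads, the handler logic is easy to test locally by building a fake event in the same shape Kinesis delivers. A small sketch (the event structure mirrors the real Kinesis record format; the payload values are made up):

```python
import base64
import json

def decode_records(event):
    """Decode the base64 payload of each Kinesis record into a Python object."""
    return [
        json.loads(base64.b64decode(r['kinesis']['data']))
        for r in event['Records']
    ]

# Build a fake event the way Kinesis would deliver it.
payload = {"service": "AmazonEC2", "cost": "12.34"}
fake_event = {
    "Records": [
        {"kinesis": {"data": base64.b64encode(json.dumps(payload).encode()).decode()}}
    ]
}

print(decode_records(fake_event))  # → [{'service': 'AmazonEC2', 'cost': '12.34'}]
```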

3. Connect Kinesis Data Stream to Lambda:

Next, let’s connect the Kinesis Data Stream to the Lambda function:

  1. Go to the AWS Lambda console.
  2. Click on your Lambda function.
  3. Under “Function overview” (formerly “Designer”), click on “Add trigger”.
  4. Select “Kinesis” as the trigger type.
  5. Choose the Kinesis Data Stream you created earlier.
  6. Configure the batch size and starting position as per your requirements.
  7. Click “Add”.
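The same trigger can be wired up programmatically as an event source mapping. A sketch (function name and stream ARN are placeholders):

```python
def mapping_params(function_name, stream_arn, batch_size=100,
                   starting_position="LATEST"):
    """Build the create_event_source_mapping request for a Kinesis trigger."""
    if not 1 <= batch_size <= 10000:  # Kinesis triggers accept 1-10,000 records
        raise ValueError("batch_size out of range")
    return {
        "FunctionName": function_name,
        "EventSourceArn": stream_arn,
        "BatchSize": batch_size,
        "StartingPosition": starting_position,
    }

def add_kinesis_trigger(function_name, stream_arn, **kwargs):
    # boto3 imported here so the parameter helper above stays SDK-free.
    import boto3
    lambda_client = boto3.client("lambda")
    return lambda_client.create_event_source_mapping(
        **mapping_params(function_name, stream_arn, **kwargs))
```

`LATEST` skips records written before the mapping existed; use `TRIM_HORIZON` if you want to replay everything already in the stream.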

4. Fetching Data from AWS Cost Explorer and Putting into Kinesis Stream:

Finally, you need to fetch data from AWS Cost Explorer and put it into the Kinesis Data Stream. A common approach is a second Lambda function invoked on a schedule by an Amazon EventBridge (formerly CloudWatch Events) rule.
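The schedule itself can be created with boto3 as well. A sketch, where the rule name, rate expression, and Lambda ARN are placeholders you would substitute:

```python
def schedule_params(rule_name, rate_expression="rate(1 day)"):
    """Build the put_rule request for a scheduled EventBridge rule."""
    return {"Name": rule_name,
            "ScheduleExpression": rate_expression,
            "State": "ENABLED"}

def schedule_lambda(rule_name, lambda_arn, rate_expression="rate(1 day)"):
    # boto3 imported here so schedule_params stays SDK-free.
    import boto3
    events = boto3.client("events")
    events.put_rule(**schedule_params(rule_name, rate_expression))
    # Point the rule at the producer Lambda.
    events.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "cost-producer", "Arn": lambda_arn}],
    )
```

Note that EventBridge also needs permission to invoke the function (`lambda add-permission`), and Cost Explorer data refreshes roughly daily, so a `rate(1 day)` schedule is usually sufficient.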

Here’s a sample code to fetch data from AWS Cost Explorer and put it into the Kinesis Data Stream:

import boto3
import json

def lambda_handler(event, context):
    kinesis_client = boto3.client('kinesis')
    ce_client = boto3.client('ce')

    # Fetch daily blended cost, grouped by service.
    ce_response = ce_client.get_cost_and_usage(
        TimePeriod={
            'Start': '2024-03-01',
            'End': '2024-03-21'
        },
        Granularity='DAILY',
        Metrics=['BlendedCost'],
        GroupBy=[
            {
                'Type': 'DIMENSION',
                'Key': 'SERVICE'
            }
        ]
    )

    data = json.dumps(ce_response['ResultsByTime'])

    # Write the serialized results to the stream as a single record.
    put_response = kinesis_client.put_record(
        StreamName='your-kinesis-stream-name',
        Data=data,
        PartitionKey='partitionkey'
    )

    print("Data put into Kinesis Stream:", put_response)

    return {
        'statusCode': 200,
        'body': json.dumps('Data put into Kinesis Stream successfully!')
    }

Remember to replace 'your-kinesis-stream-name' with the name of your Kinesis Data Stream.
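The hardcoded `Start` and `End` dates above will go stale in a scheduled Lambda. A small stdlib-only helper computes a rolling window instead; note that Cost Explorer treats the `End` date as exclusive:

```python
from datetime import date, timedelta

def rolling_time_period(days=30, today=None):
    """Return a Cost Explorer TimePeriod covering the last `days` days.
    The End date is exclusive, so today's partial data is not included."""
    end = today or date.today()
    start = end - timedelta(days=days)
    return {"Start": start.isoformat(), "End": end.isoformat()}

print(rolling_time_period(days=20, today=date(2024, 3, 21)))
# → {'Start': '2024-03-01', 'End': '2024-03-21'}
```

In the producer function you would pass `TimePeriod=rolling_time_period()` to `get_cost_and_usage` instead of the literal dates.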

With this setup, data from AWS Cost Explorer will be fetched periodically and put into the Kinesis Data Stream. The Lambda function connected to the stream will process the incoming data in real-time.
