Inner workings of Harness’s Cloud Billing Data Ingestion Pipeline for AWS

Nikunj Badjatya
Harness Engineering
4 min readFeb 2, 2021

In this article we will see how we can make a multi tenant system using event driven architecture and ingest the AWS’s CUR reports into your data pipeline. Particularly, we will see how we can ingest the AWS CUR data into a BigQuery Dataset.

Also see Part 1 of this article series where we talked about GCP billing data ingestion.

AWS generates CUR (Cost and Usage Report) atleast once a day and upto three times a day.

This CUR has detailed billing data across AWS accounts to help you analyze your cloud spend.

Pipeline at a high level looks like this:

Data pipeline

Prerequisite 1, you would need to enable CUR in source AWS account.
Once setup, the CUR is delivered in a S3 bucket which we can use for further processing downstream.

Prerequisite 2, create cross account IAM role.
To make the system multi tenant, you would require to configure a cross account IAM role. This can be done using a AWS CloudFormation template. An example for this template can be seen here.

At this point, we have the CUR in the source S3 bucket. We also have the necessary cross account role created for us to sync this CUR from source S3 bucket into destination S3 bucket. The next sequence of steps is to consume these CURs whenever they get any updates.

Step1) Sync data across S3 buckets.

“aws s3 sync” cli supports syncing of s3 buckets from one account to another and across regions. Using the cross account IAM role provisioned, we can copy the CUR from source S3 to Harness’s S3 bucket.

Example:

aws sts assume-role — role-arn <role-arn-source> — role-session-name <session-name> — external-id <role-externalid>

The output of this command will give you access key, secret key and session token. You will need to export these in order to do s3 sync.

export AWS_ACCESS_KEY_ID=<>

export AWS_SECRET_ACCESS_KEY=<>

export AWS_SESSION_TOKEN=<>

Once these are exported, we can do s3 sync.

aws s3 sync s3://<source_cur_folder>, s3://<destination_s3_folder> — source-region <us-east-1> — acl, bucket-owner-full-control

This step needs to be done periodically in order to sync any updates at the source S3 bucket level (heard SpringBoot Batch?).

Having our own S3 bucket in the pipeline allows us to accomplish certain things. It acts as a store to replay data into our pipeline. This also makes the pipeline simpler to setup and to onboard customers.

Step2) Sync data from S3 to GCS bucket.

Harness runs on GCP and we use BigQuery to analyse the cloud spend.

We use GCP’s storage transfer service to sync between Harness’s S3 bucket to GCS bucket.

To accomplish this you would need an access key and secret key for S3 bucket.

At the end of this, we have data in the GCS.

Configuring GCP Storage Transfer via console does not allow adjusting the frequency to hourly. Nor does it allow adding the PubSub topic for transfer completion events. This is however possible to do via client SDKs/RestApis.
Here is a sample python code:

"""
Creates GCP data transfer job from AWS S3 to GCS bucket.
Provide the projectId, description, source_bucket, sink_bucket, AWS S3 access and secret keys below.

$ python gcp_create_transfer_job.py
"""

import datetime
import json
import googleapiclient.discovery

def create():
"""Create a one-time transfer from Amazon S3 to Google Cloud Storage."""
storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1')
project_id = "GCPPROJECTID"
description = "TRANSFERDESCRIPTION"
start_date = datetime.datetime.now()
start_time = datetime.datetime.now() - datetime.timedelta(hours=5, minutes=20)
end_date = start_date + datetime.timedelta(days=2)
secret_access_key = "AWS_S3_SECRET"
access_key_id = "AWS_S3_ACCESS"
source_bucket = "SOURCE_AWS_S3_BUCKET"
sink_bucket = "SINK_GCS_BUCKET"
# Edit this template with desired parameters.
transfer_job = {
'description': description,
'status': 'ENABLED',
'projectId': project_id,
'schedule': {
'scheduleStartDate': {
'day': start_date.day,
'month': start_date.month,
'year': start_date.year
},
'scheduleEndDate': {
'day': end_date.day,
'month': end_date.month,
'year': end_date.year
},
'startTimeOfDay': {
'hours': start_time.hour,
'minutes': start_time.minute,
'seconds': start_time.second
},
'repeatInterval': '3600s'
},
'transferSpec': {
'awsS3DataSource': {
'bucketName': source_bucket,
'awsAccessKey': {
'accessKeyId': access_key_id,
'secretAccessKey': secret_access_key
}
},
'gcsDataSink': {
'bucketName': sink_bucket
},
"objectConditions": {
"maxTimeElapsedSinceLastModification": "3600s"
},
"transferOptions": {
"overwriteObjectsAlreadyExistingInSink": True
}
}
}

result = storagetransfer.transferJobs().create(body=transfer_job).execute()
print('Returned transferJob: {}'.format(
json.dumps(result, indent=4)))


create()

Step3) Downstream processing

This is where we perform data processing using GCP PubSub, CloudScheduler and CloudFunctions.

A CloudFunction listens to the GCS bucket for any object create event.
AWS CUR also provides a manifest json file which basically has the schema definition for the CSVs. When the manifest file is ingested into the GCS bucket, a CloudFunction gets triggered which then creates the dataset and tables in BigQuery.

Once you have the relevant tables created, depending on the needs, you can incorporate GCP’s CloudScheduler to schedule an event for various kinds of data processing on top of the CSVs.

Data processing pipeline

That is how we ingest the AWS cloud spend data.

And yes, we use Terraform to manage our cloud resources — in this case PubSub, CloudFunction and StackDriver monitoring. Don’t forget to set appropriate monitoring for your cloud resources!

Using PubSub and CloudFunction made the pipeline even better as these services are scalable and performant.

That’s it for today. Please leave comments or questions below. Thank you for reading.

In Part3 of this article series, we will see for AZURE cloud spend data pipeline.

--

--