DataOps: Cost-effective, automated big data pipeline using AWS

Jinam Shah
Published in Cactus Tech Blog · Apr 14, 2021

Introduction

In this day and age, storage is cheap whereas compute is expensive. Hence, traditional map-reduce clusters that are kept “on” perpetually will rack up enormous costs, especially if the map-reduce process is triggered sporadically. This article explores the automation of a big data processing pipeline while maintaining low cost and enabling alerts. This is achieved using various AWS services such as AWS Elastic MapReduce (EMR), AWS Step Functions, AWS EventBridge, and AWS Lambda.

For example, if a data processing job takes an hour to run and is triggered once every day on a cluster that is always on, then ~96% of the compute capacity, and by implication, money is wasted.

The EMR cluster's idle-time metric shows when the cluster is sitting idle. This metric will also be used to create AWS CloudWatch alarms.

For the purpose of this article, consider that real-time data is continuously being added to an S3 prefix. In most use cases, a data processing job would not be run every second or minute. Ideally, the data would be batched and a data processing job would be run at regular intervals. This is achieved using AWS EventBridge, which triggers an AWS StepFunction as shown in the diagram below.

Abstract-level flow diagram

Assumptions:

  • The data files are delivered to s3://<BUCKET>/raw/
  • The data files mentioned above are incremental updates that are deleted once processed.

The proposed architecture

The AWS EventBridge event triggers an AWS Step Function that creates an AWS EMR cluster to run the data processing job, and also creates AWS CloudWatch alarms responsible for sending messages to an AWS SNS topic (which can send emails if required). The step function is also responsible for cleanup, i.e. removing the alarms once the job is complete and terminating the EMR cluster.

Please refer to https://github.com/jinamshah/aws-data-ops-article-1 for the boilerplate code for the solution.

Note: to follow along with the commands, please install the AWS CLI and run the commands from the repository's main folder.

StepFunction definition

1. StepFunction

First, define the AWS StepFunction that orchestrates the whole thing.

Create an IAM role for the StepFunction

> aws iam create-role --role-name stepfn-role --assume-role-policy-document file://stepfunction/stepfunction_policy.json
> aws iam attach-role-policy --role-name stepfn-role --policy-arn arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess

Create the StepFunction

> SF_ROLE_ARN=`aws iam get-role --role-name stepfn-role | grep Arn | sed -E 's/\s+"Arn"://' | sed 's/,//' | sed 's/"//g' | sed -E 's/\s+//'`
> SF_DEFINITION=$(<stepfunction/stepfunction_definition.json)
> STEPFUNCTION_ARN=`aws stepfunctions create-state-machine --name StepFunctionTest --definition "$SF_DEFINITION" --role-arn $SF_ROLE_ARN | grep Arn | sed -E 's/\s+"stateMachineArn"://' | sed 's/,//' | sed 's/"//g' | sed -E 's/\s+//'`
> echo $STEPFUNCTION_ARN
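To confirm the state machine was created correctly, you can ask AWS to echo its definition back (this simply reads the state machine; it does not start an execution):

> aws stepfunctions describe-state-machine --state-machine-arn $STEPFUNCTION_ARN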

2. Lambda function

The first step in the step function is to trigger an AWS Lambda that determines the storage space required based on the size of the data that is added to the S3 bucket. This is another step towards reducing cost.
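The exact sizing logic lives in lambda/lambda_function.py in the repository; conceptually, it looks at how much data is waiting under the raw prefix, which you can approximate from the CLI as follows (the bucket name is a placeholder):

> aws s3 ls s3://<BUCKET>/raw/ --recursive --summarize | tail -2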

Create an IAM role for the Lambda and attach the required policies:

> aws iam create-role --role-name lambda-role --assume-role-policy-document file://lambda/lambda_policy.json
> aws iam attach-role-policy --role-name lambda-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
> aws iam attach-role-policy --role-name lambda-role --policy-arn arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess
> aws iam attach-role-policy --role-name lambda-role --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

Create the lambda function

> ROLE_ARN=`aws iam get-role --role-name lambda-role | grep Arn | sed -E 's/\s+"Arn"://' | sed 's/,//' | sed 's/"//g' | sed -E 's/\s+//'`
> zip -j lambda_code.zip lambda/lambda_function.py
> aws lambda create-function --function-name lambda-determine-storage --runtime python3.8 --role $ROLE_ARN --handler lambda_function.lambda_handler --timeout 60 --zip-file fileb://lambda_code.zip | grep FunctionArn | sed -e 's/"//g' -e 's/,//g' -e 's/FunctionArn//g' -e 's/: //g'
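To smoke-test the Lambda in isolation, you can invoke it directly. The payload shape below is only a guess; match whatever lambda/lambda_function.py actually expects (the --cli-binary-format flag is only needed on AWS CLI v2):

> aws lambda invoke --function-name lambda-determine-storage \
    --cli-binary-format raw-in-base64-out \
    --payload '{"bucket": "<BUCKET>", "prefix": "raw/"}' \
    /tmp/lambda_output.json
> cat /tmp/lambda_output.json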

3. EMR Cluster

This section explains the choices made for the EMR cluster. The cluster itself was already defined as part of the step function in step 1 and will be created when the StepFunction is triggered.

As described in the AWS EMR documentation:

The node types in Amazon EMR are as follows:

Master node: A node that manages the cluster by running software components to coordinate the distribution of data and tasks among other nodes for processing. The master node tracks the status of tasks and monitors the health of the cluster. Every cluster has a master node, and it’s possible to create a single-node cluster with only the master node.

Core node: A node with software components that run tasks and store data in the Hadoop Distributed File System (HDFS) on your cluster. Multi-node clusters have at least one core node.

Task node: A node with software components that only runs tasks and does not store data in HDFS. Task nodes are optional.

For the purpose of this article, let's assume that the data processing job requires six r5.12xlarge CORE nodes and one m5.xlarge MASTER node (this can be tweaked according to the use case). Another reason for choosing r5.12xlarge instances is that, at the time of writing this article, their spot interruption rate is less than 5%, so spot instances can be used to reduce cost by ~70% with good confidence (although this is not advised for sustained production load).

In case you wish to use on-demand instances instead of spot instances, change the second instance fleet (the one configuring the 'CORE' InstanceFleet) in stepfunction/stepfunction_definition.json to the following:

{
  "Name": "CORE",
  "TargetOnDemandCapacity": 6,
  "InstanceFleetType": "CORE",
  "InstanceTypeConfigs": [
    {
      "EbsConfiguration": {
        "EbsBlockDeviceConfigs": [
          {
            "VolumeSpecification": {
              "SizeInGB.$": "$.lambda_op.ebs_size",
              "VolumeType": "gp2"
            },
            "VolumesPerInstance": 1
          }
        ],
        "EbsOptimized": true
      },
      "InstanceType": "r5.12xlarge"
    }
  ]
}

One important step for any EMR job is bootstrapping. This step is used to install additional drivers or software, or to apply any other instance customization needed. To demonstrate this, the bootstrap step clones the codebase. The code for it can be found in the repository linked above.
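A bootstrap action is simply a shell script, stored in S3 and referenced from the cluster definition, that EMR runs on every node while the cluster is being provisioned. Below is a minimal sketch of such a script, assuming the repository above is what gets cloned and that PySpark is installed via pip (see the next section); the real script in the repository may differ.

#!/bin/bash
# bootstrap.sh - hypothetical sketch of an EMR bootstrap action
set -euo pipefail

# Install PySpark on every node (the alternative is a custom AMI with it preinstalled)
sudo python3 -m pip install pyspark

# Pull the job code onto the node
sudo yum install -y git
git clone https://github.com/jinamshah/aws-data-ops-article-1.git /home/hadoop/aws-data-ops-article-1

Upload the script somewhere the cluster can read it and reference that S3 path in the BootstrapActions section of the cluster definition:

> aws s3 cp bootstrap.sh s3://<BUCKET>/bootstrap/bootstrap.sh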

4. MapReduce step and CloudWatch Alarms

The MapReduce step is a simple Python script that creates a Spark application and then exits. This is enough to verify that the step was triggered correctly and that Spark was instantiated correctly.

To use PySpark, it has to be installed on all the EC2 instances in the cluster. This can be achieved in two ways: either by adding a pip install command to the bootstrap step in the EMR cluster definition, or by creating an Amazon Machine Image (AMI) with PySpark already installed.

Since there is no direct method to run Python code as a step on the EMR cluster, we run it through a bash script. Bash scripts can be run on EMR clusters using a JAR file provided by AWS (s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar).
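For reference, adding such a step by hand would look roughly like the command below; in this article's setup the step function adds the step for you, and the cluster id, region, and script path are placeholders.

> aws emr add-steps --cluster-id <CLUSTER_ID> \
    --steps Type=CUSTOM_JAR,Name="RunBashWrapper",ActionOnFailure=CONTINUE,Jar=s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar,Args=["s3://<BUCKET>/scripts/run_job.sh"]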

The Python code for the MapReduce step and the bash wrapper for it can both be found in the repository linked above.
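As a rough sketch (the real files live in the repository; the file names and paths here are assumptions), the wrapper does little more than hand the Python file to spark-submit:

#!/bin/bash
# run_job.sh - hypothetical bash wrapper executed on the cluster by script-runner.jar
set -e

# Submit the PySpark job that the bootstrap action cloned onto the node
spark-submit /home/hadoop/aws-data-ops-article-1/emr/job.py "$@"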

The AWS CloudWatch alarms are created to check whether the EMR cluster is idle or whether any applications have failed. Hence, the MapReduce step is wrapped by the creation and deletion of these alarms, which is also done with bash scripts using the method described above.
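A minimal sketch of what the alarm creation and cleanup in those scripts might look like with the AWS CLI, assuming an existing SNS topic and using the EMR IsIdle and AppsFailed CloudWatch metrics (the cluster id and topic ARN are placeholders):

> # Alert when the cluster sits idle (the same idle metric discussed in the introduction)
> aws cloudwatch put-metric-alarm --alarm-name emr-is-idle \
    --namespace AWS/ElasticMapReduce --metric-name IsIdle \
    --dimensions Name=JobFlowId,Value=<CLUSTER_ID> \
    --statistic Maximum --period 300 --evaluation-periods 1 \
    --threshold 0 --comparison-operator GreaterThanThreshold \
    --alarm-actions <SNS_TOPIC_ARN>
> # Alert when any application on the cluster fails
> aws cloudwatch put-metric-alarm --alarm-name emr-apps-failed \
    --namespace AWS/ElasticMapReduce --metric-name AppsFailed \
    --dimensions Name=JobFlowId,Value=<CLUSTER_ID> \
    --statistic Maximum --period 300 --evaluation-periods 1 \
    --threshold 0 --comparison-operator GreaterThanThreshold \
    --alarm-actions <SNS_TOPIC_ARN>
> # The cleanup step removes both alarms once the job is done
> aws cloudwatch delete-alarms --alarm-names emr-is-idle emr-apps-failed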

Create the EventBridge rule (triggered every day at 6 AM UTC)

> aws events put-rule --name pipeline-trigger \
    --schedule-expression "cron(0 6 * * ? *)" \
    --state ENABLED

Add the StepFunction as the target of the EventBridge rule

> aws events put-targets --rule pipeline-trigger \
    --targets "Id"="1","Arn"=$STEPFUNCTION_ARN,"RoleArn"=$SF_ROLE_ARN

Future Work

These are some improvements that can be made in the future:

  • Add a Lambda function that sends updates about the steps being executed to Slack or any other notification service.
  • Add a database to maintain process information for future debugging, so that the raw data doesn’t have to be deleted once processed; instead, a diff based on the stored process information can be used.
