DataOps: Cost-effective, automated big data pipeline using AWS

Jinam Shah
Apr 14 · 6 min read
(Image by author)

Introduction

In this day and age, storage is cheap whereas compute is expensive. Hence, traditional map-reduce clusters that are kept “on” perpetually will rack up enormous costs, especially if the map-reduce process is triggered only sporadically. This article explores automating a big data processing pipeline while keeping costs low and enabling alerts. This is achieved using various AWS services: AWS Elastic MapReduce (EMR), AWS StepFunctions, AWS EventBridge, and AWS Lambda.

For example, if a data processing job takes an hour to run and is triggered once every day on a cluster that is always on, then ~96% of the compute capacity, and by implication, money is wasted.

The image above shows when the cluster is idle. This metric (EMR’s IsIdle) will also be used to create AWS CloudWatch alarms.

For the purpose of this article, consider that real-time data is continuously being added to an S3 prefix. In most use cases, a data processing job would not be run every second or minute; ideally, the data would be batched and a processing job run at regular intervals. This is achieved using an AWS EventBridge rule that triggers an AWS StepFunction, as shown in the image below.

Abstract-level flow diagram

Assumptions:

  • The data files are delivered to s3://<BUCKET>/raw/
  • The data files mentioned above are incremental updates that are deleted once processed.

The proposed architecture

The AWS EventBridge event triggers an AWS StepFunction that creates an AWS EMR cluster to run the data processing job, and that also creates AWS CloudWatch alarms responsible for sending messages to an SNS topic (which can send emails if required). The StepFunction is also responsible for cleanup, i.e. removing the alarms once the job is complete and terminating the EMR cluster.

Please refer to the accompanying repository to get the boilerplate code for the solution.

Note: to follow along with the commands, please install AWS CLI and run the commands in the main folder.

StepFunction definition

1. StepFunction

First, define the AWS StepFunction that orchestrates the entire pipeline.

Create an IAM role for the StepFunction

> aws iam create-role --role-name stepfn-role --assume-role-policy-document file://stepfunction/stepfunction_policy.json
> aws iam attach-role-policy --role-name stepfn-role --policy-arn arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess

Create the StepFunction

> SF_ROLE_ARN=$(aws iam get-role --role-name stepfn-role --query 'Role.Arn' --output text)
> SF_DEFINITION=$(<stepfunction/stepfunction_definition.json)
> STEPFUNCTION_ARN=$(aws stepfunctions create-state-machine --name StepFunctionTest --definition "$SF_DEFINITION" --role-arn "$SF_ROLE_ARN" --query 'stateMachineArn' --output text)
> echo $STEPFUNCTION_ARN

2. Lambda function

The first step in the step function is to trigger an AWS Lambda that determines the storage space required based on the size of the data that is added to the S3 bucket. This is another step towards reducing cost.
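Below is a minimal sketch of what such a Lambda could look like, assuming the bucket and prefix are passed in the event (or defaulted) and using an illustrative sizing heuristic; the actual function in the repository may differ. The returned ebs_size is what the StepFunction later feeds into the EMR definition as $.lambda_op.ebs_size.

import math
import boto3

s3 = boto3.client("s3")

def lambda_handler(event, context):
    # Hypothetical event keys; adjust to however the StepFunction passes input.
    bucket = event.get("bucket", "<BUCKET>")
    prefix = event.get("prefix", "raw/")

    # Sum the size of all objects currently waiting under the raw/ prefix.
    total_bytes = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            total_bytes += obj["Size"]

    # Illustrative heuristic: provision roughly 3x the raw data size per
    # volume, with a 32 GB floor (an assumption, not the article's rule).
    ebs_size_gb = max(32, math.ceil((total_bytes * 3) / 1024 ** 3))

    # The EMR definition reads this value via "$.lambda_op.ebs_size".
    return {"ebs_size": ebs_size_gb}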

Create an IAM role for the lambda and attach the required policies:

> aws iam create-role --role-name lambda-role --assume-role-policy-document file://lambda/lambda_policy.json
> aws iam attach-role-policy --role-name lambda-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
> aws iam attach-role-policy --role-name lambda-role --policy-arn arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess
> aws iam attach-role-policy --role-name lambda-role --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

Create the lambda function

> ROLE_ARN=$(aws iam get-role --role-name lambda-role --query 'Role.Arn' --output text)
> zip -j lambda_code.zip lambda/lambda_function.py
> aws lambda create-function --function-name lambda-determine-storage --runtime python3.8 --role "$ROLE_ARN" --handler lambda_function.lambda_handler --timeout 60 --zip-file fileb://lambda_code.zip --query 'FunctionArn' --output text

3. EMR Cluster

This section explains the choices made for the EMR cluster. The cluster itself is already defined in the StepFunction definition from the first step and will be created when the StepFunction is triggered.

As mentioned in the Amazon EMR documentation:

The node types in Amazon EMR are as follows:

Master node: A node that manages the cluster by running software components to coordinate the distribution of data and tasks among other nodes for processing. The master node tracks the status of tasks and monitors the health of the cluster. Every cluster has a master node, and it’s possible to create a single-node cluster with only the master node.

Core node: A node with software components that run tasks and store data in the Hadoop Distributed File System (HDFS) on your cluster. Multi-node clusters have at least one core node.

Task node: A node with software components that only runs tasks and does not store data in HDFS. Task nodes are optional.

For the purpose of this article, let's assume that the data processing job requires six r5.12xlarge CORE nodes and one m5.xlarge MASTER node (this can be tweaked according to the use case). Another consideration for using r5.12xlarge instances is that, at the time of writing this article, the frequency of interruption for these instances is less than 5%, so we can use spot instances to reduce cost by ~70% with good confidence (although spot instances are not advised for sustained production loads).

In case you wish to use on-demand instances instead of spot instances, please change the second instance fleet (the one with the 'CORE' InstanceFleetType) in stepfunction/stepfunction_definition.json to the following:

{
  "Name": "CORE",
  "TargetOnDemandCapacity": 6,
  "InstanceFleetType": "CORE",
  "InstanceTypeConfigs": [
    {
      "EbsConfiguration": {
        "EbsBlockDeviceConfigs": [
          {
            "VolumeSpecification": {
              "SizeInGB.$": "$.lambda_op.ebs_size",
              "VolumeType": "gp2"
            },
            "VolumesPerInstance": 1
          }
        ],
        "EbsOptimized": true
      },
      "InstanceType": "r5.12xlarge"
    }
  ]
}

One important step for any EMR job is the bootstrapping step. This step is used to add any additional drivers, software, or other instance customization needed. To demonstrate this, I will clone the codebase using this step. The code for it can be found in the repository.

4. MapReduce step and Cloudwatch Alarms

The MapReduce step is a simple Python script that creates a Spark application and then stops it. This should be enough to test whether the step was triggered correctly and whether Spark was instantiated correctly.
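As a rough illustration (not the exact script from the repository), the step can be as small as the following: it starts a SparkSession, runs a trivial action to confirm the executors respond, and shuts the application down. The app name is an arbitrary placeholder.

from pyspark.sql import SparkSession

def main():
    # Start (or attach to) a Spark application on the cluster.
    spark = SparkSession.builder.appName("pipeline-smoke-test").getOrCreate()

    # Trivial action to confirm the executors are reachable.
    count = spark.sparkContext.parallelize(range(1000)).count()
    print(f"Smoke test counted {count} elements")

    # Stop the application so the step completes cleanly.
    spark.stop()

if __name__ == "__main__":
    main()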

To use PySpark, it will have to be installed on all the EC2 instances in the cluster. This can be achieved in two ways: either by adding a pip install command to the bootstrap step in the EMR cluster definition, or by creating an Amazon Machine Image (AMI) with PySpark already installed.

Since there is no direct method to run Python code as an EMR step, we run it using a bash script. Bash scripts can be run on EMR clusters using a JAR file provided by AWS (s3://<region>.elasticmapreduce/libs/script-runner/script-runner.jar).

The Python code for the MapReduce step and the bash wrapper for it can both be found in the repository.

The AWS CloudWatch alarms are created to check whether the EMR cluster is idle and whether any apps have failed. Hence, the MapReduce step is wrapped by the creation and deletion of these alarms, which is also done using the bash-script approach described above.
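For illustration, here is roughly what those two alarms look like when created with boto3 instead of the bash script; the alarm names, periods, and thresholds are assumptions, but IsIdle and AppsFailed are the standard EMR CloudWatch metrics.

import boto3

cloudwatch = boto3.client("cloudwatch")

def create_alarms(cluster_id: str, sns_topic_arn: str) -> None:
    # Alarm when the cluster reports itself idle (EMR publishes IsIdle as 0/1).
    cloudwatch.put_metric_alarm(
        AlarmName=f"{cluster_id}-is-idle",
        Namespace="AWS/ElasticMapReduce",
        MetricName="IsIdle",
        Dimensions=[{"Name": "JobFlowId", "Value": cluster_id}],
        Statistic="Average",
        Period=300,
        EvaluationPeriods=3,
        Threshold=1,
        ComparisonOperator="GreaterThanOrEqualToThreshold",
        AlarmActions=[sns_topic_arn],
    )
    # Alarm when any YARN application on the cluster fails.
    cloudwatch.put_metric_alarm(
        AlarmName=f"{cluster_id}-apps-failed",
        Namespace="AWS/ElasticMapReduce",
        MetricName="AppsFailed",
        Dimensions=[{"Name": "JobFlowId", "Value": cluster_id}],
        Statistic="Sum",
        Period=300,
        EvaluationPeriods=1,
        Threshold=1,
        ComparisonOperator="GreaterThanOrEqualToThreshold",
        AlarmActions=[sns_topic_arn],
    )

def delete_alarms(cluster_id: str) -> None:
    # Cleanup once the MapReduce step has finished.
    cloudwatch.delete_alarms(
        AlarmNames=[f"{cluster_id}-is-idle", f"{cluster_id}-apps-failed"]
    )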

Create the EventBridge Rule (triggered every day at 6 am)

aws events put-rule --name pipeline-trigger \
--schedule-expression "cron(0 6 * * ? *)" \
--state ENABLED

Add the StepFunction as the target of the EventBridge rule

aws events put-targets --rule pipeline-trigger \
--targets "Id"="1","Arn"=$STEPFUNCTION_ARN,"RoleArn"=$SF_ROLE_ARN

Future Work

These are some improvements that can be made in the future:

  • Add a Lambda function that sends updates about the steps being executed to Slack or another service (a minimal sketch follows this list).
  • Add a database to maintain process information for future debugging, so that the raw data doesn’t have to be deleted once processed; instead, a diff based on the process information can be used.
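As a starting point for the first item, the notification Lambda could be as simple as the sketch below, which posts the step name and status it receives to an assumed Slack incoming-webhook URL taken from an environment variable; the event keys and variable name are placeholders.

import json
import os
import urllib.request

def lambda_handler(event, context):
    # Hypothetical configuration and event keys, for illustration only.
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    step = event.get("step", "unknown")
    status = event.get("status", "unknown")

    payload = json.dumps({"text": f"Pipeline step '{step}': {status}"}).encode("utf-8")
    request = urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    # Slack incoming webhooks respond with HTTP 200 on success.
    with urllib.request.urlopen(request) as response:
        return {"status_code": response.status}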
