AWS EMR (Elastic MapReduce) — a Tiny Demonstration Using the AWS CLI

Amazon EMR is a PaaS (Platform as a Service) offering that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data. Using these frameworks you can process data for analytics and business purposes. EMR can also transform and move large amounts of data into and out of other AWS data stores and databases, such as Amazon Simple Storage Service (Amazon S3) and Amazon DynamoDB.

In this demo we will use input data (text documents containing English words) stored in an S3 bucket, process it with a sample Python application that counts the words, and finally group the results. The output will also be stored in the same S3 bucket.

What Is So Special About This Word Count Program?

Though the process looks simple, the advantage of EMR lies in its applicability to big data, i.e. very large quantities of data, and in its ability to process them in parallel. The input data is split into smaller chunks (map) and sent to the workers (in this case, the Python word count application), and the results are aggregated (reduce) to produce a meaningful output quickly.
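The same map/reduce flow can be sketched in plain Python as a local toy illustration (this is not EMR code, just the concept): the input is split into chunks, each chunk is counted independently, and the partial counts are merged into one total.

```python
from collections import Counter
from functools import reduce

text = "the quick brown fox jumps over the lazy dog the end"

# Split the input into smaller chunks (here, fixed-size word groups).
words = text.split()
chunks = [words[i:i + 4] for i in range(0, len(words), 4)]

# Map: each "worker" counts the words in its own chunk independently.
partial_counts = [Counter(chunk) for chunk in chunks]

# Reduce: aggregate the partial results into one combined count.
total = reduce(lambda a, b: a + b, partial_counts, Counter())
print(total["the"])  # 3
```

On EMR the chunks would be distributed across cluster nodes instead of processed in one process, but the map and reduce roles are exactly these.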

Join me in this tiny command line demo.

Create Input Bucket

aws s3 --region ap-south-1 mb s3://emr-demo-sree

Please note that to keep it simple we will use the same bucket for the results as well.

Copy the Input Data and Processing Program

aws s3 sync s3://elasticmapreduce/samples/wordcount wordcount
tree wordcount
wordcount/
├── input
│ ├── 0001
│ ├── 0002
│ ├── 0003
│ ├── 0004
│ ├── 0005
│ ├── 0006
│ ├── 0007
│ ├── 0008
│ ├── 0009
│ ├── 0010
│ ├── 0011
│ └── 0012
└── wordSplitter.py
aws s3 sync wordcount s3://emr-demo-sree/
upload: wordcount/input/0011 to s3://emr-demo-sree/input/0011
upload: wordcount/input/0012 to s3://emr-demo-sree/input/0012
upload: wordcount/input/0010 to s3://emr-demo-sree/input/0010
upload: wordcount/wordSplitter.py to s3://emr-demo-sree/wordSplitter.py
upload: wordcount/input/0004 to s3://emr-demo-sree/input/0004
upload: wordcount/input/0003 to s3://emr-demo-sree/input/0003
upload: wordcount/input/0007 to s3://emr-demo-sree/input/0007
upload: wordcount/input/0008 to s3://emr-demo-sree/input/0008
upload: wordcount/input/0001 to s3://emr-demo-sree/input/0001
upload: wordcount/input/0002 to s3://emr-demo-sree/input/0002
upload: wordcount/input/0006 to s3://emr-demo-sree/input/0006
upload: wordcount/input/0009 to s3://emr-demo-sree/input/0009
upload: wordcount/input/0005 to s3://emr-demo-sree/input/0005

Create AWS EMR Roles

aws emr create-default-roles
aws iam list-roles | grep -i emr
"RoleName": "AWSServiceRoleForEMRCleanup",
"Arn": "arn:aws:iam::nnnn:role/aws-service-role/elasticmapreduce.amazonaws.com/AWSServiceRoleForEMRCleanup",
"RoleName": "EMR_DefaultRole",
"Arn": "arn:aws:iam::nnnn:role/EMR_DefaultRole",
"RoleName": "EMR_EC2_DefaultRole",
"Arn": "arn:aws:iam::nnnn:role/EMR_EC2_DefaultRole",

This built-in facility for creating the roles needed by EMR is a great aid.

Create EMR Cluster

aws --region ap-south-1 emr create-cluster  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=2,InstanceType=m4.large --name "Test Cluster" --log-uri s3://emr-demo-sree/logs/ --enable-debugging --tags Name=emr \
--ec2-attributes '{"KeyName":"my-demo-key","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-nnnn","EmrManagedSlaveSecurityGroup":"sg-nnnn","EmrManagedMasterSecurityGroup":"sg-nnnn"}' \
--release-label emr-5.13.0 \
--service-role EMR_DefaultRole
{
"ClusterId": "j-LT8GPHMUIWL4"
}
cid=j-LT8GPHMUIWL4
aws --region ap-south-1 emr list-clusters --active
{
"Clusters": [
{
"Id": "j-LT8GPHMUIWL4",
"Name": "Test Cluster",
"Status": {
"State": "WAITING",
"StateChangeReason": {
"Message": "Cluster ready after last step completed."
},
"Timeline": {
"CreationDateTime": 1526017522.75,
"ReadyDateTime": 1526017740.954
}
},
"NormalizedInstanceHours": 0
}
]
}

Add Processing Steps

aws --region ap-south-1 emr add-steps --cluster-id $cid \
--steps Type=STREAMING,Name='Word Count',ActionOnFailure=CONTINUE,Args=--files,s3://emr-demo-sree/wordSplitter.py,-mapper,wordSplitter.py,-reducer,aggregate,-input,s3://emr-demo-sree/input,-output,s3://emr-demo-sree/output

{
"StepIds": [
"s-2M4DI1IS7MLCM"
]
}
sid=s-2M4DI1IS7MLCM
aws --region ap-south-1 emr describe-step --cluster-id $cid --step-id $sid \
--query "Step.Status.State"
"RUNNING"aws --region ap-south-1 emr describe-step --cluster-id $cid --step-id $sid \
--query "Step.Status.State"
"COMPLETED"

Repeat the above command until you see the “COMPLETED” state.
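Instead of re-running the command by hand, the polling can be wrapped in a small loop. A minimal Python sketch of the idea follows; `get_state` is a hypothetical stand-in for the `describe-step` call shown above, stubbed here with a canned sequence of states so the loop is self-contained.

```python
import time

def wait_for_step(get_state, poll_seconds=30, timeout_seconds=3600):
    """Poll get_state() until the step reaches a terminal state."""
    terminal = {"COMPLETED", "FAILED", "CANCELLED", "INTERRUPTED"}
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        state = get_state()
        if state in terminal:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError("step did not finish in time")

# Stubbed example: the states a real poller might see from
# `aws emr describe-step ... --query "Step.Status.State"`.
states = iter(["PENDING", "RUNNING", "RUNNING", "COMPLETED"])
print(wait_for_step(lambda: next(states), poll_seconds=0))  # COMPLETED
```

In a real script `get_state` would invoke the AWS CLI (or an SDK call) and a sensible `poll_seconds` avoids hammering the API.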

Fetch The Results and Verify

aws s3 sync s3://emr-demo-sree/output wordcount/output
download: s3://emr-demo-sree/output/_SUCCESS to wordcount/output/_SUCCESS
download: s3://emr-demo-sree/output/part-00000 to wordcount/output/part-00000
download: s3://emr-demo-sree/output/part-00002 to wordcount/output/part-00002
download: s3://emr-demo-sree/output/part-00001 to wordcount/output/part-00001
tree wordcount
wordcount/
├── input
│ ├── 0001
│ ├── 0002
│ ├── 0003
│ ├── 0004
│ ├── 0005
│ ├── 0006
│ ├── 0007
│ ├── 0008
│ ├── 0009
│ ├── 0010
│ ├── 0011
│ └── 0012
├── output
│ ├── _SUCCESS
│ ├── part-00000
│ ├── part-00001
│ └── part-00002
└── wordSplitter.py
head wordcount/output/part-00000
a 14716
aa 52
aakar 3
aargau 3
abad 3
abandoned 46
abandonment 6
abate 9
abauj 3
abbassid 4

The output folder synced from the S3 bucket has the aggregated word counts!
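The reducer writes its results across several part-NNNNN files. If a single combined tally is wanted, the parts can be merged locally; here is a small sketch, assuming tab-separated `word<TAB>count` lines (the format Hadoop streaming output uses) and hypothetical part-file contents.

```python
from collections import Counter

def merge_parts(part_texts):
    """Sum 'word<TAB>count' lines across reducer part files."""
    totals = Counter()
    for text in part_texts:
        for line in text.splitlines():
            word, count = line.split("\t")
            totals[word] += int(count)
    return totals

# Hypothetical contents of two part files:
parts = ["a\t14716\nabate\t9", "aa\t52"]
totals = merge_parts(parts)
print(totals["a"], totals["aa"])  # 14716 52
```

Since the reducer partitions by key, each word normally lands in exactly one part file, so the merge is effectively a concatenation, but summing keeps the sketch robust either way.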

Cleanup

aws --region ap-south-1 emr terminate-clusters --cluster-ids $cid
aws --region ap-south-1 emr list-clusters --active
{
"Clusters": [
{
"Id": "j-LT8GPHMUIWL4",
"Name": "Test Cluster",
"Status": {
"State": "TERMINATING",
"StateChangeReason": {
"Code": "USER_REQUEST",
"Message": "Terminated by user request"
},
"Timeline": {
"CreationDateTime": 1525967726.752,
"ReadyDateTime": 1525967961.772
}
},
"NormalizedInstanceHours": 12
}
]
}
aws s3 --region ap-south-1 rb s3://emr-demo-sree --force

The Python program used in the above demo, provided by AWS, is as follows:

#!/usr/bin/python
import sys
import re

def main(argv):
    pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
    for line in sys.stdin:
        for word in pattern.findall(line):
            print "LongValueSum:" + word.lower() + "\t" + "1"

if __name__ == "__main__":
    main(sys.argv)
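The mapper emits one `LongValueSum:word<TAB>1` line per word, and the built-in `aggregate` reducer sums the values per key. The pair can be simulated locally; this is a simplified Python 3 sketch of the behavior, not the actual reducer implementation.

```python
import re
from collections import Counter

PATTERN = re.compile("[a-zA-Z][a-zA-Z0-9]*")

def mapper(lines):
    """Emit 'LongValueSum:word\t1' per word, like wordSplitter.py."""
    for line in lines:
        for word in PATTERN.findall(line):
            yield "LongValueSum:" + word.lower() + "\t" + "1"

def aggregate(mapped):
    """Sum values per key, mimicking the streaming aggregate reducer."""
    totals = Counter()
    for record in mapped:
        key, value = record.split("\t")
        word = key[len("LongValueSum:"):]
        totals[word] += int(value)
    return totals

counts = aggregate(mapper(["A a aa", "abate abate"]))
print(counts["a"], counts["aa"], counts["abate"])  # 2 1 2
```

On the cluster, the mapper output is shuffled so that all records for a key reach the same reducer; locally, one dictionary plays that role.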

Though this example comes from AWS itself, I hope this tiny demo helps in clearly understanding what EMR is, along with one specific use case. Please follow me for more such tiny demos! Thank you for your time.

Sreeprakash Neelakantan

AWS Certified DevOps Engineer & Solutions Architect Professional — Docker | Kubernetes | DevOps — Trainer | Running | Swimming | Cycling