Getting Started with AWS EMR Like a Boss

Aditya Goyal
Circles.Life
Published in
6 min readJul 3, 2020

Introduction

At Circles.Life, data is a first-class citizen, which means that data-driven decisions are business as usual and to drive those decisions, you need to not only make large volumes of data available, and process it but also analyze it fast.

Spark is an instant choice when it comes to big data processing whether it’s batch processing, real-time processing, or ML. The challenge is to provision and manage a cluster that fulfills all these needs. Amazon EMR is an easier way to do both of those tasks and this document discusses an implementation in detail.

Problem Statement

Before provisioning an EMR cluster, the user requirements need to be mapped to the kind of workload that needs to run on the cluster, which can be basically translated into:

  1. Data Ingestion Pipelines — Batch schedules, Real-time
  2. Data Analysis — Batch schedules, Ad-hoc

Approach

The way we approached this problem is to provision a cluster which has the following characteristics:

  1. Separate YARN capacity scheduler queues for production jobs and ad-hoc jobs with further subdivision to reserve resources for Batch and Real-time production jobs
  2. Zeppelin notebooks for ad-hoc analysis with user accounts and user impersonation (to be able to identify who is using/reserving cluster resources)

Configuration on EMR

There are two ways configuration can be set on EMR cluster:

  1. Setting JSON configuration during cluster initialization (recommended)
  2. The best and easiest way to set up configuration is to supply a JSON containing all the required config during cluster setup under Edit software settings sections as shown below:
  1. The available configuration depends on the cluster version and can be found under Configuration Classifications section in a specific version given at https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-components.html
  2. Changing individual config files and restarting the relevant services (not recommended)

In this document, we will only go through the first approach.

YARN Resource Manager

To make sure all kind of jobs has sufficient resources, we will create 3 elastic queues:

  1. Production: For all the spark-submit or real-time jobs with high SLA
  2. Adhoc: For all ad-hoc analysis submitted through shell or zeppelin with medium SLA
  3. Default: For any other job without a queue specified and are not under any SLA

Below is the configuration to achieve this:

[{
"_comment": "for yarn to use capacity scheduler",
"classification": "yarn-site",
"properties": {
"yarn.resourcemanager.scheduler.class": "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler",
"yarn.acl.enable": "true"
}
},
{
"_comment": "setup yarn queues",
"classification": "capacity-scheduler",
"properties": {
"yarn.scheduler.capacity.root.queues": "default,production,adhoc",
"yarn.scheduler.capacity.maximum-am-resource-percent": "0.2",
"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator",

"yarn.scheduler.capacity.root.default.capacity": "10",
"yarn.scheduler.capacity.root.default.user-limit-factor": "2",
"yarn.scheduler.capacity.root.default.maximum-capacity": "40",

"yarn.scheduler.capacity.root.production.capacity": "50",
"yarn.scheduler.capacity.root.production.user-limit-factor": "2",
"yarn.scheduler.capacity.root.production.acl_submit_applications": "*",
"yarn.scheduler.capacity.root.production.maximum-capacity": "80",
"yarn.scheduler.capacity.root.production.state": "RUNNING",

"yarn.scheduler.capacity.root.adhoc.capacity": "40",
"yarn.scheduler.capacity.root.adhoc.user-limit-factor": "2",
"yarn.scheduler.capacity.root.adhoc.acl_submit_applications": "*",
"yarn.scheduler.capacity.root.adhoc.maximum-capacity": "60",
"yarn.scheduler.capacity.root.adhoc.state": "RUNNING"
}
}
]

Zeppelin

Things to consider while setting up zeppelin:

  1. Allow user impersonation
  2. Set the default queue for all zeppelin spark jobs to be ad-hoc queue
  3. Persist notebooks on S3 instead of a master node for high availability
  4. Notebook timeout to release spark resources

User Impersonation

Cluster level configuration (during cluster initialization):

[{
"_comment": "for zeppelin to impersonate users",
"classification": "core-site",
"properties":{
"hadoop.proxyuser.zeppelin.hosts": "*",
"hadoop.proxyuser.zeppelin.groups": "*"
}
},
{
"_comment": "for zeppelin to impersonate users",
"classification":"zeppelin-env",
"properties":{

},
"configurations":[
{
"classification": "export",
"properties": {
"ZEPPELIN_IMPERSONATE_CMD": "'sudo -H -u ${ZEPPELIN_IMPERSONATE_USER} bash -c '"
}
}
]
}
]

Master node configuration (after the cluster is up and running):

User Creation:

Create shiro.ini from shiro.ini.template under /etc/zeppelin/conf and add users and roles as per your requirements. Uncomment the admin user to be able to change interpreter settings from zeppelin GUI later.

After the shiro.ini file has been updated, create the same users in hdfs by running the following commands on the master node:

su hdfs
hdfs dfs -mkdir /user/username
hdfs dfs -chown username /user/username

Restart Zeppelin: systemctl restart zeppelin

Lastly, login to zeppelin GUI at (http://master_node_ip_address:8890) using admin account and navigate to interpreter settings:

And select the following options for spark to impersonate the user(instead of zeppelin) in YARN while running spark on zeppelin:

Default Queue:

In the same interpreter settings add queue, executors, memory and cores as per your requirements with these name and value pairs:

Validate all settings by running spark context in a new zeppelin notebook as admin:

S3 Persist:

Before we set up the config, make sure awscli is set up for both hadoop and root users with the bucket configuration (using aws configure).

To persist notebooks on S3, we need to the following configuration changes (and/or uncomment) in zeppelin-site.xml under /etc/zeppelin/conf:

<property>
<name>zeppelin.notebook.s3.user</name>
<value>zeppelin</value>
<description>user name for s3 folder structure</description></property>
<property>
<name>zeppelin.notebook.s3.bucket</name>
<value>lw-data-eng-aus</value>
<description>bucket name for notebook storage</description></property>
<property>
<name>zeppelin.notebook.s3.endpoint</name><value>s3.amazonaws.com</value>
<description>endpoint for s3 bucket</description>
</property>
<property>
<name>zeppelin.notebook.s3.timeout</name>
<value>120000</value>
<description>s3 bucket endpoint request timeout in msec.</description>
</property>
<property>
<name>zeppelin.notebook.storage</name><value>org.apache.zeppelin.notebook.repo.S3NotebookRepo</value><description>notebook persistence layer implementation</description></property>

Make sure that all other properties with the name zeppelin.notebook.storage are commented otherwise they will override this one.

Notebook Timeout:

Timeouts are very useful when users exit the notebook without releasing resources on the cluster. To enable this, we need to the following configuration changes (and uncomment) in zeppelin-site.xml under /etc/zeppelin/conf:

<property>
<name>zeppelin.interpreter.lifecyclemanager.class</name
<value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value>
<description>LifecycleManager class for managing the lifecycle of interpreters, by default interpreter will be closed after timeout</description>
</property>
<property><name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name>
<value>60000</value>
<description>Milliseconds of the interval to checking whether interpreter is time out</description>
</property>
<property><name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name><value>600000</value>
<description>Milliseconds of the interpreter timeout threshold, by default it is 1 hour</description>
</property>

Consolidate Configuration

The final configuration to submit during cluster initialization:

[
{
"_comment": "for zeppelin to impersonate users",
"classification": "core-site",
"properties":{
"hadoop.proxyuser.zeppelin.hosts": "*",
"hadoop.proxyuser.zeppelin.groups": "*"
}
},
{
"_comment": "for zeppelin to impersonate users",
"classification":"zeppelin-env",
"properties":{

},
"configurations":[
{
"classification": "export",
"properties": {
"ZEPPELIN_IMPERSONATE_CMD": "'sudo -H -u ${ZEPPELIN_IMPERSONATE_USER} bash -c '"
}
}
]
},
{
"_comment": "for yarn to use capacity scheduler",
"classification": "yarn-site",
"properties": {
"yarn.resourcemanager.scheduler.class": "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler",
"yarn.acl.enable": "true"
}
},
{
"_comment": "setup yarn queues",
"classification": "capacity-scheduler",
"properties": {
"yarn.scheduler.capacity.root.queues": "default,production,adhoc",
"yarn.scheduler.capacity.maximum-am-resource-percent": "0.2",
"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator",

"yarn.scheduler.capacity.root.default.capacity": "10",
"yarn.scheduler.capacity.root.default.user-limit-factor": "2",
"yarn.scheduler.capacity.root.default.maximum-capacity": "40",

"yarn.scheduler.capacity.root.production.capacity": "50",
"yarn.scheduler.capacity.root.production.user-limit-factor": "2",
"yarn.scheduler.capacity.root.production.acl_submit_applications": "*",
"yarn.scheduler.capacity.root.production.maximum-capacity": "80",
"yarn.scheduler.capacity.root.production.state": "RUNNING",

"yarn.scheduler.capacity.root.adhoc.capacity": "40",
"yarn.scheduler.capacity.root.adhoc.user-limit-factor": "2",
"yarn.scheduler.capacity.root.adhoc.acl_submit_applications": "*",
"yarn.scheduler.capacity.root.adhoc.maximum-capacity": "60",
"yarn.scheduler.capacity.root.adhoc.state": "RUNNING"
}
}
]

And there you have it, an EMR cluster on AWS for all your needs.

--

--