AWS Lambda — How to Automate Your Spark Jobs on AWS Elastic Map Reduce (EMR)

It is no surprise that big data architectures are leveraging AWS Lambda more and more each day. A serverless service, AWS Lambda increases operational efficiency while decreasing complexity within your big data architecture. One of Lambda’s biggest strengths is its ability to integrate with a host of other AWS services (some of which include AWS CloudWatch, AWS S3, and AWS Kinesis).

I am a huge proponent of the event-driven pattern, and Lambda allows developers to seamlessly orchestrate real-time data pipelines without maintaining static infrastructure. Lambda is proving to be the centerpiece of dynamic data-processing architectures:

  • Lambda can be invoked simultaneously by many different events, such as a newly created object in AWS S3 (an S3 publish event); see the handler sketch after this list.
  • Lambda can be fronted by an API Gateway endpoint.
  • Lambda dynamically scales in response to increased traffic.
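
To make the event-driven trigger concrete, here is a minimal sketch of a Scala Lambda handler wired to S3 object-created events. The class name and the submitSparkJob helper are hypothetical; submitSparkJob stands in for the EMR logic developed in the rest of this post.

import scala.collection.JavaConverters._
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import com.amazonaws.services.lambda.runtime.events.S3Event

// Sketch only: Lambda invokes handleRequest once per batch of S3 events
class SparkJobLauncher extends RequestHandler[S3Event, String] {

  override def handleRequest(event: S3Event, context: Context): String = {
    // Each record describes one newly created S3 object
    event.getRecords.asScala.foreach { record =>
      val bucket = record.getS3.getBucket.getName
      val key    = record.getS3.getObject.getKey
      context.getLogger.log(s"New object: s3://$bucket/$key")
      submitSparkJob(bucket, key) // hypothetical: the EMR logic covered below
    }
    "OK"
  }

  // Placeholder for the ephemeral-EMR + Spark submission built in this post
  private def submitSparkJob(bucket: String, key: String): Unit = ()
}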

In this post, I will discuss the challenge of using Lambda to kick off an Elastic MapReduce (which I will refer to as EMR) cluster and run a Spark job. While working on a project at Capital One, one of the challenges I faced was getting Lambda, with the AWS Java SDK, to construct an ephemeral EMR cluster and submit a Spark job. This project was done in Scala, which uses the same AWS Java SDK as any other Java project. The focal point of this challenge was the SDK’s lack of abstraction and support for EMR.

When spinning up any compute instance or cluster of instances, many configuration parameters are required: instance size, instance type, autoscaling configurations, bootstrap actions, and so on. EMR is no different. So, the first item created in this project was a configuration file for the EMR cluster (emrConfig.json):

{
  "emrRegion": "us-west-2",
  "applications": [
    {
      "name": "Spark"
    }
  ],
  "autoScalingRole": "EMR_AutoScaling_DefaultRole",
  "bootstrapActions": [
    {
      "name": "BootstrapAction1",
      "scriptBootstrapAction": {
        "path": "s3://yourBucket/path/to/bootstrapAction1.sh"
      }
    },
    {
      "name": "Datadog Bootstrap",
      "scriptBootstrapAction": {
        "path": "s3://yourBucket/path/to/bootStrapAction2.sh"
      }
    }
  ],

  .
  .  (see GitHub for the complete configuration file)
  .

  "securityConfiguration": "Your-security-config",
  "serviceRole": "EMR_DefaultRole",
  "tags": [
    {
      "key": "Key1",
      "value": "Value1"
    },
    {
      "key": "Key2",
      "value": "Value2"
    },
    {
      "key": "OwnerContact",
      "value": "yourEmail@example.com"
    }
  ],
  "visibleToAllUsers": true
}

Complete EMR configuration file: https://github.com/smandrell/lambda-emr-example/blob/master/emrConfig.json

Unfortunately, I realized that the AWS Java SDK did not offer any native support for passing configuration files to create an EMR client. This was the challenge. Given deadlines and the priority of getting something working, my first iteration consisted of a platter of boilerplate code that screamed “Refactor!”:

import com.amazonaws.services.elasticmapreduce.model.{Application, RunJobFlowRequest}

new RunJobFlowRequest()
  .withAdditionalInfo("""{"proxyHost": "your.proxyHost.com"}""")
  .withApplications(new Application().withName("Spark"))
  .withAutoScalingRole("EMR_AutoScaling_DefaultRole")
  .withBootstrapActions(bootstrapActions)
  .withCustomAmiId("ami-12345")
  .withInstances(instanceConfig) // 'instanceConfig' is a var
  .withJobFlowRole("Role-For-Spark-Job")
  .withLogUri("s3://yourBucket/path/to/emrLogs")
  .withName("name-of-spark-job")
  .withReleaseLabel("emr-5.19.0")
  .withRepoUpgradeOnBoot("SECURITY")
  .withSecurityConfiguration("Your-Security-Config")
  .withServiceRole("EMR_DefaultRole")
  .withSteps(sparkSteps)
  .withTags(tags)
  .withVisibleToAllUsers(true)

Complete iteration 1 here: https://github.com/smandrell/lambda-emr-example/tree/master/iteration1

This first iteration is troubling for obvious reasons: EMR configuration values are hard-coded in methods, with the same values existing in the configuration file itself. Any change to the configuration file needs to be propagated to the code (and vice versa), which makes implementing changes rather cumbersome.

There needed to be a way to abstract away hard-coded values in code and allow the EMR client to read in the configuration values directly from the configuration file. The solution was to create a series of nested case classes which define the overall structure of the configuration file:

case class RunJobFlowConfig(emrRegion: String,
                            sparkConfig: StepsConfig,
                            additionalInfo: String,
                            applications: List[ApplicationConfig],
                            autoScalingRole: String,
                            bootstrapActions: List[BootstrapConfig],
                            customAmiId: String,
                            instances: JobInstancesConfig,
                            jobFlowRole: String,
                            logUri: String,
                            name: String,
                            releaseLabel: String,
                            repoUpgradeOnBoot: String,
                            securityConfiguration: String,
                            serviceRole: String,
                            tags: List[TagConfig],
                            visibleToAllUsers: Boolean)

See all the case classes here: https://github.com/smandrell/lambda-emr-example/tree/master/iteration2/model

There are two noteworthy things about RunJobFlowConfig:

  • The parameters of this case class match the top-level parameters of our emrConfig.json file.
  • Some parameters in this case class are themselves case classes. Nested JSON (like emrConfig.json) maps naturally onto nested case classes; the structures of the two need to mirror each other, as sketched after this list.
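
As a minimal sketch of that alignment, the bootstrapActions entries in emrConfig.json could be modeled by a pair of nested case classes. The nested class name here is illustrative; the full definitions are in the linked repo.

// Mirrors one entry of the "bootstrapActions" array in emrConfig.json
case class BootstrapConfig(name: String,
                           scriptBootstrapAction: ScriptBootstrapActionConfig)

// Mirrors the nested "scriptBootstrapAction" object
case class ScriptBootstrapActionConfig(path: String)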

Next, with these case classes, you can use a JSON parser like Jackson to read in your configuration file and map the values in the file to the case classes:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import scala.io.Source

// Create object mapper for mapping JSON -> case classes
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)

// Read in the EMR config file and construct the EMR case classes from it
val json = Source.fromFile("./emrConfig.json")
val emrConfig = mapper.readValue[RunJobFlowConfig](json.reader())
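
For this mapping to compile, the Jackson Scala module and the EMR SDK need to be on the classpath. A sketch of the sbt dependencies; the versions are assumptions and should be pinned to whatever your project already uses:

// build.sbt (versions are illustrative, not prescriptive)
libraryDependencies ++= Seq(
  "com.amazonaws"                %  "aws-java-sdk-emr"     % "1.11.461",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.8"
)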

Now, our emrConfig variable is an instance of our case class, holding all the values of our EMR configuration (defined in emrConfig.json)! For example, to get the emrRegion value, we can access it via emrConfig.emrRegion; to get the name of the first bootstrap action, we can simply use emrConfig.bootstrapActions.head.name.
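
For instance, given the emrConfig.json shown earlier, those accesses resolve as follows:

// Access parsed configuration values directly from the case class
val region        = emrConfig.emrRegion                  // "us-west-2"
val firstBootName = emrConfig.bootstrapActions.head.name // "BootstrapAction1"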

Further, we can remove all the hard-coded string values and refer to our case classes when we need to get any EMR configuration value. Now we can instantiate our EMR client as follows:

// Create EMR client
val emrClient = createEmrClient(emrConfig.emrRegion, new DefaultAWSCredentialsProviderChain)
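
Note that createEmrClient is this project’s helper rather than an SDK call. A minimal sketch of what it might look like, built on the SDK’s AmazonElasticMapReduceClientBuilder:

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.elasticmapreduce.{AmazonElasticMapReduce, AmazonElasticMapReduceClientBuilder}

// Build an EMR client for the region read from emrConfig.json
def createEmrClient(region: String,
                    credentials: AWSCredentialsProvider): AmazonElasticMapReduce =
  AmazonElasticMapReduceClientBuilder
    .standard()
    .withRegion(region)
    .withCredentials(credentials)
    .build()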

We’re not done yet, though. Our EMR client methods need to take in actual AWS EMR objects defined by the SDK; we can’t simply pass a case class to our EMR client and expect it to magically know what it means. So, the last step is to give each case class a value (or a method, if parameters are involved) that builds the corresponding EMR object from the values the case class holds:

import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest
import scala.collection.JavaConverters._

case class RunJobFlowConfig(emrRegion: String,
                            sparkConfig: StepsConfig,
                            additionalInfo: String,
                            applications: List[ApplicationConfig],
                            autoScalingRole: String,
                            bootstrapActions: List[BootstrapConfig],
                            customAmiId: String,
                            instances: JobInstancesConfig,
                            jobFlowRole: String,
                            logUri: String,
                            name: String,
                            releaseLabel: String,
                            repoUpgradeOnBoot: String,
                            securityConfiguration: String,
                            serviceRole: String,
                            tags: List[TagConfig],
                            visibleToAllUsers: Boolean) {

  // Build the AWS RunJobFlowRequest from this config's values
  val runJobFlowRequest: RunJobFlowRequest =
    new RunJobFlowRequest()
      .withAdditionalInfo(additionalInfo)
      .withApplications(applications.map(app => app.emrApplication).asJava)
      .withAutoScalingRole(autoScalingRole)
      .withBootstrapActions(bootstrapActions.map(b => b.bootstrapActionConfig).asJava)
      .withCustomAmiId(customAmiId)
      .withInstances(instances.jobFlowInstancesConfig)
      .withJobFlowRole(jobFlowRole)
      .withLogUri(logUri)
      .withName(name)
      .withReleaseLabel(releaseLabel)
      .withRepoUpgradeOnBoot(repoUpgradeOnBoot)
      .withSecurityConfiguration(securityConfiguration)
      .withServiceRole(serviceRole)
      .withSteps(sparkConfig.emrSteps)
      .withTags(tags.map(t => t.tag).asJava)
      .withVisibleToAllUsers(visibleToAllUsers)
}

Finally, we can create our RunJobFlowRequest and use our client to run it:

// Create the job flow request for the Spark job and run the job on EMR
val runJobFlowRequest = emrConfig.runJobFlowRequest
emrClient.runJobFlow(runJobFlowRequest)
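
Since runJobFlow returns a RunJobFlowResult, it is also worth capturing the new cluster’s job flow ID for logging or later status checks:

// runJobFlow returns a RunJobFlowResult containing the new cluster's ID
val result = emrClient.runJobFlow(runJobFlowRequest)
println(s"Started EMR cluster with job flow ID: ${result.getJobFlowId}")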

We’ve now successfully spun up an ephemeral EMR cluster and submitted a Spark job to run.

See complete project here: https://github.com/smandrell/lambda-emr-example