How to Orchestrate Amazon EMR Serverless Jobs using AWS Step Functions

Alex Gelman
CyberArk Engineering
7 min read · Jun 21, 2023

Amazon EMR Serverless is a relatively new service that simplifies the execution of Hadoop or Spark jobs without requiring the user to manually manage cluster scaling, security, or optimizations. When we’re creating a data pipeline, we may need to run several jobs one after another, or perform some other actions once the job is complete. There are a number of options to orchestrate such a workflow, with AWS Step Functions being one of the easiest for those using AWS managed services.

However, AWS Step Functions lacks a native mechanism to pause the workflow execution until the EMR Serverless job finishes. I’m going to show how you can solve this by implementing a polling mechanism, built from Step Functions’ built-in states, that waits for a job to finish before continuing the workflow execution. I will also show how to use AWS CDK to create a block that can be reused for multiple jobs and pipelines.


What are AWS Step Functions?

AWS Step Functions is a service that allows us to orchestrate complex data pipelines and workflows. It has built-in integration with many AWS services and supports conditions, error handling, and retries. It enables us to coordinate tasks such as data transformation, analysis, and storage, providing a visual workflow to track progress.

AWS Step Functions supports starting an EMR Serverless job. However, there is no built-in support for waiting for the job to complete before moving to the next step in the workflow.

If we want to wait for the job to finish before continuing to the next step in the workflow, or to handle a job failure, we need to implement a polling mechanism that checks the job status and continues the workflow only when the job finishes.

One way of implementing such a polling mechanism is with an AWS Lambda function. However, a Lambda function complicates the deployment and adds cost for its invocations and execution time. Another way is to implement the polling in the Step Function itself. Using the Step Function’s built-in states keeps the deployment simple because we’re not adding AWS resources. It does add some cost due to the additional state transitions, but a Lambda-based polling loop requires the same number of state transitions.

Here are the steps you can use to implement the polling mechanism with native Step Function states.

Step #1 Create a Step Function that starts an EMR Serverless job

We’ll start by creating a simple Step Function that uses the built-in EMR Serverless StartJobRun action to start a Spark job written in Java.

Simple Step Function starting an EMR Serverless job

Executing this Step Function starts the job and completes the execution immediately without waiting for the job to complete or handling job failure.

The Step Function definition is as follows:

{
  "StartAt": "RunAppEmrJob",
  "States": {
    "RunAppEmrJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:emrserverless:startJobRun",
      "Parameters": {
        "ClientToken.$": "States.UUID()",
        "ApplicationId": "<your EMR serverless application ID>",
        "ExecutionRoleArn": "<IAM role ARN used by the job>",
        "JobDriver": {
          "SparkSubmit": {
            "EntryPoint": "<S3 path to Jar file containing the job>",
            "SparkSubmitParameters": "--class <jobs main class>"
          }
        }
      },
      "End": true
    }
  }
}

The definition contains a single state that calls an AWS service API. In our case we’re calling the StartJobRun API for EMR Serverless.

The API call accepts the EMR Serverless application ID, the IAM role used by the job, and the Apache Spark parameters for starting the job.

The Spark parameters include the entry point, which is the S3 path to the JAR file with the job code, and the name of the main class to run.

Step #2 Waiting for the job to finish

Now we’ll add steps to the Step Function that wait for the job to complete before completing the Step Function execution.

Step Function that starts an EMR Serverless job and polls that job status until its execution is finished
  • We add the EMR Serverless GetJobRun action, passing the job run ID and application ID from the previous state as input. This step gets the job details, including the job status.
  • We add a choice state that inspects the job status.
  • If the status is SUCCESS, the execution continues to the success state.
  • If the status is FAILED, CANCELLING, or CANCELLED, the execution continues to the fail state.
  • In any other case, the execution continues to a wait state that waits for 60 seconds and then retries the job status check.
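
To make the routing rule concrete, here is a minimal sketch of the decision the choice state makes, written as plain TypeScript so it can be run locally. The names decideNextStep and simulatePolling are hypothetical helpers for illustration and are not part of any AWS SDK. The sketch follows the Step Function definition shown in this article, which also treats CANCELLING as a failure.

```typescript
// Hypothetical labels for the three branches of the choice state.
type PollDecision = "succeed" | "fail" | "wait";

// Mirrors the choice state: terminal statuses end the loop,
// every other status (PENDING, RUNNING, and so on) goes to the wait state.
function decideNextStep(jobState: string): PollDecision {
  switch (jobState) {
    case "SUCCESS":
      return "succeed";
    case "FAILED":
    case "CANCELLING":
    case "CANCELLED":
      return "fail";
    default:
      return "wait";
  }
}

// Replays a sequence of statuses, as successive GetJobRun calls might report them,
// and returns how many polls were needed and how the loop ended.
function simulatePolling(observedStates: string[]): { polls: number; outcome: PollDecision } {
  let polls = 0;
  for (const jobState of observedStates) {
    polls += 1;
    const decision = decideNextStep(jobState);
    if (decision !== "wait") {
      return { polls, outcome: decision };
    }
  }
  // The sequence ended without a terminal status, so the loop would still be waiting.
  return { polls, outcome: "wait" };
}
```

Because unknown statuses fall through to the wait branch, the loop keeps polling rather than failing if the service reports a status the sketch does not list.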

This creates a polling loop that waits until the job finishes or until the Step Function execution times out. Note that a Standard workflow has no default execution timeout: unless you set one, an execution can run for up to one year (Express workflows are limited to five minutes). To bound the loop, set the TimeoutSeconds parameter at the top level of the Step Function definition. If the job does not complete within that time, the Step Function execution fails.
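
As a small illustration of where TimeoutSeconds goes, the sketch below adds it as a top-level field of a definition object, next to StartAt and States. The StateMachineDefinition interface and the withExecutionTimeout helper are hypothetical names used only for this example, and the two-hour value is an arbitrary assumption.

```typescript
// Minimal shape of a Step Function definition (only the fields used here).
interface StateMachineDefinition {
  StartAt: string;
  States: Record<string, unknown>;
  TimeoutSeconds?: number;
}

// Returns a copy of the definition with an execution-level timeout.
function withExecutionTimeout(
  definition: StateMachineDefinition,
  timeoutSeconds: number
): StateMachineDefinition {
  if (Math.floor(timeoutSeconds) !== timeoutSeconds || timeoutSeconds <= 0) {
    throw new Error("timeoutSeconds must be a positive integer");
  }
  return { ...definition, TimeoutSeconds: timeoutSeconds };
}

// Example: bound the whole execution, polling loop included, to two hours.
const boundedDefinition = withExecutionTimeout(
  { StartAt: "RunAppEmrJob", States: { RunAppEmrJob: { Type: "Succeed" } } },
  2 * 60 * 60
);
```

The timeout should be chosen with the longest expected job duration in mind, since the execution fails once it is reached even if the job is still running.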

The complete Step Function definition that adds the described steps is as follows:

{
  "StartAt": "RunAppEmrJob",
  "States": {
    "RunAppEmrJob": {
      "Next": "RunAppEmrJobGetJobInfo",
      "Type": "Task",
      "ResultPath": "$.JobInfo",
      "Resource": "arn:aws:states:::aws-sdk:emrserverless:startJobRun",
      "Parameters": {
        "ClientToken.$": "States.UUID()",
        "ApplicationId": "<your EMR serverless application ID>",
        "ExecutionRoleArn": "<IAM role ARN used by the job>",
        "JobDriver": {
          "SparkSubmit": {
            "SparkSubmitParameters": "--class <jobs main class>",
            "EntryPoint": "<S3 path to Jar file containing the job>"
          }
        }
      }
    },
    "RunAppEmrJobGetJobInfo": {
      "Next": "RunAppEmrJobJobStatusChoice",
      "Type": "Task",
      "ResultPath": "$.JobStatus",
      "Resource": "arn:aws:states:::aws-sdk:emrserverless:getJobRun",
      "Parameters": {
        "JobRunId.$": "$.JobInfo.JobRunId",
        "ApplicationId.$": "$.JobInfo.ApplicationId"
      }
    },
    "RunAppEmrJobJobStatusChoice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.JobStatus.JobRun.State",
          "StringEquals": "SUCCESS",
          "Next": "SfnSuccess"
        },
        {
          "Or": [
            {
              "Variable": "$.JobStatus.JobRun.State",
              "StringEquals": "FAILED"
            },
            {
              "Variable": "$.JobStatus.JobRun.State",
              "StringEquals": "CANCELLING"
            },
            {
              "Variable": "$.JobStatus.JobRun.State",
              "StringEquals": "CANCELLED"
            }
          ],
          "Next": "SfnFail"
        }
      ],
      "Default": "RunAppEmrJobJobFinishWait"
    },
    "RunAppEmrJobJobFinishWait": {
      "Type": "Wait",
      "Seconds": 60,
      "Next": "RunAppEmrJobGetJobInfo"
    },
    "SfnSuccess": {
      "Type": "Succeed"
    },
    "SfnFail": {
      "Type": "Fail"
    }
  }
}

Now if we want to chain multiple jobs, all we need is to replace the success state with another job run.
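
To illustrate the chaining idea, here is a rough sketch in plain TypeScript that generates such a definition for a list of jobs. It reuses the state layout from the definition above and points each job’s success branch at the next job’s start state. The JobSpec type, the buildJobStates and chainJobs helpers, and the job names IngestJob and AggregateJob are all hypothetical and exist only for this example.

```typescript
// A job to run; `name` doubles as the state-name prefix (a convention assumed here).
interface JobSpec {
  name: string;
  parameters: Record<string, unknown>; // StartJobRun parameters
}

type AslState = Record<string, unknown>;

const JOB_STATE_PATH = "$.JobStatus.JobRun.State";

// Builds the start, get-status, choice, and wait states for a single job.
function buildJobStates(job: JobSpec, onSuccess: string, onFail: string): Record<string, AslState> {
  const getInfo = `${job.name}GetJobInfo`;
  const choice = `${job.name}JobStatusChoice`;
  const wait = `${job.name}JobFinishWait`;
  const failedStates = ["FAILED", "CANCELLING", "CANCELLED"];
  return {
    [job.name]: {
      Type: "Task",
      Resource: "arn:aws:states:::aws-sdk:emrserverless:startJobRun",
      Parameters: job.parameters,
      ResultPath: "$.JobInfo",
      Next: getInfo,
    },
    [getInfo]: {
      Type: "Task",
      Resource: "arn:aws:states:::aws-sdk:emrserverless:getJobRun",
      Parameters: {
        "JobRunId.$": "$.JobInfo.JobRunId",
        "ApplicationId.$": "$.JobInfo.ApplicationId",
      },
      ResultPath: "$.JobStatus",
      Next: choice,
    },
    [choice]: {
      Type: "Choice",
      Choices: [
        { Variable: JOB_STATE_PATH, StringEquals: "SUCCESS", Next: onSuccess },
        {
          Or: failedStates.map((s) => ({ Variable: JOB_STATE_PATH, StringEquals: s })),
          Next: onFail,
        },
      ],
      Default: wait,
    },
    [wait]: { Type: "Wait", Seconds: 60, Next: getInfo },
  };
}

// Chains the jobs so that each job's success branch starts the next job,
// and the last job's success branch ends the execution.
function chainJobs(jobs: JobSpec[]): { StartAt: string; States: Record<string, AslState> } {
  const first = jobs[0];
  if (first === undefined) {
    throw new Error("At least one job is required");
  }
  const states: Record<string, AslState> = {
    SfnSuccess: { Type: "Succeed" },
    SfnFail: { Type: "Fail" },
  };
  jobs.forEach((job, index) => {
    const next = jobs[index + 1];
    const onSuccess = next !== undefined ? next.name : "SfnSuccess";
    Object.assign(states, buildJobStates(job, onSuccess, "SfnFail"));
  });
  return { StartAt: first.name, States: states };
}

// Example: two jobs run one after another; the parameters are left empty here.
const pipelineDefinition = chainJobs([
  { name: "IngestJob", parameters: {} },
  { name: "AggregateJob", parameters: {} },
]);
```

Serializing pipelineDefinition with JSON.stringify gives a definition with the same structure as the one above, repeated once per job. Each job writes to the same $.JobInfo and $.JobStatus paths, so a later job overwrites the results of an earlier one.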

A note on cost:

AWS charges $0.000025 per state transition beyond the free tier. The polling mechanism adds a loop of three states to the Step Function execution. Assuming the job runs for ten minutes with a polling action every 60 seconds, the loop runs ten times, which adds 30 state transitions.

This would add up to $0.00075 to the cost of each execution.
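
The same arithmetic can be sketched as a small function, which makes it easy to try other job durations and polling intervals. The price constant is the figure quoted above and should be treated as an assumption, since pricing can differ by region and change over time. The name estimatePollingCost is a hypothetical helper.

```typescript
// Price per state transition quoted in this article (assumption: check current pricing).
const PRICE_PER_TRANSITION_USD = 0.000025;

// Each polling iteration passes through three states: GetJobRun, Choice, and Wait.
const STATES_PER_POLL = 3;

// Estimates the state transitions and cost added by the polling loop.
function estimatePollingCost(
  jobDurationSeconds: number,
  pollIntervalSeconds: number
): { polls: number; transitions: number; costUsd: number } {
  if (jobDurationSeconds < 0 || pollIntervalSeconds <= 0) {
    throw new Error("duration must be non-negative and interval must be positive");
  }
  const polls = Math.ceil(jobDurationSeconds / pollIntervalSeconds);
  const transitions = polls * STATES_PER_POLL;
  return { polls, transitions, costUsd: transitions * PRICE_PER_TRANSITION_USD };
}

// Ten minutes at one poll per minute: 10 polls, 30 transitions, roughly $0.00075.
const tenMinuteEstimate = estimatePollingCost(10 * 60, 60);
```

A longer wait interval reduces the number of transitions at the price of noticing job completion later.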

Step #3 Automating with AWS CDK

We may have multiple EMR Serverless jobs in a single Step Function that we’d want to use this pattern for. We can save a lot of time by creating a reusable block to apply to every EMR Serverless job in our Step Functions.

Using AWS CDK we can easily create a function that would wrap a job run state with the wait pattern and allow us to reuse it for any job.

How can we do this?

When creating an AWS Step Function definition in CDK, we can use state objects and chain them together using the next() function. Each chain can be in turn chained to another state or another chain, creating the workflow definition.

Here is an example of this type of function:

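// Assumes aws-cdk-lib v2; this is a method of a Construct (or Stack) subclass.
import { Duration } from "aws-cdk-lib";
import { CfnApplication } from "aws-cdk-lib/aws-emrserverless";
import { Choice, Condition, IChainable, Wait, WaitTime } from "aws-cdk-lib/aws-stepfunctions";
import { CallAwsService } from "aws-cdk-lib/aws-stepfunctions-tasks";

chainEmrJobWaitPattern(emrApplication: CfnApplication, jobRunState: CallAwsService, onSuccessState: IChainable, onFailState: IChainable): IChainable {
  // Prefix state IDs with the job state's ID so the pattern can be applied to several jobs in the same scope.
  const prefix = jobRunState.node.id;

  // Get the job run details, including its status, using the IDs returned by StartJobRun.
  const getJobState = new CallAwsService(this, `${prefix}GetJobInfo`, {
    service: "emrserverless",
    action: "getJobRun",
    resultPath: "$.JobStatus",
    // IAM resources must be ARNs; this scopes the permission to the application's job runs.
    iamResources: [`${emrApplication.attrArn}/jobruns/*`],
    parameters: {
      "ApplicationId.$": "$.JobInfo.ApplicationId",
      "JobRunId.$": "$.JobInfo.JobRunId"
    }
  });

  // Wait 60 seconds, then check the job status again.
  const statusRetryWait = new Wait(this, `${prefix}JobStatusRetryWait`, {
    time: WaitTime.duration(Duration.seconds(60))
  });
  const retryChain = statusRetryWait.next(getJobState);

  // Route on the job status: success, failure, or keep polling.
  const jobStatusChoice = new Choice(this, `${prefix}JobStatusChoice`)
    .when(Condition.stringEquals("$.JobStatus.JobRun.State", "SUCCESS"), onSuccessState)
    .when(Condition.or(
      Condition.stringEquals("$.JobStatus.JobRun.State", "FAILED"),
      Condition.stringEquals("$.JobStatus.JobRun.State", "CANCELLED")
    ), onFailState)
    .otherwise(retryChain);

  // Start the job, then enter the polling loop.
  const jobWaitChain = jobRunState.next(getJobState).next(jobStatusChoice);
  return jobWaitChain;
}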

This function receives the following:

  • EMR Serverless application defined by the CfnApplication CDK object.
  • The StartJobRun EMR Serverless API call state that starts the execution of a job.
  • A chain to be executed in case of job success.
  • A chain to be executed in case of a job failure.

The function creates the get job info state, the choice state, and the wait state as we’ve seen in the previous section. If the job succeeds, the execution continues to the success chain. If the job fails or is cancelled, it continues to the failure chain. In any other case, it waits and checks the status again. The success and failure chains can be a single state or a more complex workflow, allowing this function to be reused in different scenarios.

The Takeaway with AWS Step Functions

Imagine you want to create a data pipeline with a Step Function that starts EMR Serverless jobs, and you find out that the Step Function execution doesn’t wait for the job to finish, so some polling mechanism is required. Now you’ve seen how to implement a polling mechanism that waits for the job to finish before continuing the workflow, using native Step Function states and without additional Lambda functions.
Using CDK, you can create a reusable pattern that you can add to any EMR Serverless job you have. You can see the full example on GitHub or as a CDK construct library in JavaScript, Python and .Net.

Alex Gelman
CyberArk Engineering

Principal software architect @CyberArk and full stack tech geek