MWAA and EMR

Luigi Kapaj (PuppyKhan)
Avenue 8

--

While trying to build out a more robust data pipeline at Avenue 8, we wanted to use Scala in a Spark cluster for certain tasks but also use Airflow as the ETL management tool. It turns out they can be used together in AWS, but the documentation is neither clear nor complete on how to do it. So after piecing together information from several sources, along with some trial and error, we managed to build out a pipeline that fits our needs.

Managed Workflows for Apache Airflow (MWAA) on AWS can be used in conjunction with Spark by spinning up an Elastic MapReduce (EMR) cluster. This allows MWAA to serve as the management tool for the entire ETL pipeline, including running Scala jobs with EMR.

There is an example of how to set this up in the AWS documentation, but it leaves out a few important details. Those can largely be pieced together from this blog post, which has a similar example but is cluttered with far too many irrelevant details. This post is an attempt to isolate just the important details.

Assumptions

To keep this concise, the starting point is to have MWAA already set up and running, but no EMR cluster. The focus is on using Airflow to manage starting a spot EMR cluster and submitting the job for it to run.

Permissions

The role for MWAA needs a permissions policy allowing it to create an EMR cluster and access it.
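
The exact statements are easiest to see written out. Below is a minimal sketch of that policy, expressed as a Python dict so it can be dumped to JSON and attached to the role. The three EMR actions mirror the three Airflow operators used later in this post; the iam:PassRole statement and the default EMR role names are assumptions to adjust for your own account.

    import json

    # Minimal sketch of the extra policy for the MWAA execution role.
    # The three EMR actions line up with the three Airflow operators used
    # below; the iam:PassRole statement lets EMR assume its roles. The
    # role names are the AWS defaults and are assumptions -- adjust them
    # to whatever your account uses.
    MWAA_EMR_POLICY = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "elasticmapreduce:RunJobFlow",       # EmrCreateJobFlowOperator
                    "elasticmapreduce:AddJobFlowSteps",  # EmrAddStepsOperator
                    "elasticmapreduce:DescribeStep",     # EmrStepSensor
                ],
                "Resource": "*",  # all clusters, since new ones get spun up
            },
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": [
                    "arn:aws:iam::*:role/EMR_DefaultRole",
                    "arn:aws:iam::*:role/EMR_EC2_DefaultRole",
                ],
            },
        ],
    }

    # Paste the JSON output into the policy attached to the MWAA role.
    print(json.dumps(MWAA_EMR_POLICY, indent=2))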

The three EMR actions need to be applied to all clusters since new ones will be spun up. AWS has a lot of documentation on EMR policies including more than what is needed here.

This policy needs to be applied to the role used for MWAA.

Configuration

Airflow includes an operator, available in MWAA, for creating the EMR cluster: EmrCreateJobFlowOperator. The operator takes a config structure passed to the job_flow_overrides parameter. Under the hood, this config gets passed on to Boto3's run_job_flow call, and that is where the documentation can be found for which keys to use and how the config is structured.

It is helpful to go to AWS’ EMR page to see the options for setting up a cluster, especially to see the latest release version available.

There are several important settings here.
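
A sketch of the full structure helps before going through the settings one by one. The cluster name, bucket, key pair, and subnet ID below are placeholders, and the keys follow Boto3's run_job_flow parameters.

    # Sketch of the job_flow_overrides config. Bucket, key pair, and
    # subnet ID are placeholders; keys follow Boto3's run_job_flow call.
    JOB_FLOW_OVERRIDES = {
        "Name": "transient-spark-cluster",
        "LogUri": "s3://my-bucket/emr-logs/",           # placeholder bucket
        "ReleaseLabel": "emr-6.3.0",                    # bundles Spark 3.1.1
        "Applications": [{"Name": "Spark"}],
        "Instances": {
            "InstanceGroups": [
                {
                    "Name": "Primary node",
                    "InstanceRole": "MASTER",
                    "Market": "SPOT",                   # or ON_DEMAND
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 1,
                },
                {
                    "Name": "Core nodes",
                    "InstanceRole": "CORE",
                    "Market": "SPOT",
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 2,                 # raise for more parallel tasks
                },
            ],
            "Ec2KeyName": "my-key-pair",                # placeholder key pair
            "Ec2SubnetId": "subnet-0123456789abcdef0",  # placeholder subnet
            "KeepJobFlowAliveWhenNoSteps": False,       # let the transient cluster shut down
            "TerminationProtected": False,
        },
        "JobFlowRole": "EMR_EC2_DefaultRole",           # default EMR roles, adjust as needed
        "ServiceRole": "EMR_DefaultRole",
    }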

Name — This is the name for the cluster as it will appear in the Clusters list on the EMR page.

LogUri — Since the cluster is transient, use this to specify an S3 location where all logs will be stored. A folder is created there for each cluster ID; see its “steps” subfolder, then the generated step ID folder, for the actual task output.

ReleaseLabel — This takes the actual version of EMR for the cluster. See “Create cluster” on the EMR page for available releases. Each release comes with specific versions of Spark, and other software, as outlined at the About Amazon EMR Releases page. For example, EMR release emr-6.3.0 includes Spark version 3.1.1, which in turn runs Scala version 2.12, Java 8/11, Python 3.6+ and R 3.5+ according to Spark’s Overview page.

Applications — There are several applications available with EMR. For this purpose, the only one needed is Spark.

Instances.InstanceGroups — This is where the cluster gets defined:

  • InstanceRole — Both a MASTER and a CORE group are used. The MASTER group is required and can run a simple standalone task on its own; the CORE group is for multiple or parallel tasks.
  • Market — Changing this from ON_DEMAND to SPOT introduces cost savings, but with all the caveats of using spot instances.
  • InstanceType — The default is m5.xlarge as of this writing but adjust as needed.
  • InstanceCount — Increase this in the CORE group to run more parallel tasks at once.

Instances.Ec2KeyName — Set this to the appropriate EC2 key pair for the environment.

Instances.Ec2SubnetId — Specify the organization’s correct Subnet ID for the desired environment.

Instances.Placement — Do not use this if Ec2SubnetId is set, since the subnet already implies this information; otherwise pass it an AvailabilityZone.

The remaining values can be left at the defaults shown above.

Code

There are three steps for the Airflow DAG managing an EMR job.

First, create the EMR cluster using airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator, passing it the above config.
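
As a sketch (the DAG name, schedule, and task id are placeholders, and JOB_FLOW_OVERRIDES is the config sketched earlier), the first task looks like this, along with the DAG it hangs off:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.emr_create_job_flow_operator import (
        EmrCreateJobFlowOperator,
    )

    dag = DAG(
        dag_id="emr_spark_job",           # placeholder DAG name
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,           # trigger manually, or set a schedule
    )

    # Spins up the transient cluster defined by the config above.
    cluster_creator = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id="aws_default",
        emr_conn_id="emr_default",
        dag=dag,
    )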

Second, define steps using airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator, which takes a config structure passed to the steps parameter. The EMR documentation outlines using the command runner, with further detail in the spark-submit documentation; the step is passed in the following format:

Args — takes a list of parameters sent to command-runner. The first parameter is spark-submit and the rest are passed as parameters to spark-submit. Keep each parameter as a separate string. Replace the class and Jar names with those used for the application.
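
Continuing the sketch, a single step built around command-runner might look like the following; the class, bucket, and Jar names are placeholders to replace with the application’s own. The job_flow_id is pulled from XCom so the step lands on the cluster created by the previous task.

    from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator

    # One step: command-runner hands everything after "spark-submit" to
    # spark-submit itself. Class and Jar names are placeholders.
    SPARK_STEPS = [
        {
            "Name": "Run Scala Spark job",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "--class", "com.example.MySparkJob",      # placeholder class
                    "s3://my-bucket/jars/my-spark-job.jar",   # placeholder application Jar
                ],
            },
        }
    ]

    # Submits the step to the cluster created by the previous task,
    # looked up through XCom.
    step_adder = EmrAddStepsOperator(
        task_id="add_steps",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id="aws_default",
        steps=SPARK_STEPS,
        dag=dag,
    )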

Finally, watch for completion using airflow.contrib.sensors.emr_step_sensor.EmrStepSensor.
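
The last task of the sketch watches the first (and only) step added above, again pulling the cluster and step IDs from XCom, with the three tasks chained at the end:

    from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

    # Waits for the single step above to succeed or fail; [0] picks the
    # first step id returned by the add_steps task.
    step_checker = EmrStepSensor(
        task_id="watch_step",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id="aws_default",
        dag=dag,
    )

    cluster_creator >> step_adder >> step_checker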

Imports and Parameters

The level of indirection in creating the EMR cluster from MWAA and running the job (the actual job is handed from EMR to command-runner to spark-submit) means that adding custom Jars takes a few extra steps. Basically, add a bootstrap script to the cluster which copies the Jars from S3, and then add that location to the spark-submit parameters in the Args list.

The Spark job can read files from S3, but spark-submit can only read from the local filesystem, so the initialization has to copy any additional files needed for starting the job to the local instance before the job is submitted.

First, create a simple script for copying from S3 to a local mount point, and place it in S3. This script gets run by the cluster instances, but it needs to place the files in a folder mounted by Spark so they are accessible to the job.

Then add BootstrapActions to the job_flow_overrides config telling it where to find the script, and to run it.

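A sketch of that addition to job_flow_overrides is below; the script name, bucket, and argument are placeholders. The Bash script itself simply copies the extra Jars from S3 into the local folder the job will read them from.

    # BootstrapActions addition to the job_flow_overrides config; the
    # script location and its argument are placeholders. The Bash script
    # copies the custom Jars from S3 to a local folder on each instance.
    JOB_FLOW_OVERRIDES["BootstrapActions"] = [
        {
            "Name": "Copy custom Jars from S3",
            "ScriptBootstrapAction": {
                "Path": "s3://my-bucket/bootstrap/copy_jars.sh",  # placeholder script location
                "Args": ["1.0"],  # e.g. a Jar version number passed to the script
            },
        }
    ]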

Path — The S3 location and name of the Bash script that copies the imported files.

Args — Can be used to pass arguments to the Bash script, such as version numbers for the Jar files.

Finally, add the Jar files, along with their mount point location, to the Step config by including a --jars parameter for spark-submit.

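Continuing the sketch, the Args list in the step grows a --jars entry pointing at the local copy made by the bootstrap script, plus any application arguments after the Jar name. The mount point, class, paths, and argument names here are placeholders.

    # Updated Args for the spark-submit step: --jars points at the local
    # copy made by the bootstrap script, and anything after the
    # application Jar is passed straight to the Scala job. All names and
    # paths here are placeholders.
    SPARK_STEPS[0]["HadoopJarStep"]["Args"] = [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--class", "com.example.MySparkJob",          # placeholder class
        "--jars", "/mnt/custom-jars/extra-lib.jar",   # Jar copied locally by the bootstrap script
        "s3://my-bucket/jars/my-spark-job.jar",       # placeholder application Jar
        "--input", "s3://my-bucket/data/",            # example parameters for the Spark job
        "--env", "prod",
    ]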

Here is also where any parameters can be added for the Airflow DAG to send to the Spark job. Place them as additional args following the Scala application’s Jar file name.

Running the Job

With the AWS permissions, Jar files, and script in place, start or schedule the DAG in Airflow and the rest is handled automatically. The EMR cluster gets provisioned, started, sent the job, reports completion status, and is stopped, all triggered from Airflow. The EMR page shows a history of the clusters along with the IDs for looking up log subfolders.
