Remote spark-submit to YARN running on EMR

Azhaguselvan SP
5 min readFeb 10, 2018

--

I was setting up Airflow as a replacement for Oozie + (Hue) which we were using to schedule and run batch processing jobs in my workplace. Hue on top of Oozie has a lot of good functionalities, but it is a UI/UX nightmare. Also this setup was running on our EMR master, and it’s managed by AWS, which limited the customisations we wanted to do on Hue. So, enter Airflow. There are enough posts all over the internet convincing you why Airflow is better than *every* other workflow scheduler ever written, so I’m not gonna do that here ;-)

When you have a managed AWS EMR cluster and want to use Airflow to run spark jobs on it, there are two options:

EMRStepsOperator and EMRStepSensor

EMR steps interface is a ugly patchwork of a functionality, as always in case of AWS, through which you can submit “work/job” to the EMR cluster. If you take a look at the API or the Web UI for the Steps interface you’ll understand why. There is no way to stream logs from the job submitted via the API/CLI. There is no way to get the YARN application Id of the submitted job from the API/CLI or from the UI.

Airflow has a operator+sensor combo to interact with the Steps interface. This is the easier option of the two. There is no setup necessary at all in the airflow server side. You store all the config and jars in the S3/HDFS and call EMRStepsOperator with them, and use EMRStepsSensor to monitor the status of the Step. The downside is, this is all the EMRSteps* can do. You can’t display logs from the EMR in the airflow UI. You can’t retrieve the YARN application ID from the EMR. So when the job fails, or takes a while longer, there is no way to navigate to the job logs from the Airflow UI. You have to goto YARN Resource Manager UI, find the specified job(probably searching by name?) and navigate to the logs.

SparkSubmitOperator

This operator expects you have a spark-submit binary and YARN client config setup on our Airflow server. It invokes the spark-submit command with given options, blocks until the job finishes and return the final status. The good thing is, it also streams the logs from the spark-submit command stdout and stderr. We get to see the YARN application URL in the logs which we can use to navigate to the Spark/YARN UI, also when the job fails the error message and stack trace is printed in the logs itself. Awesome!

So, it made more sense to proceed with the SparkSubmitOperator for our use case. Now, setting up a remote client for submitting jobs for spark running on YARN is not straight forward as it is. But, there is a really amazing guide here : http://theckang.com/2015/remote-spark-jobs-on-yarn/ ,but this guide is for non-EMR YARN clusters. The EMR clusters bring in a bit of their own magic so the guide doesn’t work as is. The following steps are required:

These steps assume you have airflow setup on a EC2 instance(or your favourite cloud., doesn’t matter which), and it is able to talk to your EMR cluster(re: AWS security groups).

  • Download spark binary compiled with hadoop support from here : https://spark.apache.org/downloads.html Make sure you pick the right spark and hadoop versions you have on the EMR. Extract them to a convenient location. This will be your $SPARK_HOME
  • Make sure you have the Java installed on the airflow server which is compatible with the version running on EMR.
  • Our EMR cluster had the EMRFS(Fancy, consistent way of using S3 as your file system) enabled. This means we can upload our spark jobs JAR and our dependencies in S3 and use them while invoking spark-submit. This also requires that we need to have some extra dependencies that the spark-submit in the Airflow server class path. They were: emrfs-hadoop-assembly, s3-dist-cp and jersey-bundle. The first two are custom JARs from AWS. You have to locate them in your EMR master(I found them in /usr/share/aws/emr/), and copy it over to the Airflow server. The third one I guess is a dependency from the emrfs-hadoop-assembly JAR. But I’m not sure why it was not included in the assembly JAR itself. So, the jersey one you can download from the maven. The way we have done, is we store all these JARs in our S3, and when the chef sets up our Airflow server, we download them and put them into our $SPARK_HOME/jars folder.
  • Now your spark-submit binary needs to know about the YARN setup. You will have to store the configurations for YARN in a folder and supply them to spark-submit via the HADOOP_CONF_DIR or YARN_CONF_DIR env variable. They are found inside the /etc/hadoop/conf.empty/ folder in your EMR master. You’ll only need two files, core-site.xml and yarn-site.xml. Copy them over to the Airflow server and put into a folder.
  • The core-site.xml file has a configuration fs.s3.buffer.dir. This is what the s3-dist-cp uses as a temporary buffer folder before uploading to S3. This property is shared by the Spark in the EMR master and in the Airflow server. I couldn’t figure out how to configure different folders for this property in EMR master and in our Airflow server. So I had to create the same folder with appropriate permissions in the airflow server as well.
  • Now we need to pass on all this configuration to spark-submit binary by exporting them as ENV variables from inside the $SPARK_HOME/conf/spark-env.sh file as follows:

export HADOOP_CONF_DIR=<directory with core-site.xml and yarn-site.xml files>

export SPARK_HOME=<directory where spark archive was extracted>

Depends on how your Airflow server is configured, you might also need to export the AWS credentials in this file.

  • The SparkSubmitOperator is now ready to use the spark-submit binary we configured. When creating the operator, we need to specify the EMR connection ID. The connection should be configured with host set to ‘yarn’ and the following JSON in the extra field:

{ “deploy-mode”: “cluster”, “spark-home”: “$SPARK_HOME” }

Note: $SPARK_HOME should be expanded to absolute path. As an alternate you can add $SPARK_HOME/bin to the $PATH in your Airflow server.

That’s it. We should be able to deploy DAGs with SparkSubmitOperator and submit jobs to the YARN master running on EMR. I also wrote chef recipe to automate everything I have just described above(of course you have to put this into a cookbook, add some default variables and the bells and whistles):

If you liked this post, I recently wrote about why we moved away from Airflow and now use GoCD for our data pipeline automation. There is also a better version of a automation script which helps to setup remote spark-submit for EMR in that blog post. Check it out!

--

--

Azhaguselvan SP

Atheist, Software Engineer and lot of other things. Living in Berlin, Germany