Optimize Spark jobs on EMR Cluster

Sharad Varshney
5 min readFeb 1, 2019

--

This article is for Data Scientists and ML Engineers who are running Spark jobs using EMR cluster on AWS and have faced challenges debugging job failures. I will assume you already have EMR cluster up and running with EMR-5.20.0 version and above and Spark-2.3.0.

In this article, I will share my experience and challenges that I faced running Spark jobs in EMR cluster. EMR has a unique feature of being able to add as many Core and Task nodes to the cluster as needed to finish the job which sets it apart from other big data cluster, but to utilize all the resources that we can add to the cluster we need more in depth knowledge all the variables in play and how to fine tune them.

AWS EMR using Spark
  1. PySpark vs Spark in Scala — First point to think about whether to create your driver program in Python or Scala. I started with Python as thats the most common language to Data Scientists these days and have had my share of challenges. First to note Python library is just a wrapper on top of Scala code, not that it would matter much but debugging is much bigger problem when you are running Spark jobs using PySpark. We tried to debug issues for 4 weeks with Python and had our fair share of challenges after which we decided to write the Spark code in Scala. It was bit of learning curve but it paid off to run your Spark code effectively on 10–500 node cluster. If you are looking to get help with compiling Scala on emr, go through this article.
  2. Dependencies with spark-submit — PySpark had bigger problems when needed to send dependencies with spark-submit. Numpy is the most common python library which is being used by many of us and also by pandas internally. Packaging numpy as core dependency and sending it with spark-submit caused us lots of trouble as most of the numpy code is in C and different machines can have different version of c compilers, so we created our custom AMI with pre-install libraries, but this is not a scalable solution as anytime we needed a new library, we needed to create a new AMI and spun a new cluster. In Scala sending dependencies jar felt like piece of cake, you just define — jars switch and send your jar containing dependencies. In Python you need to wheel your files first and then deal with other issues.
  3. Spark has inbuilt functionality to allocate most of the resources available in the cluster, which is a good feature for smaller node clusters between 2–4 nodes but anything above those numbers you need to switch off dynamicAllocation : false. Every spark config in command line shell starts with -- conf.

— — conf spark.dynamicAllocation.enabled=false

This property when set to false, lets you define your own number of executors, cores and memory allocation for those executors.

“spark.executor.instances” — Number of executor instances

“executor.cores” — Number of cores per executor instance

“executor.memory” — Total memory allocated per executor

It is also good to define driver memory for spark driver. A Spark driver also known as Application driver process or application master driver, is a JVM process that hosts sparkContext for your particular application. It is also a master node and splits Spark application into Tasks and schedules them to run on executors and also coordinates workers and overall execution of tasks.

  1. MemoryOverhead — is most misunderstood config variable, do allocate 500m-5g as memory overhead per executor depending on how much your are allocating to your executor memory — generally 1% — 10% of your memory

“- -conf spark.executor.memoryOverhead=5g”

After you make these modifications and I can say to a great optimism that your Spark jobs will be running very fast. But there is still 1 last issue: after everything runs very fast, these jobs still slow down towards the end, if you are writing to S3, if you are writing to local HDFS — it should be fine. EMR is using “emrfs-s3-optimized OutputCommitter” which should be optimized but I did face huge slowness towards the last phase of the jobs when its writing temp directories from s3 to permanent directories.

Sample spark-submit script:

sudo spark-submit — master yarn — deploy-mode cluster — conf spark.executor.instances=14 -- executor-cores 4 -- executor-memory 15g — conf spark.yarn.submit.waitAppCompletion=false — conf spark.dynamicAllocation.enabled=false — conf spark.driver.memory=8g — conf spark.executor.memoryOverhead=3g -- jars file:///home/hadoop/code/s3committer-0.5.5.jar /home/hadoop/code/target/scala-2.11/spark_job_2.11–0.1.jar

This script submits spark job to EMR running YARN cluster in cluster mode where master driver also runs on cluster (CORE nodes), configures 14 executors which has 4 cores each, so every executor will have 4 cores. Every executor will also have 15 gb of memory allocated for its tasks. Spark driver memory is 8gb and memory overhead for each executor is 3gb — this becomes very important if your containers are being killed in the between the run or being shut down because of vmem exceeding which shows errors like:

ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 59.3 GB of 59 GB physical memory used.

Conclusion

Very easy ? Well took us more than 8 weeks to get upto this point thats the main reason of posting this article to help anyone out there and save them those hours. This is the first article I wrote on Medium, any suggestions and feedbacks are welcome . Next optimizations we did is to add Yaml support to our scala code base so most of those hard coded variables can be part of config files and not needed to be compile on every run and next work we are heading towards is to script out all the jobs — so we can spin up cluster to only run the job and it can be killed on successful completion and/or some major failures by pushing all logs to s3 for root cause analysis. At the end I want to thank Prakash Jagatheesan as he worked very closely with me on these Spark jobs and equally participated in fine tuning these parameters.

--

--