Spark on Yarn

Spark needs resources to do its computation, and Yarn manages all the resources (CPU cores and memory) on the cluster. Understanding how Spark communicates with Yarn to complete its tasks helps us analyse the different problems a Spark application can run into. At the end of this post, I will try to address some common Yarn-related exceptions that show up when running a Spark application.

Hadoop Yarn has three components: Resource Manager (RM), Application Master (AM) and Node Manager (NM). From their names, or with a little help from Google, it is easy enough to understand their responsibilities. But if you want to build an application that runs on Yarn, as Spark does, how do you interact with these three components? It is quite interesting to see how this works, taking Spark as an example.

Spark on Yarn supports two deploy modes, “cluster” and “client”. The only difference is whether your Spark driver runs inside or outside a Yarn container. We will focus on cluster mode, which is normally used on production systems. The source code referenced here is from Spark 2.0.

Starting

When you start a Spark application with the spark-submit command in cluster mode, SparkSubmit (a static class of about 1,000 lines) executes the class org.apache.spark.deploy.yarn.Client. Its first job is to submit a Yarn application to the Yarn Resource Manager (RM). In more detail, this means creating and starting a YarnClient object from the Apache Yarn library and calling its createApplication() method. From the GetNewApplicationResponse instance that Yarn returns, the client checks whether the maximum memory capability is enough to launch the Spark driver and executors. It then creates the ContainerLaunchContext (which holds all the JVM options and specifies the AM executable class name, org.apache.spark.deploy.yarn.ApplicationMaster) and the ApplicationSubmissionContext (the application’s tags, maximum attempts and required capability), and calls submitApplication() on the YarnClient to finally submit the new application. After that, it waits for the RM to launch the AM in the first allocated container.
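The memory check in that submission step can be sketched as follows. This is only an illustration in Python, not Spark’s code: the function name, its parameters and the error messages are hypothetical, and the overhead values are simply passed in rather than computed.

```python
def verify_cluster_resources(max_container_mb, executor_mb, executor_overhead_mb,
                             am_mb, am_overhead_mb):
    """Fail fast if an executor or AM container could never fit in the cluster.

    max_container_mb stands for the maximum memory capability reported by Yarn
    in the GetNewApplicationResponse. All names here are hypothetical.
    """
    executor_total = executor_mb + executor_overhead_mb
    if executor_total > max_container_mb:
        raise ValueError(
            f"Required executor memory ({executor_total} MB) is above "
            f"the max container size ({max_container_mb} MB)")

    am_total = am_mb + am_overhead_mb
    if am_total > max_container_mb:
        raise ValueError(
            f"Required AM memory ({am_total} MB) is above "
            f"the max container size ({max_container_mb} MB)")

    # Both container sizes fit, so the submission can go ahead.
    return executor_total, am_total
```

Failing at submission time is cheaper than letting the application sit in the queue waiting for a container that Yarn can never grant.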

Suppose the Yarn RM is happy to start a container running our Spark ApplicationMaster. In its run() method, the AM starts a single thread (the driver thread, named “Driver”) to execute the user class specified by the --class argument (this is the main class of your Spark application). The Spark driver is initialised in this thread. A lot of checks and initialisation happen when the SparkContext is created. The main Yarn-specific difference is in the createTaskScheduler() method of the SparkContext class, which returns an instance of YarnScheduler and an instance of YarnSchedulerBackend. YarnScheduler is very similar to TaskSchedulerImpl, while YarnSchedulerBackend is the bridge between Yarn containers and Spark tasks.

Continuing inside the AM: the AM registers itself with the RM through YarnRMClient and gets back an instance of YarnAllocator. Spark’s YarnRMClient is just a wrapper around Yarn’s AMRMClient that handles registering and unregistering the AM with the Yarn RM. Once the AM has this YarnAllocator, the first thing it does is call its allocateResources() method, which uses Yarn’s AMRMClient to request all the containers (executors) for the first time (the target number of executors comes from the SparkConf). The next major step is to launch a “Reporter” thread, which keeps calling allocateResources() and sleeping in between. For each successfully allocated container, the allocator’s runAllocatedContainers() method launches one instance of ExecutorRunnable to start the executor in that container. ExecutorRunnable prepares the container’s command (a java command that runs the CoarseGrainedExecutorBackend class), sets up the container’s environment, and calls startContainer() on Yarn’s NMClient to finally start an executor in the container.
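To make the allocation loop more concrete, here is a toy simulation in Python. Everything in it is hypothetical (FakeResourceManager, Allocator, reporter_loop and the container ids); it only mimics the idea of repeatedly asking for the missing containers and starting an executor in each one that is granted. Unlike the real reporter thread, it does not sleep and it stops once the target is reached.

```python
class FakeResourceManager:
    """Stand-in for the Yarn RM: grants at most `per_round` containers per call."""

    def __init__(self, per_round):
        self.per_round = per_round
        self.next_id = 1

    def allocate(self, wanted):
        granted = min(wanted, self.per_round)
        ids = [f"container_{i}" for i in range(self.next_id, self.next_id + granted)]
        self.next_id += granted
        return ids


class Allocator:
    """Stand-in for YarnAllocator: tracks the target and the running executors."""

    def __init__(self, rm, target_executors):
        self.rm = rm
        self.target = target_executors
        self.running = {}  # container id -> executor description

    def allocate_resources(self):
        missing = self.target - len(self.running)
        if missing <= 0:
            return []
        containers = self.rm.allocate(missing)
        for cid in containers:
            # In Spark this is where an ExecutorRunnable would be launched.
            self.running[cid] = f"executor started in {cid}"
        return containers


def reporter_loop(allocator, max_rounds):
    """Call allocate_resources() until the target is met; return rounds used."""
    rounds = 0
    while len(allocator.running) < allocator.target and rounds < max_rounds:
        allocator.allocate_resources()
        rounds += 1
    return rounds
```

With a resource manager that grants two containers per round and a target of five executors, the loop needs three rounds to fill the target.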

Handling the resources

We have seen how Spark uses the RM, AM and NM to start the driver and executors in Yarn containers. But how does Spark handle these allocated containers, and how does it send tasks to run on them?

Most of the container handling is done by YarnAllocator. The allocator tracks the status of all running and failed executors, and the AM calls its allocateResources() method periodically (every 200 ms by default, sometimes sleeping longer when many container requests are pending). YarnAllocator also deals with preferred localities (the preferred hosts on which to launch an executor), handles completed containers, and keeps the resource requests sent to the RM up to date. Managing all the running containers of a Spark application is a busy job, and you will see that most Yarn-related exceptions come from this class.
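The “sleep longer while requests are pending” behaviour can be pictured as a capped exponential backoff. The sketch below is my own simplification, assuming the interval starts at 200 ms and doubles on every round in which requests are still pending; the 3000 ms cap and the function name are assumptions, not values taken from the text above.

```python
def backoff_schedule(rounds_with_pending, initial_ms=200, cap_ms=3000):
    """Return the sleep time (ms) for each consecutive round with pending requests.

    Assumed policy: start at initial_ms, double after every round,
    and never sleep longer than cap_ms.
    """
    interval = initial_ms
    sleeps = []
    for _ in range(rounds_with_pending):
        sleep = min(cap_ms, interval)
        sleeps.append(sleep)
        interval = sleep * 2
    return sleeps


print(backoff_schedule(6))  # [200, 400, 800, 1600, 3000, 3000]
```

The point of a policy like this is to react quickly right after new requests are made, without hammering the RM when the cluster is too busy to grant them.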

Communications

At a very high level, on the driver side, CoarseGrainedSchedulerBackend (the superclass of YarnSchedulerBackend) is the centre of communications. It calls the TaskScheduler to get the tasks to run, and launches a DriverEndpoint that listens for RegisterExecutor messages from CoarseGrainedExecutorBackend (the main class that starts an executor in the container). Once it knows that some executors are ready, it sends them tasks (LaunchTask messages) through the executor endpoint that each CoarseGrainedExecutorBackend registered.
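A stripped-down model of this register-then-launch exchange might look like the following. It is a sketch only: the class and method names are hypothetical, RPC is replaced by plain method calls, and one task is assumed to need one core.

```python
from collections import deque


class DriverEndpoint:
    """Toy driver-side endpoint: registers executors and hands out tasks."""

    def __init__(self, pending_tasks):
        self.free_cores = {}            # executor id -> free cores
        self.pending = deque(pending_tasks)
        self.launched = []              # (executor id, task) pairs, in launch order

    def register_executor(self, executor_id, cores):
        """Handle a RegisterExecutor-style message; duplicate ids are rejected."""
        if executor_id in self.free_cores:
            return False
        self.free_cores[executor_id] = cores
        self.make_offers()
        return True

    def make_offers(self):
        """Send LaunchTask-style messages while cores and tasks are available."""
        for executor_id, free in self.free_cores.items():
            while free > 0 and self.pending:
                task = self.pending.popleft()
                self.launched.append((executor_id, task))
                free -= 1
            self.free_cores[executor_id] = free


driver = DriverEndpoint(["task-0", "task-1", "task-2"])
driver.register_executor("exec-1", cores=2)
driver.register_executor("exec-2", cores=2)
print(driver.launched)
# [('exec-1', 'task-0'), ('exec-1', 'task-1'), ('exec-2', 'task-2')]
```

Nothing runs until an executor has registered, which is why a job can appear to hang when no container has been granted yet.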

The separation here is that executors in Yarn containers communicate directly with the driver-side endpoint to execute tasks, but if the driver needs more executors, it has to ask the Spark AM, and the AM delegates to the allocator to request the resources from the Yarn RM. Ideally, YarnAllocator knows from the SparkConf at startup how many executors and how many resources the application needs. But because Spark has a dynamic allocation feature, the driver side may continuously send RequestExecutors or KillExecutors messages to the Spark AM.

YarnSchedulerBackend not only launches the DriverEndpoint (here called YarnDriverEndpoint); it also launches a YarnSchedulerEndpoint to handle the RequestExecutors, KillExecutors and GetExecutorLossReason messages from ExecutorAllocationClient (the client class that executes the commands from ExecutorAllocationManager). This YarnSchedulerEndpoint forwards the requests it receives to the Spark AM by calling the AMEndpoint (an RPC endpoint running inside the AM). To be able to do this, YarnSchedulerBackend must first receive a RegisterClusterManager message from a started AM. Once a request arrives inside the AM, the AM calls YarnAllocator to do the corresponding work.
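The forwarding chain described above (scheduler endpoint, then AM endpoint, then allocator) can be mimicked with a few small classes. This is a hedged sketch: the message classes borrow Spark’s names, but their fields, the FakeAllocator class and the “return False when no AM is registered” behaviour are my assumptions for illustration.

```python
from dataclasses import dataclass


@dataclass
class RequestExecutors:
    total: int


@dataclass
class KillExecutors:
    executor_ids: list


class FakeAllocator:
    """Stand-in for YarnAllocator: just records what it was asked to do."""

    def __init__(self):
        self.target = 0
        self.killed = []


class AMEndpoint:
    """Runs inside the AM and delegates every request to the allocator."""

    def __init__(self, allocator):
        self.allocator = allocator

    def receive(self, msg):
        if isinstance(msg, RequestExecutors):
            self.allocator.target = msg.total
            return True
        if isinstance(msg, KillExecutors):
            self.allocator.killed.extend(msg.executor_ids)
            return True
        raise ValueError(f"unexpected message: {msg!r}")


class SchedulerEndpoint:
    """Driver-side endpoint: can only forward once an AM has registered."""

    def __init__(self):
        self.am = None

    def register_cluster_manager(self, am_endpoint):
        self.am = am_endpoint

    def receive(self, msg):
        if self.am is None:
            return False  # no AM registered yet, nothing to forward to
        return self.am.receive(msg)
```

A request sent before register_cluster_manager() has been called goes nowhere, which is why the first RegisterClusterManager message matters.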

Exceptions

Let’s look at some common Yarn exceptions for Spark applications:

  • Lost executor 178 on ip-10-133-21-64.ec2.internal: Container marked as failed: container_1473344855553_0002_01_004878 on host: ip-10-133-21-64.ec2.internal. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 Killed by external signal

This exception comes from YarnAllocator, and it is very annoying because Yarn gives Spark no information about why the container exited abnormally. In most cases it is caused by a memory issue: the JVM running inside the container has simply been killed. You have to dig into the Yarn logs with the container ID to find the full exception stack trace. When this happens, at least one task has failed, and your Spark job may be aborted as a result.

  • Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead

This exception is thrown by YarnAllocator when it handles completed containers. The container can exit because the JVM running in it uses more physical memory than allowed. By default, the Spark Yarn application master requests a container with a memory size equal to executor memory plus overhead memory (see YarnAllocator’s createContainerRequest() function) and launches the ExecutorRunnable in the container, passing spark.executor.memory as the -Xmx parameter. According to the Spark documentation, the overhead memory is for off-heap memory and should be 6–10% of executor memory, but if you enable Tungsten memory management, your Spark application will consume a lot of off-heap memory. You can always add custom JVM parameters for launching the executor. On the Yarn side, yarn.nodemanager.resource.memory-mb is an important memory setting; correspondingly, Yarn has yarn.nodemanager.resource.cpu-vcores for CPU.
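A small worked example helps to see where the limit in the message comes from. The helpers below are hypothetical, and the default overhead rule (10% of executor memory with a 384 MB floor) is my assumption about Spark 2.0; check the documentation of your version before relying on it.

```python
def overhead_mb(executor_mb, percent=10, minimum_mb=384):
    """Assumed default overhead: a percentage of executor memory, with a floor."""
    return max(executor_mb * percent // 100, minimum_mb)


def container_request_mb(executor_mb, explicit_overhead_mb=None):
    """Memory requested from Yarn: executor memory plus overhead."""
    if explicit_overhead_mb is None:
        explicit_overhead_mb = overhead_mb(executor_mb)
    return executor_mb + explicit_overhead_mb


def executor_heap_flag(executor_mb):
    """Only the executor memory becomes the JVM heap limit, not the overhead."""
    return f"-Xmx{executor_mb}m"


print(container_request_mb(5120))        # 5632
print(container_request_mb(5120, 1024))  # 6144
print(executor_heap_flag(5120))          # -Xmx5120m
```

Under these assumptions, a 5 GB executor gives a 5632 MB container, that is 5.5 GB, which is the kind of limit quoted in the message above. The heap is capped at 5120 MB, so anything the JVM uses beyond the heap (off-heap buffers, thread stacks, and so on) has to fit in the remaining 512 MB unless the overhead is raised.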

  • org.apache.spark.SparkException: Could not find CoarseGrainedScheduler or it has been stopped.

     at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$org$apache$spark$scheduler$cluster$YarnSchedulerBackend$$handleExecutorDisconnectedFromDriver$1.applyOrElse(YarnSchedulerBackend.scala:148)

When the DriverEndpoint receives the RPC address of a disconnected executor, YarnSchedulerBackend has to ask the AM why this executor failed, so that it can tell the TaskSetManager whether the tasks on that lost executor should count towards a job failure. The exception happens when YarnSchedulerEndpoint gets the reason from the AM and calls the DriverEndpoint to remove the executor with that reason, but the DriverEndpoint (named “CoarseGrainedScheduler”) is already gone. In this case the task scheduler has normally been stopped, so look for the job-aborted exception to understand the real reason for the failure.

  • java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:623)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)

If you see an InterruptedException coming from DAGScheduler.runJob(), it may be because Yarn has an issue. As described above, the Spark AM has two threads, “Driver” and “Reporter”. “Driver” runs the SparkContext, and “Reporter” keeps the resources required by the driver synchronised with Yarn. If for any reason “Reporter” catches an exception from Yarn while allocating resources, it interrupts the “Driver” thread. That is what happened here: the DAGScheduler was waiting for a job to finish inside the “Driver” thread when the thread was interrupted. You can find an Info-level log like this:

Final app status: FAILED, exitCode: 12, (reason: Exception was thrown 1 time(s) from Reporter thread.)

or a Debug-level log:

shutting down user thread

Unfortunately, Spark only logs this at Info level and does not print the full stack trace of the Yarn exception. But we can find something in the Yarn logs, or watch the Yarn monitoring, to find out what made Yarn unhappy. In my case, it was because Spark had sent too many container requests to Yarn.
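The interplay between the two threads can be imitated in a few lines of Python. This is an analogy rather than Spark’s code: Python threads cannot be interrupted the way JVM threads can, so a threading.Event stands in for Thread.interrupt(), and the failure raised in the reporter is simulated.

```python
import threading


def run_demo():
    interrupted = threading.Event()  # stands in for interrupting the driver thread
    log = []

    def driver():
        # The driver blocks while waiting for a job result,
        # like DAGScheduler.runJob() waiting on its promise.
        if interrupted.wait(timeout=5.0):
            log.append("driver: interrupted while waiting for the job")
        else:
            log.append("driver: job finished normally")

    def reporter():
        try:
            # Simulated failure while asking Yarn for resources.
            raise RuntimeError("simulated Yarn allocation failure")
        except RuntimeError as exc:
            log.append(f"reporter: {exc}")
            interrupted.set()

    driver_thread = threading.Thread(target=driver, name="Driver")
    reporter_thread = threading.Thread(target=reporter, name="Reporter")
    driver_thread.start()
    reporter_thread.start()
    reporter_thread.join()
    driver_thread.join()
    return log


for line in run_demo():
    print(line)
```

Note that the driver’s own message says nothing about the cause: the useful information is in the reporter’s entry, just as the real cause of the InterruptedException is in the Yarn-side logs rather than in the driver stack trace.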

Summary

Apache Yarn is a good resource management system provided by the Hadoop framework. It helps you write JVM-based distributed software by interacting with the RM, AM and NM. As the Spark example shows, there is still a lot of work to do in your own software if you want it to run on Yarn, especially if you plan to allocate resources dynamically. Handling the communication between the AM and all the containers is important, and Spark has a clear, decoupled design for the classes that handle it. Understanding how Spark works on Yarn, and all these communications, really helps when diagnosing your Spark applications.