
GUIDE TO SPARK EXECUTION

Deep Dive Into the Apache Spark Driver on a Yarn Cluster

The Spark Driver of a Spark application is solely responsible for driving and supervising the parallel execution of the latter on a cluster of computing resources. This story focuses on the key components that enable the Spark Driver to perform its duties.

Ajay Gupta · Published in The Startup · 7 min read · Aug 5, 2020

While running a Spark application on a cluster, the driver container, running the application master, is the first one to be launched by the cluster resource manager. The application master, after initializing its components, launches the primary driver thread in the same container. The driver thread runs the main method of the Spark application. The first thing the main method does is initialize the Spark context, which in turn hosts the key driver components responsible for driving and supervising the cluster execution of the underlying Spark application. After initializing the Spark context, the driver thread starts executing the required Spark actions on the cluster using the services of the Spark context.
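
To make this concrete, here is a minimal sketch of such a Spark application in Scala; the application name, the data, and the computation are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object MinimalSparkApp {
  def main(args: Array[String]): Unit = {
    // Initializing the SparkSession (and the underlying Spark context) is the
    // first thing the driver thread does; this wires up the DAG scheduler, the
    // task scheduler, and the scheduler backend discussed below.
    val spark = SparkSession.builder()
      .appName("minimal-driver-demo") // illustrative name
      .getOrCreate()
    val sc = spark.sparkContext

    // Each action (here, count) submitted from the driver thread becomes a job
    // that is executed in parallel on the cluster's executors.
    val total = sc.parallelize(1 to 1000000).map(_ * 2).count()
    println(s"count = $total")

    spark.stop()
  }
}
```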

Here is the big picture showing the key components of the driver container of a Spark application running in a Yarn cluster.

Key Components in a Driver container of a Spark Application running on a Yarn Cluster

Application Master: Every Spark application is provided with an Application Master by the cluster resource manager. The Application Master is started in the driver container of the Spark application by the cluster resource manager. After getting started, the Application Master invokes the Spark application's main method in a separate driver thread inside the same driver container. Further, the Application Master sets up a communication endpoint to enable communication between the driver thread and the Application Master. Also, the Application Master initiates a resource allocator, which acts as an agent to fulfill the driver thread's requests for computing resources (Executors) in the cluster.

In the case of Yarn, the resource allocator initiated by the Application Master is called the Yarn Allocator. The Yarn Allocator talks to the Yarn resource manager to allocate executor containers on request from the driver thread. Also, the Yarn Allocator periodically polls the Yarn resource manager for newly allocated and completed/finished containers and passes the information related to these containers to the driver thread.
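
The size of these container requests is largely driven by configuration supplied by the application. Below is a small sketch of the relevant settings; the keys are standard Spark configuration properties, while the values (and the stand-alone SparkConf shown here) are purely illustrative.

```scala
import org.apache.spark.SparkConf

// Illustrative values only; the keys are standard Spark configuration properties.
val conf = new SparkConf()
  .set("spark.executor.instances", "4")  // number of executor containers to request
  .set("spark.executor.memory", "4g")    // heap memory per executor container
  .set("spark.executor.cores", "2")      // cores per executor container
  // Alternatively, let the allocator grow/shrink the executor count at runtime:
  // .set("spark.dynamicAllocation.enabled", "true")
  // .set("spark.dynamicAllocation.maxExecutors", "10")
```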

Spark Context: Spark Context in the driver thread is like a container that houses the key components responsible for scheduling and supervising the execution of the various actions taken in a Spark application. Spark Context is built and initialized in the Spark application's main method, which is invoked in the driver thread by the Application Master.

Task Scheduler BackEnd: Task Scheduler BackEnd is like an assistant to the Task Scheduler in the driver thread. It is one of the key components hosted inside the Spark Context and is started during the latter's initialization process. In the case of a Yarn cluster, Spark Context initializes Yarn Scheduler BackEnd as the Task Scheduler BackEnd.

In general, the Task Scheduler BackEnd tracks the registered executors and the resources available on them. Also, the BackEnd sets up two important RPC endpoints, one to communicate with the Application Master, and another to communicate with the Executors launched in executor containers by the Yarn resource manager. Further, the scheduler BackEnd exposes various APIs to be called by the primary Task Scheduler in the driver thread, and it communicates with the Task Scheduler via the APIs exposed by the latter.

Yarn Scheduler BackEnd communicates with the Application Master primarily to request new executors or to kill allocated executors. Also, it receives notifications from the Application Master about removing certain executors that were allocated to the Spark application earlier but of which the cluster resource manager has since lost track, perhaps due to network failures or because the executors were killed due to memory overruns.

Yarn Scheduler BackEnd communicates with the allocated executor containers, via the driver endpoint, to send Launch/Kill Task messages (on behalf of the Task Scheduler) to the corresponding Executors. Also, the BackEnd could explicitly ask Executors to stop themselves in certain scenarios. The driver endpoint is also used by the scheduler BackEnd to receive and process registration requests from the executors and to handle certain other notifications from the Executors.
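
What follows is a deliberately simplified, hypothetical sketch of the kind of message handling the driver endpoint performs; the class and message names, and the bookkeeping, are illustrative stand-ins, not Spark's internal API.

```scala
import scala.collection.mutable

// Hypothetical message types standing in for the driver endpoint's protocol.
sealed trait DriverMessage
case class RegisterExecutor(executorId: String, cores: Int) extends DriverMessage
case class LaunchTasks(executorId: String, taskIds: Seq[Long]) extends DriverMessage
case class StatusUpdate(executorId: String, taskId: Long, finished: Boolean) extends DriverMessage

class DriverEndpointSketch {
  // Registered executors and the free cores currently available on each.
  private val freeCores = mutable.Map[String, Int]()

  def receive(msg: DriverMessage): Unit = msg match {
    case RegisterExecutor(id, cores) =>
      // Track the newly registered executor and its resources; in Spark this
      // would also trigger a resource offer to the task scheduler.
      freeCores(id) = cores

    case LaunchTasks(id, taskIds) =>
      // Reserve the executor's cores; sending the serialized tasks to the
      // executor over RPC is elided here.
      freeCores(id) = freeCores.getOrElse(id, 0) - taskIds.size

    case StatusUpdate(id, _, finished) =>
      // On task completion, release the executor's core; forwarding the event
      // to the task scheduler is elided here.
      if (finished) freeCores(id) = freeCores.getOrElse(id, 0) + 1
  }
}
```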

Task Scheduler: The Task Scheduler is another major component in the driver thread that works along with the Task Scheduler BackEnd to take care of the overall task scheduling during Spark execution. In the case of a Spark application running on a Yarn cluster, Spark Context initializes Yarn ClusterScheduler as the Task Scheduler.

For each stage (of the Spark application) to be executed on the cluster, a set of tasks (called a TaskSet) is submitted to the Task Scheduler for execution.

The Task Scheduler does scheduling at two levels: first at the TaskSet level, where the Task Scheduler selects a TaskSet among the multiple TaskSets submitted for execution, and second at the Task level, where one or more tasks are selected for execution among the multiple pending tasks within a single TaskSet.

When called by the Task Scheduler BackEnd with offers of new Executors or of free resources on already allocated Executors, the Task Scheduler first selects the TaskSet with the highest scheduling priority among the various TaskSets arranged in the order of scheduling priority. A FIFO or FAIR scheduling algorithm is generally used to assign scheduling priority to the various TaskSets. After selecting a TaskSet for execution, one or more of its pending tasks are selected for execution based on the locality constraints of each task and the overall resource availability. Once certain tasks from the prioritized TaskSet are selected for execution, and resources are still available, another TaskSet is picked up, in priority order, for the selection of more schedulable tasks. This selection of eligible tasks continues until the available resources, as notified by the Task Scheduler BackEnd, are exhausted.
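
As an illustration of this two-level selection, here is a simplified, hypothetical sketch; the types, the plain FIFO ordering, and the locality check are stand-ins for Spark's far more elaborate internal logic.

```scala
import scala.collection.mutable

// Illustrative stand-in types; one core per task is assumed for simplicity.
case class Offer(executorId: String, host: String, var freeCores: Int)
case class PendingTask(taskId: Long, preferredHosts: Set[String])
case class TaskSetSketch(priority: Int, pending: mutable.ListBuffer[PendingTask])

def resourceOffers(taskSets: Seq[TaskSetSketch], offers: Seq[Offer]): Seq[(Long, String)] = {
  val scheduled = mutable.ArrayBuffer[(Long, String)]()
  // Level 1: walk the TaskSets in scheduling-priority order (plain FIFO here;
  // FAIR scheduling would instead weigh pools of TaskSets against each other).
  for (ts <- taskSets.sortBy(_.priority); offer <- offers) {
    // Level 2: within the TaskSet, keep assigning pending tasks to this offer
    // while it has free cores, preferring tasks whose locality preference
    // matches the offer's host.
    while (offer.freeCores > 0 && ts.pending.nonEmpty) {
      val task = ts.pending
        .find(t => t.preferredHosts.isEmpty || t.preferredHosts(offer.host))
        .getOrElse(ts.pending.head)
      ts.pending -= task
      scheduled += ((task.taskId, offer.executorId))
      offer.freeCores -= 1
    }
  }
  scheduled.toSeq
}
```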

Once all the tasks (from one or more TaskSets) are identified for execution, subject to the constraints of resource availability (as indicated in the call from the Task Scheduler BackEnd), the Task Scheduler BackEnd sends these tasks to the corresponding Executors for execution.

Apart from scheduling tasks for execution on Executors, the Task Scheduler also handles task completion events received from Executors via the Task Scheduler BackEnd. Further, in certain failure scenarios, the Task Scheduler plans re-attempts of a failed task up to a certain limit. The Task Scheduler also implements an optional speculation framework that, when enabled, launches an additional attempt of a task on the speculation that the ongoing execution of that task is considerably slower than expected.
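
Both behaviours are governed by configuration. The keys below are standard Spark properties for task re-attempts and speculation; the values shown are illustrative.

```scala
import org.apache.spark.SparkConf

// Illustrative values only; the keys are standard Spark configuration properties.
val conf = new SparkConf()
  .set("spark.task.maxFailures", "4")          // re-attempt limit before the whole TaskSet (stage) is failed
  .set("spark.speculation", "true")            // enable speculative re-execution of slow tasks
  .set("spark.speculation.multiplier", "1.5")  // how many times slower than the median a task must be
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before speculating
```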

To maintain the data and state machine related to the execution of each task in a TaskSet, and to abstract out the selection procedure of schedulable tasks among the various tasks in a TaskSet, the Task Scheduler relies on a TaskSet Manager instance instantiated for each TaskSet submitted to it. The TaskSet Manager also notifies the DAG scheduler of task completion and start events so that the DAG scheduler can take appropriate actions for the corresponding stage and plan further in the context of the Spark action DAG being executed by the DAG scheduler.

DAG Scheduler: The DAG scheduler is the primary component responsible for executing a Spark action. Spark Context hosts and initializes a DAG scheduler instance inside the driver thread. The DAG scheduler creates stages in response to the submission of a Job, where a Job essentially represents an RDD execution plan (also called an RDD DAG) corresponding to an action taken in a Spark application. Multiple Jobs could be submitted to the DAG scheduler if multiple actions are taken in a Spark application.

For each Job submitted to it, the DAG scheduler creates one or more stages, builds a stage DAG (Directed Acyclic Graph) capturing the dependencies among the stages, and then plans the execution schedule of the created stages in accordance with the stage DAG. For each of the stages, it builds a set of tasks (a TaskSet) where each task computes a partition of the targeted RDD. The TaskSet is then submitted to the Task Scheduler for execution.
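
As a small, purely illustrative example (the data and application name are made up), the reduceByKey below introduces a shuffle boundary, so the single collect action is planned as one Job of two stages; toDebugString prints the RDD lineage (the RDD DAG) that the DAG scheduler works from.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stage-demo").getOrCreate() // illustrative name
val sc = spark.sparkContext

val words  = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _) // reduceByKey adds a shuffle boundary

println(counts.toDebugString) // prints the RDD lineage (the "RDD DAG")

// One action => one Job submitted to the DAG scheduler; the shuffle splits it
// into a shuffle-map stage and a result stage, each submitted as a TaskSet.
counts.collect()
```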

Further, the DAG scheduler maintains various data points and an execution state machine for each of the stages. Also, it gets notified by the TaskSet Manager (of the Task Scheduler) of task start and task completion events. These events enable the DAG scheduler to update various data points about a stage and track the completion status of the stage execution, which could turn out to be a success, partial success, or failure. In the case of a successful stage execution, the DAG scheduler plans the execution of further dependent stages. In case of partial success or complete failure, the DAG scheduler may, in certain scenarios, plan another attempt of the corresponding stage execution.

In order to re-attempt a particular stage execution, the DAG scheduler might have to first re-execute multiple previous stages (to re-compute the corresponding lost partitions belonging to those previous stages), as per the DAG, before the actual re-attempt of the original stage is executed.

Finally, based on the execution status of all the stages, the DAG scheduler concludes the submitted Job status as success or failure.

In case of feedback or queries on this story, do write in the comments section. I hope you find it useful. Here is the link to other comprehensive stories on Apache Spark.


Ajay Gupta
The Startup

Leading Data Engineering Initiatives @ Jio, Apache Spark Specialist, Author, LinkedIn: https://www.linkedin.com/in/ajaywlan/