Job Scheduling in Apache Spark

Badwaik Ojas
May 7, 2023

To set some context for this blog: a Spark application consists of a driver process, where the high-level Spark logic is written, and a set of executor processes that can be scattered across the nodes of a cluster. The Spark program itself runs in the driver and sends instructions to the executors. One Spark cluster can run several Spark applications concurrently; the applications are scheduled by the cluster manager, and each corresponds to one SparkContext. A Spark application can, in turn, run multiple concurrent jobs.

In short, Apache Spark job scheduling happens at two levels: the cluster level and the Spark application level.

Cluster Level: Here scheduling is about running multiple Spark applications on the same cluster based on the resources available in the cluster. Each application requests resources from the cluster manager; once the request is granted, those resources are locked and are not available to subsequent applications until they are released.

Application Level: In Spark, an action triggers a job. When multiple job requests come in within a single application, job scheduling refers to deciding the order in which those jobs run.

Job Scheduling at Cluster Level

Job scheduling here boils down to resource availability in the cluster. When you run a Spark application on a cluster it is given a specified amount of resources, and these resources are not shared with other applications. If multiple users or applications need to share the cluster, Spark defines two ways of allocating and managing resources across them.

Static Allocation

Here a Spark application is allocated a fixed amount of resources in the cluster, and these resources stay reserved for as long as the application is running. They are released once the application ends and become available for the next application. This approach is available with Spark's standalone, Mesos (note: Mesos is not covered in this blog) and YARN cluster managers.

There are several options for controlling how many resources a Spark application is allocated. In Spark's standalone mode, an application will by default try to consume all the cores available in the cluster; this can be capped with “spark.cores.max”, set to whatever value you find is enough for the application (tip: test with different values before settling on one), and “spark.executor.memory” controls the memory per executor. If “spark.cores.max” is not set, the application falls back to “spark.deploy.defaultCores”, which defaults to infinite, meaning it takes whatever is available in the cluster. On YARN, when submitting a Spark application we use parameters like --num-executors, --executor-memory and --executor-cores to control resources per executor; the same can be set through the configuration with “spark.executor.instances”, “spark.executor.memory” and “spark.executor.cores”.
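
As a rough illustration, here is a minimal sketch (in Scala, with placeholder resource values and a hypothetical application name) of where these static-allocation settings would be applied when building the session:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("static-allocation-example")    // hypothetical app name
  .config("spark.cores.max", "8")          // standalone: cap total cores for this application
  .config("spark.executor.memory", "4g")   // memory per executor
  .config("spark.executor.cores", "2")     // cores per executor
  .config("spark.executor.instances", "4") // YARN: fixed number of executors
  .getOrCreate()

// The YARN equivalents on spark-submit would be:
//   --num-executors 4 --executor-cores 2 --executor-memory 4g
```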

Dynamic Allocation

Dynamic resource allocation extends static allocation. In dynamic allocation, executors are added to and removed from a Spark application as needed, based on a set of heuristics that estimate resource requirements. This feature is useful when multiple applications share the resources of the same Spark cluster. It is disabled by default. The configurations to look at when using dynamic allocation are “spark.dynamicAllocation.enabled”, “spark.dynamicAllocation.minExecutors”, “spark.dynamicAllocation.maxExecutors” and “spark.dynamicAllocation.initialExecutors”.
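
A minimal sketch of enabling dynamic allocation with these configs might look like the following; the executor counts are illustrative placeholders, not recommendations, and dynamic allocation also needs either shuffle tracking or the external shuffle service (covered next) to be enabled alongside it:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dynamic-allocation-example")                             // hypothetical app name
  .config("spark.dynamicAllocation.enabled", "true")                 // feature is off by default
  .config("spark.dynamicAllocation.minExecutors", "1")               // never scale below this
  .config("spark.dynamicAllocation.maxExecutors", "20")              // never scale above this
  .config("spark.dynamicAllocation.initialExecutors", "4")           // starting point
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") // see next paragraph
  .getOrCreate()
```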

Other important configs are “spark.shuffle.service.enabled” and “spark.dynamicAllocation.shuffleTracking.enabled”. The purpose of these configs (shuffle tracking or the external shuffle service) is to allow executors to be removed without deleting the shuffle files they have written.

Add request: a request for additional executors is made when the application has pending tasks waiting to be scheduled. This condition implies that the existing set of executors is insufficient to run all the tasks that have been submitted but not yet finished. Spark requests executors in rounds. The configurations to look at are “spark.dynamicAllocation.schedulerBacklogTimeout” and “spark.dynamicAllocation.sustainedSchedulerBacklogTimeout”.

Remove request: the policy for removing executors is much simpler. A Spark application removes an executor when it has been idle for more than “spark.dynamicAllocation.executorIdleTimeout” seconds.
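
The add/remove behaviour is tuned through the timeout configs above; a small sketch with illustrative (not recommended) values:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dynamic-allocation-timeouts")                                    // hypothetical app name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")           // request executors once tasks back up this long
  .config("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")  // interval for the subsequent rounds of requests
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")              // release executors idle this long
  .getOrCreate()
```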

There can be cases where you run a Spark application with dynamic allocation: executors are added because there were pending tasks, and once those tasks complete the extra executors are no longer needed and are removed. But what about the state stored on those executors, such as the shuffle files they wrote? If that state is lost it has to be recomputed, and that is a problem.

Apache Spark has a solution for this: when “spark.shuffle.service.enabled” is turned on, executors fetch shuffle files from the external shuffle service instead of from each other, so the shuffle state remains available beyond the executors' lifetime.

Now regarding data that is cached or persisted: the default behaviour is that executors containing cached data are never removed. Configs like “spark.dynamicAllocation.cachedExecutorIdleTimeout” and “spark.shuffle.service.fetch.rdd.enabled” address this, letting Spark use the external shuffle service to serve the persisted data. Looking ahead, the idea is that cached data may eventually be kept in off-heap storage so that it stays available independently of executors.
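
Putting the shuffle- and cache-related settings from the last few paragraphs together, a sketch might look like this. The timeout value is a placeholder, and note that “spark.shuffle.service.enabled” also assumes the external shuffle service process is actually running on each worker node; this snippet only sets the configs:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-and-cache-settings")                               // hypothetical app name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")                     // fetch shuffle files from the service, not from executors
  .config("spark.shuffle.service.fetch.rdd.enabled", "true")           // also serve persisted RDD blocks from the service
  .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s") // allow removing idle executors that hold cached data
  .getOrCreate()
```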

Job Scheduling at Application Level

Spark is capable of running multiple jobs within one application, provided they are submitted from different threads. In this case the application's resources are allocated by one of the processes discussed above. A job here refers to any Spark action. Spark's scheduler is thread safe and supports applications that serve multiple requests.
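
For example, here is a minimal sketch of launching two jobs concurrently from different threads using Scala Futures; the datasets and computations are arbitrary placeholders, and each count() call is a separate Spark job:

```scala
import org.apache.spark.sql.SparkSession
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val spark = SparkSession.builder().appName("concurrent-jobs").getOrCreate()
val sc = spark.sparkContext

// Each Future runs on its own thread, so the two actions become two jobs
// that the scheduler can run concurrently.
val jobA = Future { sc.parallelize(1 to 1000000).map(_ * 2).count() }
val jobB = Future { sc.parallelize(1 to 1000000).filter(_ % 3 == 0).count() }

println(Await.result(jobA, Duration.Inf))
println(Await.result(jobB, Duration.Inf))
```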

By default, Spark schedules jobs in a first in, first out (FIFO) manner. This means the first job submitted runs in its entirety, getting priority on all the available resources; however, if a job doesn't need the whole cluster, the next job may start. FIFO scheduling can be useful to ensure that resource-intensive jobs get the resources they need, but if you launch a job a few seconds behind a many-hour process, the FIFO strategy can be frustrating. Spark therefore offers a fair scheduler, modeled after the Hadoop fair scheduler, to allow high-traffic clusters to share resources more evenly. The fair scheduler allocates tasks from different jobs to the executors in a round-robin fashion (i.e., handing out a few tasks to the executors from each job). With the fair scheduler, a short, small job can be launched before an earlier long-running job completes. The distribution of these tasks is done by the TaskScheduler and varies depending on whether the fair scheduler or the FIFO scheduler is used.

The fair scheduler ensures that jobs get a more even share of cluster resources. The application still launches jobs in the order that their corresponding actions were called on the SparkContext.

The fair scheduler also supports putting jobs into pools and assigning a different priority (weight) to each pool. Jobs within a pool are allocated the same share of resources, and the pools themselves are allocated resources according to their weight. Using pools can be a good way to ensure that high-priority or very expensive jobs complete.

This feature is disabled by default. To enable the fair scheduler, set the “spark.scheduler.mode” conf to “FAIR”.
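
A minimal sketch of switching the scheduler mode when building the session:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("fair-scheduling")              // hypothetical app name
  .config("spark.scheduler.mode", "FAIR")  // default is FIFO
  .getOrCreate()
```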

Beyond weights, each pool can also be given its own scheduling options, which makes it easy to create a “high-priority” pool for more important jobs.

Refer to the Spark documentation for the details of configuring these pools.
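
As a rough sketch, a job is assigned to a pool by setting a local property on the thread that submits it. The pool name and the allocation-file path below are hypothetical, and the pool weights themselves live in the XML allocation file described in the docs:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("fair-scheduler-pools")
  .config("spark.scheduler.mode", "FAIR")
  .config("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // hypothetical path to the pool definitions
  .getOrCreate()
val sc = spark.sparkContext

sc.setLocalProperty("spark.scheduler.pool", "high_priority") // jobs from this thread go to this pool
sc.parallelize(1 to 100).count()                             // runs in the high_priority pool
sc.setLocalProperty("spark.scheduler.pool", null)            // reset this thread to the default pool
```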

Job scheduling comes in handy in a shared environment, whether we run multiple Spark applications on the same cluster or multiple requests come in for a single application; it helps us use the resources intelligently without affecting the end result.

With this we come to the end of this topic. Hope this helps.

Thanks for reading!!
