Spark Parallel Job Execution

Tharun Kumar Sekar
Published in Analytics Vidhya
3 min read · Sep 6, 2020

Spark is known for breaking a big job down and running the individual tasks in parallel. But this doesn't mean it will run two independent jobs in parallel by default. This article will help you maximize the parallelism you can get out of Spark.

Asynchronous Programming

This is a type of parallel programming in which a unit of work is allowed to run separately from the primary application thread. When the work is done, the worker thread notifies the main thread of its completion or failure. In Scala, you can achieve this using a Future.

Scala Futures

Futures are a means of performing asynchronous programming in Scala. A Future gives you a simple way to run a job inside your Spark application concurrently.
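
Before bringing Spark into the picture, here is a minimal, Spark-free sketch of how a Future behaves; slowComputation is just a hypothetical placeholder for any long-running unit of work.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical long-running unit of work
def slowComputation(): Int = { Thread.sleep(2000); 42 }

// The body starts running on a separate thread as soon as the Future is created
val result: Future[Int] = Future { slowComputation() }

// The main thread is free to do other work here ...

// Block only at the point where the value is actually needed
println(Await.result(result, Duration.Inf))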

Let’s look at the usual way we write our Spark code and then see how Future can help us.

val employee = spark.read.parquet("s3://****/employee")
val salary = spark.read.parquet("s3://****/salary")
val ratings = spark.read.parquet("s3://****/ratings")

println("Joining employee with salary")
employee.join(salary, Seq("employee_id"))
.exportToS3AndJSON("s3://****/employee_salary")

println("Joining employee with ratings")
employee.join(ratings, Seq("employee_id"))
.exportToS3AndJSON("s3://****/employee_ratings")

In the above code, we read three datasets: employee, salary, and ratings.

  • In the first statement, we join the Employee and Salary tables on Employee_ID and write the result out in Parquet and JSON format.
  • In the second statement, we join the Employee and Ratings tables on Employee_ID and again write the result out in Parquet and JSON format.

The first and second statements are in no way related to each other, yet Spark runs them sequentially. You get a clearer picture of this by looking at the Spark UI.

Spark UI

Job ID 0 starts first and runs for 5.5 minutes; only once it completes is the second job picked up, and so on. You can deduce the same from the event timeline: none of the jobs overlap, and each job starts only after the previous one has finished.

If job 0 utilizes only 50% of the cluster, the remaining 50% sits idle.

Let’s see how we can increase utilization by using Scala Futures. Below is the same piece of code, but with Future incorporated.

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// Allow a maximum of 2 threads to run concurrently
val executorService = Executors.newFixedThreadPool(2)
implicit val executionContext = ExecutionContext.fromExecutorService(executorService)

val employee = spark.read.parquet("s3://****/employee")
val salary = spark.read.parquet("s3://****/salary")
val ratings = spark.read.parquet("s3://****/ratings")

val futureA = Future {
  println("Joining employee with salary")
  employee.join(salary, Seq("employee_id"))
    .exportToS3AndJSON("s3://****/employee_salary")
  println("Future A Complete")
}

val futureB = Future {
  println("Joining employee with ratings")
  employee.join(ratings, Seq("employee_id"))
    .exportToS3AndJSON("s3://****/employee_ratings")
  println("Future B Complete")
}

Await.result(futureA, Duration.Inf)
Await.result(futureB, Duration.Inf)

The changes include:

  • Importing ExecutionContext to get access to the thread pool.
  • Defining the number of threads the pool is allowed to run.
  • Enclosing each independent set of transformations inside a Future block.
  • Calling Await.result, which blocks until the corresponding Future completes (Duration.Inf means wait indefinitely); a variant that waits on both futures with a single call is sketched below.
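
If you want a single blocking call for both jobs and want to release the thread pool afterwards, one possible variation, using the standard Future.sequence combinator together with the values defined above, is:

// Collect both futures and wait for them with one call
val bothJobs = Future.sequence(Seq(futureA, futureB))
Await.result(bothJobs, Duration.Inf)

// Release the pool's threads once all the work has finished
executorService.shutdown()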

Let’s see how the job performs now by looking at the Spark UI.

Here you can see that jobs 0 and 1 started at almost the same time. The event timeline also shows both jobs running in parallel.

If you liked this article, click the 👏 so other people will see it here on Medium.
