Boosting Apache Spark Application by Running Multiple Parallel Jobs

Hari Viapak Garg
Published in Analytics Vidhya
Aug 4, 2020
Photo by Anthony Rao on Unsplash

The title may raise a question: Apache Spark already processes data in parallel, so what is new here? To answer that, let me first describe a Spark job: it is a parallel computation that gets created once a Spark action is invoked in an application. By default, Spark runs multiple tasks on each executor to achieve parallelism, but this is not true at the job level. In other words, once a Spark action is invoked, a Spark job comes into existence; it consists of one or more stages, and these stages are broken down into numerous tasks that the executors work on in parallel. A typical application invokes its actions one after another from a single driver thread, so at any given time Spark runs multiple tasks in parallel but not multiple jobs.

WARNING: This does not mean Spark cannot run concurrent jobs. Jobs submitted from separate threads can run at the same time.

In this article we will explore how to boost a default Spark application’s performance by running multiple jobs (Spark actions) at once. I will also share Spark UI snippets showing that, for the same amount of work, the Spark application with concurrent jobs took roughly one fourth of the default application’s time.

Note: In this article, I sometimes use the word ‘job’ to mean ‘Spark action’.

From my personal experience, some extraction applications consist of actions or jobs that have nothing to do with each other; they are totally independent DAGs.

To illustrate with a simple example:

  • We want to query 3 different tables and save the output of each query. (Each query involves only one action, i.e. ‘save’, so this results in 3 jobs in the Spark UI.)

To implement this, we have the following options.

  • A different Spark session for each query: a very inefficient and costly idea.
  • The same Spark session, executing the queries in a loop, i.e. the default behaviour of a Spark application.
  • The same Spark session, running the queries in parallel: very efficient compared to the other two. Let’s go ahead with it.

Running concurrent jobs within a single Spark session not only maximises resource utilisation but can also reduce application time and cost drastically. Furthermore, if we have adequate resources and the jobs have no dependencies on each other, it does not make sense to execute them in a loop or as separate Spark applications.

Now that we understand the ‘what and why’ of our goal, it is time to look at how we can implement it.

To achieve concurrency at the job level, we can leverage a Scala concurrency feature called Futures. A Future runs its computation asynchronously on an ExecutionContext, which supplies the threads that execute it.

In the code below, we have three queries and one function named ‘executeAndSave’. We wrap each call to this function inside a Future block and then call ‘Await.result’ on each Future to wait for the outcome of all queries. This way, we are able to run multiple ‘save’ jobs in parallel. If you create your own ExecutionContext from a thread pool, don’t forget to shut it down at the end; the global ExecutionContext imported in this example has no shutdown method.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{Duration, MINUTES}
import scala.concurrent.{Await, Future}

// Context is the application's own wrapper (not shown in this article);
// it exposes the active SparkSession as `spark`. An implicit Context must be in scope.
def executeAndSave(query: String, dataPath: String)(implicit context: Context): Unit = {
  println(s"$query starts")
  context.spark.sql(query).write.mode("overwrite").parquet(dataPath)
  println(s"$query completes")
}

val pathPrefix = "/lake/mutlithreading/"
// Each entry has the form "<query>|<output folder>"
val queries = Seq("SELECT * FROM ABC|output1", "SELECT * FROM PQR|output2", "SELECT * FROM XYZ|output3")

// Start one Future per query; each Future submits its Spark job from its own thread
val futures: Seq[Future[Unit]] = queries.map { queryAndPath =>
  val parts = queryAndPath.split("\\|")
  val query = parts(0)
  val dataPath = pathPrefix + parts(1).trim
  Future {
    executeAndSave(query, dataPath)
  }
}

// Block until every job has finished (waiting up to 15 minutes on each Future)
futures.foreach(f => Await.result(f, Duration(15, MINUTES)))
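
The same Futures pattern can be tried without a cluster. The following is only a minimal sketch, not part of the original application: ‘fakeJob’ is a hypothetical stand-in that sleeps instead of running a Spark action, and ‘ConcurrentJobsSketch’ and ‘runConcurrently’ are illustrative names. It also uses a dedicated thread pool instead of the global ExecutionContext, which is the case where shutting the pool down at the end matters.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration.{Duration, MINUTES}
import scala.concurrent.{Await, ExecutionContext, Future}

object ConcurrentJobsSketch {

  // Hypothetical stand-in for a Spark action: sleeps instead of querying and writing
  def fakeJob(name: String, millis: Long): String = {
    Thread.sleep(millis)
    s"$name done"
  }

  // Runs all jobs on a dedicated pool with one thread per job and
  // returns (results in input order, elapsed milliseconds)
  def runConcurrently(jobs: Seq[(String, Long)]): (Seq[String], Long) = {
    val pool = Executors.newFixedThreadPool(math.max(1, jobs.size))
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val start = System.nanoTime()
    try {
      // Each Future starts running as soon as it is created
      val futures = jobs.map { case (name, millis) => Future(fakeJob(name, millis)) }
      // Combine the Futures and wait once for all of them
      val results = Await.result(Future.sequence(futures), Duration(1, MINUTES))
      (results, (System.nanoTime() - start) / 1000000)
    } finally {
      // A self-created pool uses non-daemon threads, so shut it down explicitly
      pool.shutdown()
    }
  }
}
```

Calling ‘ConcurrentJobsSketch.runConcurrently’ with three jobs that each sleep for the same duration should take roughly the time of one job rather than the sum of all three, which mirrors what the Spark UI shows for the real application. Sizing the pool to the number of jobs is a choice made for this sketch; in a real application the pool size would be bounded by what the driver and cluster can handle.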

Apart from the above solution, Pavel Filatov suggested in the comments that it can be made simpler by using parallel collections, which provide implicit parallelism. I also believe it is a far simpler approach when we don’t need to control low-level parallelisation details. Please find his solution below.

https://gist.github.com/pavel-filatov/aaea22e304bfdb509866f13034df0d80

Time duration analysis: default Spark application vs. Spark application with concurrency

Spark UI for sequential jobs

In the above snippet, we can see that the default Spark application took 17 seconds, whereas the Spark application with concurrent jobs, shown in the picture below, took only 4 seconds to accomplish the same amount of work. It can also be seen that the submitted time for all three jobs is the same. This can be confirmed from the event timeline as well, where the jobs overlap each other. Job Id 3 took 4 seconds, the longest of the three jobs, so we consider that the total time for the work.

Spark UI for concurrent jobs

Running concurrent jobs in a Spark application brings positive results and boosts performance in most cases. However, there can be scenarios where Scala Futures alone do not help, because sometimes one job consumes all the resources and the other jobs have to wait until some are released. In this case, we need to configure Spark’s FAIR scheduling, which ensures that all triggered jobs get a share of the resources. I have covered this topic here: “Apache Spark: Sharing Fairly between Concurrent Jobs within an Application”.
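
As a rough illustration of what enabling FAIR scheduling can look like, here is a minimal configuration sketch. It assumes the application builds its own SparkSession and is launched with Spark on the classpath (for example through spark-submit); the application name and the pool name ‘pool1’ are placeholders, not values from the original application.

```scala
import org.apache.spark.sql.SparkSession

// Needs Spark on the classpath; the app name and pool name are placeholders
val spark = SparkSession.builder()
  .appName("concurrent-jobs-sketch")
  .config("spark.scheduler.mode", "FAIR") // the default scheduling mode is FIFO
  .getOrCreate()

// Optional: assign jobs submitted from the current thread to a named pool.
// The property is per thread, so with Futures it would be set inside the Future body.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
```

Pools that are not declared in a scheduler allocation file get default settings, so the pool assignment is only worth adding when different jobs should be weighted differently.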

I hope this post has helped you.

Please share your thoughts and suggestions in comments.
