Spark, Parallelising the Parallel Jobs

Ajith Shetty · Analytics Vidhya · Jun 26, 2021

What on earth does “Parallelising the parallel jobs” mean??

Without going into depth, in layman's terms:

Spark creates the DAG, or lineage, based on the sequence in which we create RDDs and apply transformations and actions.

It applies the Catalyst optimiser on your DataFrames and Datasets to tune your queries. But what it doesn't do is run your independent functions in parallel with each other.

We always tend to think of Spark as a framework which splits your jobs into stages and tasks and runs them in parallel.
In a way that is 100% true. But not in the sense we are going to discuss below.

Let's say I have 10 tables to which I need to apply the same function, e.g. count the rows, count the number of nulls, print the top rows, etc.

So if I submit the job for these 10 tables, will it run in parallel, since the 10 tables are independent of each other?

Spark is smart enough to figure out the dependencies and run things in parallel, isn't it?

Not really.

Demo Time

Step 1: Initialise the Spark Session
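The notebook code isn't embedded in this post, so here is a minimal sketch of this step (the app name is just an example; on Databricks the `spark` session already exists):

```scala
import org.apache.spark.sql.SparkSession

// Create (or reuse) the Spark session. On Databricks this is provided for you.
val spark = SparkSession.builder()
  .appName("parallelising-parallel-jobs") // app name is an assumption
  .getOrCreate()
```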

Step 2: Create dummy tables with sample records

For this example, I have used a Kaggle dataset to create the tables:

https://www.kaggle.com/datasets
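A minimal sketch of this step, assuming the Kaggle data has been downloaded as CSV files (the paths and table names below are placeholders, not the originals):

```scala
// Register a handful of tables from CSV files downloaded from Kaggle.
// Paths and table names are placeholders for illustration.
val tableNames = Seq("table_1", "table_2", "table_3", "table_4", "table_5")

tableNames.foreach { name =>
  spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(s"/tmp/kaggle/$name.csv")
    .write
    .mode("overwrite")
    .saveAsTable(name)
}
```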

Step 3: Sample commands to test the data and count
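For example, on one of the registered tables (table name assumed):

```scala
// Peek at the data and count the rows.
spark.table("table_1").show(5)
println(s"table_1 count: ${spark.table("table_1").count()}")
```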

Step 4: Create a common function, with some transformations in it, which will be applied to all of the given tables
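A sketch of such a function, covering the checks mentioned earlier (row count, nulls per column, top rows); the name runCommonChecks and the exact transformations are assumptions:

```scala
import org.apache.spark.sql.functions.{col, count, when}

// Common checks applied to every table: total row count,
// null count per column, and the top rows.
def runCommonChecks(tableName: String): Unit = {
  val df = spark.table(tableName)
  println(s"$tableName row count: ${df.count()}")
  df.select(df.columns.map(c => count(when(col(c).isNull, c)).alias(c)): _*).show()
  df.show(5)
}
```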

Step 5: Execute the same function for the independent tables
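For example:

```scala
// Run the same function for each independent table, one call after another.
runCommonChecks("table_1")
runCommonChecks("table_2")
runCommonChecks("table_3")
runCommonChecks("table_4")
runCommonChecks("table_5")
```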

Here you can see that the jobs have been triggered sequentially even though there is no dependency between them.

To confirm this, let's look at the Spark UI.

So the jobs have run sequentially.

Let's do the same exercise, but in a loop.
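For example:

```scala
// The same checks in a loop: the driver still submits each table's jobs
// only after the previous table's jobs have finished.
tableNames.foreach(runCommonChecks)
```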

As expected, the jobs have run sequentially.

How can we let Spark know that there is no dependency between the given 10 tables, as per our example, and get it to run them in parallel?

Now comes the Fun Part

The Scala Concurrency

https://docs.scala-lang.org/overviews/scala-book/futures.html

As per the official document, here’s a description of Future from its Scaladoc:

“A Future represents a value which may or may not currently be available, but will be available at some point, or an exception if that value could not be made available.”
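A minimal illustration (not from the original post):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// The future starts computing on another thread as soon as it is created.
val answer: Future[Int] = Future { 41 + 1 }
```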

ExecutionContext

An ExecutionContext can execute program logic asynchronously, typically but not necessarily on a thread pool.
A general purpose ExecutionContext must be asynchronous in executing any Runnable that is passed into its execute-method. A special purpose ExecutionContext may be synchronous but must only be passed to code that is explicitly safe to be run using a synchronously executing ExecutionContext.
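For example, an ExecutionContext backed by a fixed-size thread pool (the pool size here is just an example):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Five threads; futures created with this context run on them.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))
```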

Await

Await is what is used to ensure proper handling of blocking for Awaitable instances.
While occasionally useful, e.g. for testing, it is recommended that you avoid Await when possible in favor of callbacks and combinators like onComplete and use in for comprehensions. Await will block the thread on which it runs, and could cause performance and deadlock issues.

a. Await.ready

Await the “completed” state of an Awaitable.

b. Await.result

Await and return the result (of type T) of an Awaitable.
Params:
awaitable — the Awaitable to be awaited
atMost — maximum wait time, which may be negative (no waiting is done), Duration.Inf for unbounded waiting, or a finite positive duration
Returns:
the result value if awaitable is completed within the specific maximum wait time
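For example:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val f: Future[Int] = Future { 41 + 1 }
// Block the calling thread for at most 10 seconds, then return the result
// (or throw a TimeoutException if it isn't ready in time).
val result: Int = Await.result(f, 10.seconds)
```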

Let's try Spark parallelism using Scala concurrency.
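The notebook block itself isn't reproduced in this post, so below is a minimal sketch of the approach, assuming the tableNames list and runCommonChecks function from the earlier steps (the bracketed line numbers in the walkthrough that follows refer to the original notebook):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Execution context sized to the number of tables, so each table gets its own thread.
val executorService = Executors.newFixedThreadPool(tableNames.size)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executorService)

// One future per table; each thread tags its Spark jobs with its own scheduler pool
// so they show up as separate pools (0 to 4) in the Spark UI.
val futures: Seq[Future[Unit]] = tableNames.zipWithIndex.map { case (tableName, idx) =>
  Future {
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", idx.toString)
    println(s"Triggering parallel job for $tableName")
    runCommonChecks(tableName)
  }
}

// Block until every table's jobs are done, waiting as long as it takes.
Await.result(Future.sequence(futures), Duration.Inf)

// Finally, shut down the execution context.
executorService.shutdown()
```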

Let’s see what we are doing in the code block.

  1. [line 9] Initialise the execution context to define the number of threads we need for our job. In our example it's the number of tables.
  2. [line 17] Create a list of futures.
  3. [line 18] Pass in the table-name list; for each table, set the thread pool name and call the function [line 21] that is supposed to be run.
  4. [line 24] Take the future list and run Await.result, which blocks until all of the threads come back with their results, waiting up to the maximum time given (Duration.Inf).
  5. [line 34] Finally, shut down the execution context.

Let’s execute and look at the output.

Hell ya!! The jobs have been triggered in parallel with each other.

We have created 5 threads so that the jobs each run in their own pool.

Check the log line “Triggering parallel job for”, which shows that they all got submitted at the same time, 10:44:56.

The jobs completed at different times because the row counts of the tables differ from one another.

Let's verify in the Spark UI.

Here we can see that it created 5 different pools (0 to 4) and ran the jobs in parallel with each other.

Look at the submitted time to make sure that they got triggered at the same time.

Confirming it one more time on the Jobs page.

Checking from the bottom, we can see that all the jobs were submitted at the same time.

Hope you have enjoyed this demo and explanation.

You can find the above notebook (DBC file) in my GitHub repo.

Please post your feedback in the comments.

Ajith Shetty

BigData Engineer — Love for BigData, Analytics, Cloud and Infrastructure. Want to talk more? Ping me on LinkedIn: https://www.linkedin.com/in/ajshetty28/

Subscribe ✉️ || More blogs 📝 || LinkedIn 📊 || Profile Page 📚 || Git Repo 👓

Subscribe to my weekly newsletter: Just Enough Data
