Spark, Parallelising the Parallel Jobs
What on earth does “Parallelising the parallel jobs” mean??
Without going into depth, in layman's terms:
Spark builds the DAG, or lineage, based on the sequence in which we create RDDs and apply transformations and actions.
It applies the Catalyst optimiser to your DataFrames and Datasets to tune your queries. What it doesn't do, however, is run your independent functions in parallel with each other.
We always tend to think of Spark as a framework which splits your jobs into stages and tasks and runs them in parallel.
In a way that is 100% true. But not in the sense we are going to discuss below.
Let's say that I have 10 tables to which I need to apply the same function, e.g. count the rows, count the number of nulls, print the top rows, etc.
If I submit the job for these 10 tables, will they run in parallel, since the tables are independent of each other?
Spark is smart enough to figure out the dependencies and run them in parallel, isn't it?
Not really.
Demo Time
Step 1: Initialise the Spark Session
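Since the original notebook isn't reproduced here, a minimal sketch of this step (in a Databricks notebook `spark` already exists; this is only needed for a standalone application):

```scala
import org.apache.spark.sql.SparkSession

// In a notebook environment `spark` is pre-created; build it yourself
// only when running as a standalone application.
val spark = SparkSession.builder()
  .appName("parallelising-parallel-jobs")
  .enableHiveSupport()
  .getOrCreate()
```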
Step 2: Create dummy tables with sample records
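A sketch of how such tables might be registered — the file path and the table names (`table_1` … `table_5`) are illustrative assumptions, not taken from the original notebook:

```scala
// Illustrative: load Kaggle CSVs and register each as a table.
// The path and table names are assumptions, not from the original notebook.
val tableNames = Seq("table_1", "table_2", "table_3", "table_4", "table_5")

tableNames.foreach { name =>
  spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(s"/tmp/kaggle/$name.csv")
    .write
    .mode("overwrite")
    .saveAsTable(name)
}
```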
For this example, I have used a Kaggle dataset and created the tables:
https://www.kaggle.com/datasets
Step 3: Sample commands to test the data and count
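Such sanity checks might look like this (the table name `table_1` is a hypothetical placeholder):

```scala
// Quick sanity checks on the registered tables.
spark.sql("SHOW TABLES").show()
spark.table("table_1").show(5)        // print the top rows
println(spark.table("table_1").count()) // count the records
```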
Step 4: Create the Common Function with some transformation in it which will be applicable for all the given tables
Step 5: Execute the same function for the independent tables
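A sketch of steps 4 and 5, assuming the common function is named `runCount` and the tables `table_1` … (both hypothetical names — the original notebook's code is not included here):

```scala
// A hypothetical common function: apply a transformation, then count.
def runCount(tableName: String): Long = {
  println(s"Triggering job for $tableName")
  spark.table(tableName)
    .na.drop()   // sample transformation
    .count()     // action: triggers a Spark job
}

// Calling it table after table: each count() is a blocking action
// on the driver thread, so the jobs run one after another.
runCount("table_1")
runCount("table_2")
runCount("table_3")
```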
Here you can see that the jobs have been triggered sequentially even though there is no dependency between them.
To confirm this, let's look at the Spark UI.
So the jobs have run sequentially.
Let's do the same exercise, but in a loop.
As expected, the jobs have again run sequentially.
So how can we let Spark know that there is no dependency between the given 10 tables and have it run them in parallel?
Now comes the Fun Part
The Scala Concurrency
https://docs.scala-lang.org/overviews/scala-book/futures.html
As per the official documentation, here's the description of Future from its Scaladoc:
“A Future represents a value which may or may not currently be available, but will be available at some point, or an exception if that value could not be made available.”
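In other words, a Future starts computing on another thread as soon as it is created, and the value becomes available later. A tiny, Spark-free sketch:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// The body starts running on a background thread immediately.
val f: Future[Int] = Future {
  Thread.sleep(100) // simulate some work
  42
}

// The value may not be available yet, but will be at some point;
// here we block until it is.
val result = Await.result(f, 2.seconds)
println(result) // 42
```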
ExecutionContext
An ExecutionContext can execute program logic asynchronously, typically but not necessarily on a thread pool.
A general purpose ExecutionContext must be asynchronous in executing any Runnable that is passed into its execute-method. A special purpose ExecutionContext may be synchronous but must only be passed to code that is explicitly safe to be run using a synchronously executing ExecutionContext.
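A minimal, Spark-free illustration of handing a Runnable to a thread-pool-backed ExecutionContext via its execute method:

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import scala.concurrent.ExecutionContext

// Wrap a fixed pool of 4 threads as a general-purpose ExecutionContext.
val pool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

val latch = new CountDownLatch(1)
// execute() runs the Runnable asynchronously on one of the pool's threads.
pool.execute(new Runnable {
  def run(): Unit = {
    println(s"running on ${Thread.currentThread().getName}")
    latch.countDown()
  }
})

latch.await() // block until the task has run
pool.shutdown()
```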
Await
Await is what is used to ensure proper handling of blocking for Awaitable instances.
While occasionally useful, e.g. for testing, it is recommended that you avoid Await when possible in favor of callbacks and combinators like onComplete and use in for comprehensions. Await will block the thread on which it runs, and could cause performance and deadlock issues.
a. Await.ready
Await the “completed” state of an Awaitable.
b. Await.result
Await and return the result (of type T) of an Awaitable.
Params:
awaitable — the Awaitable to be awaited
atMost — maximum wait time, which may be negative (no waiting is done), Duration.Inf for unbounded waiting, or a finite positive duration
Returns:
the result value if awaitable is completed within the specified maximum wait time
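The difference between the two can be seen in a small Spark-free example:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val f: Future[Int] = Future { 21 * 2 }

// Await.ready blocks until the future reaches its "completed" state
// and returns the future itself.
val ready = Await.ready(f, 5.seconds)
println(ready.isCompleted) // true

// Await.result blocks and returns the result value (of type T).
val value: Int = Await.result(f, 5.seconds)
println(value) // 42
```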
Let's try Spark parallelism using Scala concurrency.
Let's see what we are doing here in the code block.
- Initialise the execution context to define the number of threads we need for our job. In our example it's the number of tables.
- Create a list of Futures: for each table name in the list, set the thread pool name and call the function that is supposed to run.
- Pass the Future list to Await.result, which triggers all of the threads and waits up to the maximum amount of time (Duration.Inf, i.e. unbounded) for them to come back with the results.
- Finally, close the execution context.
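Since the notebook itself is not reproduced here, the steps above can be sketched as follows, assuming the tables are named `table_1` … `table_5` and the common function is called `runCount` (both hypothetical names):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration

val tableNames = Seq("table_1", "table_2", "table_3", "table_4", "table_5")

// One thread per table, so every job can be in flight at the same time.
implicit val ec: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(tableNames.size))

// Build the Future list: each future names its own scheduler pool
// and runs the common function (runCount is a hypothetical name).
val futures = tableNames.zipWithIndex.map { case (name, i) =>
  Future {
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", s"pool$i")
    println(s"Triggering parallel job for $name")
    (name, runCount(name))
  }
}

// Trigger all the threads and wait (Duration.Inf = unbounded) for the results.
val results = Await.result(Future.sequence(futures), Duration.Inf)
results.foreach { case (name, cnt) => println(s"$name -> $cnt") }

// Finally, close the execution context.
ec.shutdown()
```

Note that the `spark.scheduler.pool` local property only maps jobs into separate fair-scheduler pools; the concurrency itself comes from the futures running on separate driver threads.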
Let’s execute and look at the output.
Hell ya!! The jobs have been triggered in parallel with each other.
We created 5 thread pools so that each job runs in its own pool.
Check the log “Triggering parallel job for”, which shows that all jobs got submitted at the same time, 10:44:56.
The jobs completed at different times, as the row counts of the tables differ from one another.
Let's verify in the Spark UI.
Here we can see that Spark created 5 different pools (0 to 4) and ran the jobs in parallel with each other.
Look at the submitted times to make sure that they got triggered at the same time.
Confirming it one more time on the Jobs page:
Reading from the bottom, we can see that all the jobs have been SUBMITTED at the same time.
Hope you have enjoyed this demo and explanation.
You can find the above notebook (DBC file) in my GitHub repo.
Please post your feedback in the comments.
Ajith Shetty
BigData Engineer — Love for Bigdata, Analytics, Cloud and Infrastructure.
Subscribe✉️ ||More blogs📝||Linked In📊||Profile Page📚||Git Repo👓
Subscribe to my: Weekly Newsletter Just Enough Data