How Does Apache Spark Run Your Application?
To understand how your application runs on a cluster, it helps to know that Dataset/DataFrame transformations fall into two types, narrow and wide. We will discuss these first, before explaining the execution model.
A DataFrame is simply a Dataset[Row], so going forward we will generally use the term Dataset.
Narrow and Wide Transformations
As a review, transformations create a new Dataset from an existing one. Narrow transformations do not have to move data between partitions when creating a new Dataset from an existing one. Some example narrow transformations are “filter” and “select,” which are used in the example below to retrieve flight information for the carrier “AA”:
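The original flight dataset is not reproduced here, so the sketch below assumes a small hypothetical in-memory DataFrame with a `carrier` column (in the article the data is read from a file). It illustrates the narrow `filter` and `select` transformations:

```scala
import org.apache.spark.sql.SparkSession

// Local session just for illustration; a real application would run on a cluster
val spark = SparkSession.builder()
  .appName("narrow-transformations")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical flight rows standing in for the article's flight dataset
val flights = Seq(
  ("AA", "JFK", "LAX"),
  ("UA", "SFO", "ORD"),
  ("AA", "DFW", "MIA")
).toDF("carrier", "origin", "dest")

// filter and select are narrow: each output partition is computed
// from a single input partition, so no data moves across the network
val aaFlights = flights
  .filter($"carrier" === "AA")
  .select("carrier", "origin", "dest")

aaFlights.show()
```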
Multiple narrow transformations can be performed on a Dataset in memory, in a process called pipelining, making narrow transformations very efficient.
Wide transformations cause data to be moved between partitions when creating a new Dataset, in a process called the shuffle. During a shuffle, data is sent across the network to other nodes and written to disk, incurring network and disk I/O, which makes the shuffle a costly operation. Some example wide transformations are “groupBy,” “agg,” “sort,” and “orderBy.”
Below is a wide transformation to count the number of flights by carrier.
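A sketch of that count, again using the same hypothetical in-memory flights DataFrame rather than the article's file:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("wide-transformations")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical flight rows standing in for the article's dataset
val flights = Seq(
  ("AA", "JFK", "LAX"),
  ("UA", "SFO", "ORD"),
  ("AA", "DFW", "MIA")
).toDF("carrier", "origin", "dest")

// groupBy/count is wide: all rows for the same carrier must end up in the
// same partition, so Spark shuffles data across the network
val flightCounts = flights
  .groupBy("carrier")
  .count()

flightCounts.show()
```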
The Spark Execution Model
The Spark execution model can be defined in three phases: creating the logical plan, translating that into a physical plan, and then executing the tasks on a cluster.
You can view useful information about your Spark jobs in real time in a web browser with this URL: http://<driver-node>:4040.
For Spark applications that have finished, you can use the Spark history server to see this information in a web browser at this URL: http://<server-url>:18080.
The Three-Phase Plan
Let’s walk through the three phases and the Spark UI information about the phases, with some sample code.
In the first phase, the logical plan is created. This is the plan that shows which steps will be executed when an action gets applied. Recall that when you apply a transformation on a Dataset, a new Dataset is created. When this happens, that new Dataset points back to the parent, resulting in a lineage or directed acyclic graph (DAG) for how Spark will execute these transformations.
Actions trigger the translation of the logical DAG into a physical execution plan. The Spark Catalyst query optimizer creates the physical execution plan for DataFrames.
The physical plan identifies resources, such as memory partitions and compute tasks, that will execute the plan.
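You can inspect both plans yourself with `explain`. The sketch below assumes the same hypothetical flights DataFrame as before; `explain(true)` prints the parsed, analyzed, and optimized logical plans followed by the physical plan that Catalyst produced:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("explain-plans")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the article's flight dataset
val flights = Seq(
  ("AA", "JFK", "LAX"),
  ("UA", "SFO", "ORD"),
  ("AA", "DFW", "MIA")
).toDF("carrier", "origin", "dest")

val query = flights
  .filter($"carrier" === "AA")
  .groupBy("carrier")
  .count()

// Prints the logical plans (parsed, analyzed, optimized) and the physical plan;
// look for an Exchange operator, which marks the shuffle boundary
query.explain(true)
```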
Transformations only build up the logical plan (the DAG); it is an action that triggers creation of the physical plan and its execution.
Executing the Tasks on a Cluster
In the third phase, the tasks are scheduled and executed on the cluster. The scheduler splits the graph into stages based on the transformations: narrow transformations (those without data movement) are grouped (pipelined) together into a single stage.
The physical plan for the above example (read + select -> filter -> groupBy) has two stages, with everything before the exchange in the first stage.
Each stage consists of tasks, based on partitions of the Dataset, which perform the same computation in parallel.
The scheduler submits each stage's task set to the task scheduler, which launches tasks via a cluster manager (Standalone, YARN, or Mesos). Stages are executed in order, and the action is considered complete when the final stage in a job completes. This sequence can occur many times as new Datasets are created.
Here is a summary of the components of execution:
- Task: a unit of execution that runs on a single machine
- Stage: a group of tasks, based on partitions of the input data, which perform the same computation in parallel
- Job: has one or more stages
- Pipelining: collapsing of Datasets into a single stage, when Dataset transformations can be computed without data movement
- DAG: the logical graph of Dataset operations
Exploring the Task Execution on the Web UI
Here is a screenshot of the web UI Jobs tab, after running the code above. The Jobs page gives you detailed execution information for active and recently completed Spark jobs, including the performance of each job and the progress of running jobs, stages, and tasks. In this example, Job Id 2 is the job that was triggered by the following code (it can be identified by collect in the Description column).
Clicking the link in the Description column on the Jobs page takes you to the Job Details page. This page gives you details on the progress of the job, stages, and tasks.
We see this job consists of 2 stages, with 2 tasks in the stage before the shuffle and 200 in the stage after the shuffle (202 tasks in total).
Finding the Number of Partitions for a Dataset
The number of tasks corresponds to the number of partitions: after reading the file in the first stage, there are 2 partitions; after a shuffle, the default number of partitions is 200. You can see the number of partitions of a Dataset with the rdd.partitions.size method shown below.
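As a sketch (again using a hypothetical in-memory DataFrame rather than the article's file read), you can compare the partition count before and after a shuffle:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("partition-count")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the article's flight dataset
val flights = Seq(
  ("AA", "JFK", "LAX"),
  ("UA", "SFO", "ORD"),
  ("AA", "DFW", "MIA")
).toDF("carrier", "origin", "dest")

// Partitions before any shuffle (for a file read, this depends on the input splits)
println(flights.rdd.partitions.size)

val counts = flights.groupBy("carrier").count()
// After a shuffle the default is spark.sql.shuffle.partitions (200),
// though adaptive query execution in Spark 3.x may coalesce this number
println(counts.rdd.partitions.size)
```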
Under the Stages tab, you can see the details for a stage by clicking on its link in the description column.
Hope you liked this article! Please follow me and @codebrace for more articles like this.