Beginner’s Guide to Spark UI: How to Monitor and Analyze Spark Jobs

Suffyan Asad
14 min read · Jun 4, 2023


Introduction

This article covers the Spark UI and how to track jobs using it. The Spark UI is a very useful tool built into Apache Spark that provides a comprehensive overview of the Spark environment: nodes and executors, environment properties and parameters, jobs that are running, jobs that have completed, terminated, or failed, query plans, and much more.

Photo by Kevin Ku on Unsplash

Official Documentation

The official Spark UI documentation provides a decent introduction to the features of the Spark UI:

First look at the Spark UI

The Spark UI can be accessed in development mode at localhost:4040 by default. Let's quickly create a new project, start a Spark session, run a simple job, and view the Spark UI:

Simple job code to run and examine the Spark UI

Additionally, I am running this in the PyCharm IDE, and I have added a requirements.txt file with only one dependency:

pyspark==3.4.0

The code creates a Spark session in local mode, and the configuration local[4] means that 4 cores will be used. In local mode, the tasks run on the driver node.

Running this code creates a simple in-memory DataFrame, and then performs a group-by operation on it before displaying it. The created DataFrame is cached as well. After completing this simple job, the time.sleep(100000) call keeps the Spark session alive for longer to enable examining the Spark UI.
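For reference, a minimal sketch of such a job could look like the following; the column names, sample values, and app name here are illustrative placeholders rather than the exact code from the gist:

import time

from pyspark.sql import SparkSession, functions as F

# Spark session in local mode, using 4 cores.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("spark_ui_intro")
    .getOrCreate()
)

# A small in-memory DataFrame with 10 rows (the values are placeholders).
df = spark.createDataFrame(
    [(i, letter) for i, letter in enumerate(["a", "b", "c", "d", "e"] * 2)],
    ["number", "letter"],
)
df.cache()
df.show()  # first action: display the raw DataFrame

# Derive a column, group on it, and aggregate the "letter" column.
(
    df.withColumn("number_mod", F.col("number") % 2)
    .groupBy("number_mod")
    .agg(F.collect_list("letter").alias("letters"))
    .show()
)

# Keep the session (and the Spark UI at localhost:4040) alive for inspection.
time.sleep(100000)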

To access the Spark UI while the code is running, open a browser and navigate to localhost:4040, the default port for the Spark UI. Let's examine the Spark UI:

Spark UI Jobs View

Spark UI Job View

Note: If port 4040 is busy, the job run log will mention that it is starting on 4041 (or whatever port is available after 4040), so open that instead:

Choosing another port because 4040 is not available

Opening the Spark UI shows all the jobs that are complete or in progress. The execution time of our simple job is very small, hardly 10 seconds, so only completed jobs and their stages are visible; for longer-running applications, the Spark UI shows in-progress jobs as well. For each job, we can see the number of stages and the time it took. Additionally, the UI displays the number of tasks, along with the number of successful tasks, failed tasks (if any), and tasks in progress (if any).

The event timeline can be viewed by expanding the Event Timeline heading:

Event Timeline

The event timeline view:

Displays in chronological order the events related to the executors (added, removed) and the jobs.

Link: https://spark.apache.org/docs/latest/web-ui.html

This is how the Spark UI shows up in development mode. In cluster/client mode, however, the home page lists each submitted job, and selecting one of them opens this view, as you'll see later.

Stages View

Next, clicking on Stages on the top menu opens the stages page:

Stages View showing stages for all jobs

The Stages view shows additional information about each stage: the input data size, shuffle read and write data sizes, and the number of tasks. Skipped stages are also listed. Select stage 2 to show the details of this stage.

On top is the DAG Visualization heading, which can be expanded by clicking to reveal the DAG Visualization of the stage:

DAG Visualization of the stage

The DAG visualization shows a step-by-step breakdown of the stage. Below the DAG Visualization is the Event Timeline, which shows the time taken by each task to complete, or the time spent so far by ongoing tasks. The Event Timeline graph is also collapsed by default and needs to be expanded.

Event timeline

The event timeline graph has a shared timeline on the x-axis, which gives an overall picture of the start times, end times, and durations of all the tasks relative to each other.

If a stage has more tasks than can be executed at once, some tasks get started only after capacity becomes available. All of this can be examined in the timeline view. The timeline of each task is broken down into computation time and time spent on other overheads such as serialization/deserialization, shuffle read/write, etc.
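For example, with the local[4] configuration only 4 tasks can run at once, so a stage with more partitions than cores executes in waves. A small, hypothetical illustration (not part of the original job):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[4]").appName("timeline_waves").getOrCreate()

# 16 partitions but only 4 cores: tasks run in waves of 4, which shows up
# as staggered bars in the stage's Event Timeline.
spark.range(0, 1_000_000).repartition(16).agg(F.sum("id")).show()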

Here is what the timeline looks like for a bigger job with many tasks:

A busy timeline — many tasks on multiple executors

Below the timeline, there are summary statistics showing duration, garbage collection time, input and shuffle times, and a few other elapsed times:

The breakdown of each time duration is by minimum, maximum, and the 25th, 50th (median), and 75th percentiles. This table is quite handy for detecting straggling tasks and data skew. Ideally, all time durations and data sizes should be close to uniform across tasks, i.e. as close to the median as possible.

A few tasks taking very long can be an indication of data skew, or of processing taking long for some other reason, such as the complexity of the calculations.

Next is the table that shows details of each task:

Task details

Here, information such as the time taken by each task to complete, the node it ran on, GC time, input size, and shuffle size is available. Also, if available, links to the console outputs of each task, both stderr and stdout, are present and can be used to inspect the outputs.

SQL/DataFrame View

The SQL/DataFrame view can be accessed by selecting the SQL/DataFrame button on the top bar. It is the last one on the top menu.

SQL Queries of the job

The main view shows each query in the overall job, and its duration, as well as the Job IDs associated with each query. First, select the query with ID 0 to reveal its details:

DAG Representation of the first query

This is the SQL Query view of the first action:

First action of the job — showing raw DataFrame

Let's read through the steps.

First, there is the Scan Existing RDD step, which creates the DataFrame by reading from a list of values. The number of output rows is 10. Next, the DataFrame is cached and is read from memory by the InMemoryTableScan step. But because this is the first action, the DataFrame has to be constructed before it can be cached, and therefore the first step gets executed and is not skipped. Next are the projection and collection steps (Project and CollectLimit), which select and show the DataFrame.

Expand the Details heading to see the Query plan. The Physical plan is:

Physical Plan

Next, open the second SQL Query from the previous page:

DAG Representation of the second query

This is a longer query because the final output requires calculating a derived column, grouping on that column, and then aggregating the “letter” column.

But the thing I want to focus on is at the start of the query. The Scan Existing RDD step doesn't return any rows, because the data for this step can be re-used from the cache, which was populated by the previous query.

But because the cache operation doesn't truncate the data lineage, it is possible (due to data loss, or a node being taken away in a public cloud environment) that some parts of the cached data have to be re-constructed, and if that happens, it'll also be visible here in the query plan.

However, if checkpoint is used instead of cache, another SQL plan is added, because checkpoint is an action:

Queries if checkpoint instead of cache is used

The query plan of the checkpoint task is simple:

Checkpoint step plan

Both subsequent tasks now read data from the checkpoint instead of reading from the source:

DAG Representation of the first query — reading data from checkpoint
DAG Representation of the second query — reading data from checkpoint

If checkpoint data is lost, it cannot be recovered because the data lineage is truncated, and the job will fail.
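As a rough sketch (the paths and names here are assumptions, not the article's exact code), switching from cache to checkpoint could look like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").appName("checkpoint_demo").getOrCreate()

# A checkpoint directory must be configured before checkpointing
# (this path is only an example).
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

df = spark.createDataFrame([(1, "a"), (2, "b")], ["number", "letter"])

# cache() is lazy and keeps the lineage, so lost partitions can be recomputed.
cached_df = df.cache()

# checkpoint() is eager: it runs a job, writes the data to the checkpoint
# directory, and truncates the lineage; later queries read the checkpoint files.
checkpointed_df = df.checkpoint()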

Storage View

Speaking of cache, the Storage view (select Storage from the top menu) lists all data that is persisted. In this case, it is the DataFrame that was cached:

Cached Data in Storage View

Additionally, the size of the persisted data in memory and on disk is displayed. Each stored item can be clicked to open its detailed view:

Details of cached data
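What shows up in the Storage view is driven by cache/persist calls. A generic illustration (not the article's exact code) of persisting with an explicit storage level:

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").appName("storage_demo").getOrCreate()

df = spark.createDataFrame([(1, "a"), (2, "b")], ["number", "letter"])

# MEMORY_AND_DISK keeps partitions in memory and spills the rest to disk;
# both sizes are reported in the Storage view.
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()  # an action, so the data actually gets materialized and stored

# df.unpersist() would remove the entry from the Storage view again.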

Environment view

Finally, the environment view shows the configured environment:

Spark execution environment

Executors view

The Executors view displays all executors, as well as a summary breakdown. This view shows the number of active, completed, and failed tasks for each executor instance, as well as the garbage collection (GC) time, input and shuffle data sizes, and, if available, the logs of each executor. It can be accessed by selecting the Executors button from the top navigation bar:

Executors view. Only Driver is present because of local mode

Examining a complex job — 2 node cluster running on docker

Now, with the basics covered, let's look at something closer to reality: a multi-node cluster and a longer, more complex job.

Running a 2 node Spark cluster with Docker Compose

To run a 2 node cluster, I have found an excellent resource:

This article goes into the details of creating a multi-node Spark cluster using Docker and Docker Compose. Here is the associated code repository:

However, I have forked this repo and made the following changes:

  • Updated Spark version to 3.4.0
  • Removed the last database container
  • Changed the Spark Web UI port to 9000
  • Exposed the port 4040 on master
  • Some other minor port and configuration changes.
  • Increased worker memory to 2GB and worker cores to 2.

The updated repository will be referred to in this article, and can be obtained from the following link:

The cluster can be built by first building the docker file, and then running docker compose up:

docker build -t spark-docker-image-3.4.0 .
docker compose up
Starting 2 node Spark cluster using docker-compose

This sets up the cluster with 1 master node and 2 worker nodes:

Cluster nodes started successfully

The Spark Master Web UI can be accessed on port 9000. This is not the Spark UI for the job; it lists all jobs that have executed or are in progress. A running job's Spark UI can be accessed at localhost:4040, or by clicking its link in this list.

Spark Master Web UI — displaying all worker nodes and jobs submitted on the cluster

Note the IP addresses of the worker nodes. On my computer, worker 1 is 172.20.0.3 and worker 2 is 172.20.0.4. This will be needed to access the executor and task logs on worker nodes.

Also, the Spark master is spark://c44e0ffa8c82:7077, this will be passed to the spark-submit command.

Preparing a job for execution and examination

The job for demonstration does the following:

  1. Creates a DataFrame with two columns, numbers and letters.
  2. Each number is mapped to a letter, and the mapping can be changed to produce data skew. The larger the number passed to the get_alphabet_skewed function, the bigger the skew on the letters x, y and z.
  3. A User Defined Function (UDF) that repeats the passed letter 4 times. It can also fail (once in 500,000 calls) if a parameter is set to True.

The code for the job is below:

Code of the more complex job
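Here is a hedged sketch of what such a job could look like; the function names get_alphabet_skewed and repeat_letter come from the description above, but their bodies, the data volume, and the failure threshold are illustrative assumptions rather than the actual code:

import random
import string

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType


def get_alphabet_skewed(number: int) -> str:
    # Illustrative mapping of a number to a letter: larger numbers all map
    # to x, y or z, so those groups end up with far more rows (data skew).
    if number < 26:
        return string.ascii_lowercase[number]
    return ["x", "y", "z"][number % 3]


def repeat_letter(letter: str, should_randomly_fail: bool) -> str:
    # Repeats the letter 4 times; optionally raises with a low probability
    # to demonstrate task failures (the threshold here is an assumption).
    if should_randomly_fail and random.randint(1, 500_000) == 1:
        raise ValueError("Random failure for demonstration")
    print(f"repeating letter: {letter}")  # visible only in executor stdout
    return letter * 4


if __name__ == "__main__":
    # Master, executor cores and memory come from spark-submit / the cluster.
    spark = SparkSession.builder.appName("bigger_job").getOrCreate()

    letter_udf = F.udf(get_alphabet_skewed, StringType())
    repeat_udf = F.udf(repeat_letter, StringType())

    df = (
        spark.range(0, 10_000_000)
        .withColumn("letter", letter_udf(F.col("id")))
        .withColumn("repeated", repeat_udf(F.col("letter"), F.lit(False)))
    )
    df.groupBy("letter").count().show()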

This code can be obtained from the following GitHub repository:

Note that in the create_spark_session function, when preparing a session to run on the cluster, there is no number-of-cores configuration present. This is because these configurations are picked up from the environment defaults, or can be provided with the spark-submit command when submitting the job. We'll see this shortly.
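As a hedged illustration of this difference (the function signature and the run_locally flag are assumptions, not the exact repository code):

from pyspark.sql import SparkSession


def create_spark_session(run_locally: bool) -> SparkSession:
    if run_locally:
        # Local development: master and core count are hard-coded.
        return (
            SparkSession.builder
            .master("local[4]")
            .appName("bigger_job")
            .getOrCreate()
        )
    # On the cluster: no master or core settings here; spark-submit and the
    # cluster defaults supply them, e.g. --master spark://... and
    # --conf "spark.executor.cores=1".
    return SparkSession.builder.appName("bigger_job").getOrCreate()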

To submit the job, first, ssh to the master node of the running cluster:

docker exec -it docker-spark-cluster-spark-master-1 /bin/bash

Next, the folder /path_to_repository/apps in the repository is mapped to /opt/spark-apps on the master node. Copy bigger_job.py into the folder, and verify that it can be accessed on the master node:

ls -lh /opt/spark-apps/
Python file of the job available on the master node of the Spark cluster

Now, let's execute the bigger job with the following spark-submit command:

/opt/spark/bin/spark-submit --deploy-mode client --master spark://c44e0ffa8c82:7077 --conf "spark.sql.shuffle.partitions=201" /opt/spark-apps/bigger_job.py

And then check the logs. Once the Spark UI starts up, it can be accessed at localhost:4040.

The job shows up as in progress job in the Spark Master Web UI (localhost:9000):

Job in progress — visible on the Spark Master’s Web UI

And it can be seen as running in the Spark UI:

Job in progress as seen in the Spark UI

The job completes after a few minutes:

Final output of the job

Accessing executor and task logs

After the job completes, recall that the User Defined Function prints to the console; unlike in local mode, however, this output is not visible on the driver's console. This is because the executors run on different instances, so their output has to be accessed from each task's/executor's stderr and stdout on the respective instance.

Open job 6 in the Jobs tab of the Spark UI:

Job View in Spark UI

Next, select the only stage to view its details:

Job 6 details in Spark UI
Summary of job 6 in Spark UI — with task-wise breakdown

Each task has links to stderr and stdout in the Logs column. Pick a random task and select stderr; you will notice that it cannot be accessed:

Error accessing logs on the worker node

Remember that earlier, we determined that 172.20.0.4 is worker 2, and worker 2's port 9000 has been mapped to 9002. So change the 172.20.0.4:9000 part to localhost:9002, while keeping the rest the same, to access the logs:

Task logs on the worker node

The logs can now be accessed. This can be repeated for all tasks.

To access the executor logs, go to the Executors page; there is a link to stderr and stdout for each executor as well:

Executor statistics in Spark UI

These can be used to access the executor logs as well.

Examining logs of failed tasks

In the repeat_letter UDF, there is a parameter called should_randomly_fail. Setting it to True makes tasks fail randomly with a very low probability of 1 in 5,000,000. So set it to True where the UDF is being called:

Adding random failure to the Spark Job

Also, let's run the job a little differently, with the following executor properties:

Updated parameters

This can be achieved by passing the following configurations in the spark-submit call:

/opt/spark/bin/spark-submit --deploy-mode client --master spark://c44e0ffa8c82:7077 --conf "spark.executor.cores=1" --conf "spark.executor.memory=820M" --conf "spark.task.maxFailures=1000" /opt/spark-apps/bigger_job.py

Also, because we have 2 worker nodes, each with 2 cores and 2 GB of RAM, and each executor is configured with 1 core and 820 MB of memory, this configuration runs the job with 4 executors, two per worker.

Let's stop the previous job and execute the updated one. Before running, copy the bigger_job.py file into the apps folder of the Spark-on-Docker repository (/path_to_repository/apps).

Open the environment to verify that the passed settings have been configured for the job:

Parameters passed for the job have been successfully configured and are visible in the Environment view of the Spark UI

Additionally, the Executors tab now shows 4 executors, two on each worker node:

Job running on 4 executors

Next, in the Jobs view, we can see that there are 5 failed tasks. The overall job has not failed because no individual task has failed more than spark.task.maxFailures (1000) times, the threshold after which Spark fails the entire job:

5 failed tasks in Job 6
Examining exception in one of the failed tasks

We can see the exception in the Errors column.

Identifying Data Skew

Now let's create data skew, and also turn off the random task failure:

Added data skew

And after the job is complete, let's examine Job 6's task executions:

Some tasks taking longer due to data skew

There is skew in some tasks, despite the job running on Spark 3.4.0 with Adaptive Query Execution turned on. It is minor, but it is present. This is how the Spark UI can be used to identify data skew. Please read my previous article, which covers detecting and mitigating data skew:

Summary

In conclusion:

  • The Spark UI is a web-based interface that provides a detailed view of Spark applications, tasks, and query plans.
  • It lists all jobs that executed or are in progress, and provides access to their metrics and logs.
  • A running job’s Spark UI can be accessed on localhost:4040, unless configured differently.
  • The information provided by the Spark UI can be used to analyze job performance and identify needed fixes.
  • The Spark UI can also be used to examine node and task logs of failed jobs/tasks to identify the cause of failure.

Thanks for reading. This barely scratches the surface of both the Spark UI and how it can be used, but I hope I have built an introductory understanding of the Spark UI and the information it provides.

Further Reading — Spark History Server in Amazon Elastic MapReduce (EMR)

AWS EMR Spark job logs and environment details can be accessed using the Spark History Server:

Additionally, Ganglia can be installed on EMR clusters to view application metrics:

