Apache Spark — Job monitoring

Use Spark Listeners to collect low-level job metrics.

Hareesha Dandamudi
4 min read · Jun 9, 2022
Image source — https://kanbanize.com/

For the most part, the primary reason we use Spark is to get optimal runtime performance from big data workloads. Given that motivation, how do we actually verify that performance is what we expect it to be, or see what is going on at the level of individual tasks? We can do that using Spark Listeners.

A task is the unit of work that runs on a partition. If any partition is much bigger or smaller than the other partitions in the same stage, what happens? The entire stage may get delayed, and out-of-memory errors can occur because of the one or few straggling tasks that have to deal with oversized partitions. The idea of this article is to show ways to capture such information programmatically.

Spark maintains many counters during job execution. Each counter tracks a particular aspect of the job, such as task time, JVM garbage collection time, number of records read by each task, number of records written, executor run time, serialization time, and many more.

All of these can be used to debug performance and, further, to generate alerts on suspicious-looking tasks. This information is sometimes crucial for spotting straggling tasks caused by data skew, which can derail performance.

While a job is in progress, we can get this information from the Spark Web UI. However, that is not the only way (nor the most convenient one); we can get all of it programmatically as well. To gather this data programmatically, register a custom SparkListener with the SparkContext. The Spark framework also ships other listeners, a.k.a. callbacks, to collect different metrics.

The SparkListener class is a direct implementation of SparkListenerInterface and is a low-level way of monitoring, which is what the Spark UI itself uses. The lowest granularity of monitoring is at the task level, in addition to the application, job, and stage levels. A SparkListener intercepts events from the Spark scheduler during the execution of a Spark application. Writing a custom listener by extending the SparkListener class is straightforward in Scala or Java, while there is no direct way or library in PySpark to do this.

However, the approach we can use to add a custom SparkListener is exactly the same for a PySpark job and a Scala (or Java) Spark job. Apart from the steps below, everything else is simply the information we want to capture, which goes into the listener itself. We can also extend this functionality to publish metrics.

  1. Write a custom listener in Scala/Java and build a jar out of it.
  2. Add the jar from the above step to spark.driver.extraClassPath and spark.executor.extraClassPath in the PySpark or Scala/Java app's SparkConf or default configurations (see the sketch after this list).
  3. Add the fully qualified listener class name to the spark.extraListeners config property.
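
For a Scala or Java application where the listener is compiled into the application jar itself, steps 2 and 3 can also be set on SparkConf before the session is created. The sketch below is only illustrative (the app name and the action are arbitrary); the classpath entries are normally supplied at submit time, as in the spark-shell command further down.

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class ListenerDemo {
    public static void main(String[] args) {
        // spark.extraListeners is read when the SparkContext starts, so set it before
        // building the session. The jar path / classpath entries are usually supplied
        // at submit time (spark-submit --conf or spark-defaults.conf).
        SparkConf conf = new SparkConf()
                .setAppName("listener-demo")
                .set("spark.extraListeners", "com.example.spark.SparkAppCustomListener");

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        spark.range(1000).count();  // any action triggers the registered listener callbacks
        spark.stop();
    }
}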

SparkListener class: As an example, I have used Java to write the listener; it is no different in Scala except for language nuances. For illustration purposes, I have printed the data to the console; in production code we would store it in a persistent sink.

Start the shell with the parameters below.

spark-shell --conf "spark.executor.extraClassPath=/path/sparkexamples/target/spark-1.0-SNAPSHOT.jar" --conf "spark.driver.extraClassPath=/path/sparkexamples/target/spark-1.0-SNAPSHOT.jar" --conf "spark.extraListeners=com.example.spark.SparkAppCustomListener"
Sample code — Custom class that extends SparkListener
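
A minimal version of such a listener in Java could look like the sketch below; the callbacks overridden and the fields printed are illustrative choices, not an exhaustive list.

package com.example.spark;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

// Custom listener that prints a few application- and task-level metrics to the console.
public class SparkAppCustomListener extends SparkListener {

    @Override
    public void onApplicationStart(SparkListenerApplicationStart appStart) {
        // appId() is an Option[String]; sparkUser() is the user who submitted the application
        System.out.println("Application started: " + appStart.appId() + ", user: " + appStart.sparkUser());
    }

    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
        // Task-level counters: executor run time, GC time, records read, etc.
        if (taskEnd.taskMetrics() != null) {
            System.out.println("Stage " + taskEnd.stageId()
                    + ", task " + taskEnd.taskInfo().taskId()
                    + ": runTime(ms)=" + taskEnd.taskMetrics().executorRunTime()
                    + ", gcTime(ms)=" + taskEnd.taskMetrics().jvmGCTime()
                    + ", recordsRead=" + taskEnd.taskMetrics().inputMetrics().recordsRead());
        }
    }

    @Override
    public void onApplicationEnd(SparkListenerApplicationEnd appEnd) {
        // Good place to flush collected metrics to a persistent sink.
        System.out.println("Application ended at: " + appEnd.time());
    }
}
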
Spark shell output — application Id and user Id from the onApplicationStart handler
Spark shell output — the onApplicationEnd callback

This comes in handy when we have long-running jobs and jobs that are memory intensive.

We can collect all of these metrics ourselves, or we can simplify things by triggering an API call to get them, which is more developer friendly. There is a set of APIs exposed on the Spark history server endpoint that takes the applicationId as an input.

  1. Write a custom listener class and add it to the Spark listeners (as shown above).
  2. In the onApplicationEnd handler, get the application Id and call the web service (http://<server-url>:18080/api/v1) on the history server (see the sketch after this list).
  3. Build on top of the JSON response, which carries all kinds of metrics as discussed above.
  4. We can create a monitoring system around this to get better insight.
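
As a rough illustration of step 2, a small helper class (the class and method names here are hypothetical) can fetch stage-level metrics from the history server's REST API once the application Id is known.

package com.example.spark;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that pulls per-stage metrics from the Spark history server REST API.
public class HistoryServerClient {

    private final String baseUrl; // e.g. the http://<server-url>:18080/api/v1 endpoint mentioned above

    public HistoryServerClient(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    public String fetchStageMetrics(String appId) throws Exception {
        // GET /applications/{appId}/stages returns stage and task summary metrics as JSON
        HttpRequest request = HttpRequest.newBuilder(
                URI.create(baseUrl + "/applications/" + appId + "/stages")).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}

Calling this from the onApplicationEnd handler (or from a separate monitoring job) gives us the JSON payload to build dashboards or alerts on; the same pattern works for the other monitoring endpoints such as /jobs and /executors.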

It can serve as an elegant, hands-off, in-house Spark job monitoring system.

Spark SQL query listener: One level above the task metrics, we can also get query-level metrics by implementing QueryExecutionListener from the org.apache.spark.sql.util package. There are only two abstract methods that need implementing. The custom class can be registered programmatically through the session's listener manager or via the configuration property

spark.sql.queryExecutionListeners

QueryListener sample screenshot
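
A minimal Java sketch of such a query listener (assuming Spark 3.x; the class name and the printed fields are just placeholders) might look like this:

package com.example.spark;

import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.util.QueryExecutionListener;

// Query-level listener: called when a query or action completes, with its execution and duration.
public class SparkQueryCustomListener implements QueryExecutionListener {

    @Override
    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        // funcName is the action that triggered the query (e.g. "count", "save")
        System.out.println("Query succeeded: action=" + funcName
                + ", duration(ms)=" + durationNs / 1_000_000);
        System.out.println(qe.executedPlan());
    }

    @Override
    public void onFailure(String funcName, QueryExecution qe, Exception exception) {
        System.out.println("Query failed: action=" + funcName + ", cause=" + exception.getMessage());
    }
}

It can be registered either with spark.listenerManager().register(new SparkQueryCustomListener()) or through the configuration property above.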

Final thoughts:

The benefit of writing custom listeners is that it doesn't introduce additional complexity into existing functional code or interfere with execution times. All we have to do to get this metadata, which is already being recorded, is write the listeners, build them once, and then add them to the configuration.

References:

https://spark.apache.org/docs/latest/api/java/org/apache/spark/scheduler/SparkListener.html
