Spark Streaming Jobs: Monitoring and Alerting for Silent Failures, Part 1

Arvind Pant
Mar 9, 2024


Introduction: In the fast-paced world of data streaming, ensuring the reliability of your streaming jobs is crucial. However, a job may appear to be running smoothly yet fail silently, loading no data at all. These silent failures can be detrimental to business operations if left undetected. In this first part of our series, we explore which metrics to collect from your pipeline and how to collect them, using Chronosphere, an observability platform that leverages Prometheus for metric ingestion and alerting, and Grafana for dashboard visualization.

[Image: Chronosphere architecture (source: Chronosphere website)]

Metrics Collection: To effectively monitor streaming pipelines, we need to gather relevant metrics that provide insight into job performance. Spark Structured Streaming provides listeners that capture essential data such as the number of rows received and processed and the batch duration. By incorporating listeners into our streaming jobs, we can collect these metrics in real time. Here's how to register a listener in Spark streaming:

# Adding a custom listener to Spark streaming
spark.streams.addListener(ExStreamListener())

Next, we define a custom listener class, ExStreamListener, which extends the StreamingQueryListener interface. Within this class, we implement the lifecycle methods onQueryStarted, onQueryProgress, onQueryIdle, and onQueryTerminated to capture metrics at each stage of the query. For example:

from pyspark.sql.streaming import StreamingQueryListener

class ExStreamListener(StreamingQueryListener):
    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the most
        recent status, regardless of when this method is called, so the
        status may change before or while you process the event. For
        example, you may find that the :class:`StreamingQuery` has already
        terminated while you are processing a `QueryProgressEvent`.
        """
        # Extract relevant metrics from the progress event
        qp_batch_id = event.progress.batchId
        qp_num_input_rows = event.progress.numInputRows
        qp_input_rows_per_second = event.progress.inputRowsPerSecond
        qp_processed_rows_per_second = event.progress.processedRowsPerSecond

    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        :meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStarted`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method, as it will block your query.
        """

    def onQueryIdle(self, event):
        """Called when the query is idle, i.e. waiting for new data to arrive."""

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """

These metrics, such as input rows, input rows per second, and processed rows per second, can then be forwarded to Chronosphere for monitoring and analysis.
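As a rough sketch of that forwarding step, the onQueryProgress handler could push each batch's numbers to a Prometheus Pushgateway. The prometheus_client library and the gateway address are assumptions made for illustration; the exact delivery path into Chronosphere depends on your setup:

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

def push_progress_metrics(progress):
    # Hypothetical helper, called from onQueryProgress with event.progress
    registry = CollectorRegistry()
    Gauge("spark_num_input_rows", "Rows received in the last batch",
          registry=registry).set(progress.numInputRows)
    Gauge("spark_processed_rows_per_second", "Processing rate",
          registry=registry).set(progress.processedRowsPerSecond)
    # "localhost:9091" is a placeholder Pushgateway address
    push_to_gateway("localhost:9091", job="spark_streaming", registry=registry)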

Types of Metrics: Chronosphere supports various types of metrics that align with Prometheus concepts (a short example follows the list):

  1. Gauge: A single numerical value that can fluctuate over time.
  2. Counter: A cumulative metric that increases over time, possibly resetting to zero.
  3. Histogram: Observations counted within customizable buckets.
  4. Summary: Similar to a histogram but with additional features such as total count and sum of all observations.
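For illustration, here is a minimal sketch of the four types using the prometheus_client library; the metric names are made up for this example:

from prometheus_client import Counter, Gauge, Histogram, Summary

# Gauge: can go up or down, e.g. the current ingestion rate
input_rate = Gauge("input_rows_per_second", "Current ingestion rate")
input_rate.set(1250.0)

# Counter: only increases (resets to zero on process restart)
rows_total = Counter("rows_processed_total", "Rows processed so far")
rows_total.inc(500)

# Histogram: counts observations into configurable buckets
batch_duration = Histogram("batch_duration_seconds", "Micro-batch duration",
                           buckets=(1, 5, 10, 30, 60))
batch_duration.observe(7.3)

# Summary: tracks the count and the sum of all observations
batch_rows = Summary("batch_rows", "Rows per micro-batch")
batch_rows.observe(10000)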

Conclusion: By implementing listeners and collecting relevant metrics from Spark streaming jobs, we can gain valuable insight into job performance and detect issues such as silent failures. In the next part of this series, we will integrate Spark streaming pipelines with Chronosphere and set up dashboards and alerts for effective monitoring. Stay tuned for more insights!

Note: You can read more about Prometheus metric types here: https://prometheus.io/docs/concepts/metric_types/

Spark streaming listeners are documented here: https://docs.databricks.com/en/structured-streaming/stream-monitoring.html
