Guide to: StreamingQueryListener in PySpark Streaming

Shani Alisar
MyHeritage Engineering
5 min readJul 4, 2022

At MyHeritage, we use PySpark Streaming for several business-critical applications.

This article will discuss how we solved the issue of measuring the lag of Kafka-consuming PySpark Streaming applications.

Our problem

Our pipeline structure contains PySpark Streaming running in AWS EMR.
We use Prometheus as our monitoring application.

By default, PySpark doesn’t commit any offsets to Kafka, as Spark manages the offsets on its own:

enable.auto.commit: Kafka source doesn’t commit any offset. — docs

This means we are “blind” to the lag between Kafka producers and consumers.
If the rate of production of data far exceeds the rate at which it is getting consumed, consumer groups will exhibit lag which can cause the consumer to fall behind processing the data, for that we needed to find a workaround to monitor the lag of our Kafka consumers.

Alternatives we considered

  1. Create another application that would forward the checkpointing data to Prometheus.
    We could commit the Kafka offset from the checkpoints files straight to our monitoring application or back to Kafka with this approach.
    The problems with this approach are that we will need to maintain another application, we will need to map the checkpoints paths for each Spark Streaming application, and we will need to support different filesystems (HDFS/S3).
  2. Commit the offset from within the main application.
    We could commit the offset from the main application with the ability to conduct the same solution on all our Spark Streaming applications. The only potential issue with this approach is that it could cause discrepancies between Spark’s checkpoint data and Kafka’s commits. This is fine, as Spark ignores the Kafka offsets and uses its fault-tolerant mechanism (checkpoints).
    As an important side note, we also raised the option of using UDF to commit the offset for this alternative. The problem was that the only way to know a batch has finished is from the Spark master, not the executors.

Some background

Spark Kafka source store offsets in checkpoints — docs

A checkpoint helps build fault-tolerant and resilient Spark applications.
In Spark Structured Streaming, it maintains an intermediate state on HDFS/S3 compatible file systems to recover from failures.

Spark offset management

Image credit: this post on waitingforcode.com

StreamingQueryListener — docs

The StreamingQueryListener class is the contract of listeners that want to be notified about the life cycle events of streaming queries, i.e. start, progress, and termination of a batch.

StreamingQueryListener flow

onQueryProgress is the function that informs that MicroBatchExecution has finished the triggerExecution phase (the end of a streaming batch).

Our solution

StreamingQueryListener class sounds great, right?! We can use its functions and override them with our own logic after a batch has finished.

The problem with this approach is that this class is only importable from Java and Scala, and we use Python.

But then, we saw this SPOT-ON example in StackOverflow for “PySpark Structured Streaming with a Sink that produces output visible in Apache Zeppelin” using the StreamingQueryListener in PySpark.

We generalized this example for other usages — in our case, committing offsets to Kafka.

So how does this work?

The StreamingQueryListenerGateway Jar:

Project structure:

This class overrides Spark StreamingQueryListener functions onQueryStarted, onQueryTerminated, and onQueryProgress.

In the function onQueryProgress we added a call for the interface PythonObserver function on_next, which later on we will override in our Python project.

This interface does nothing in this jar.

In the following example, we will implement the PythonObserver interface in Python (StreamingObserver) to be inherited in a different class with the required logic (StreamingObserverCommit).

Now we need to add the Java listener to our Python:

Project structure:

We patched StreamingQueryManager.addListener by adding our jar class PythonStreamingQueryListener as a listener to our Spark session stream.

The add_stream_listener function gets the listener_class for argument. This way, we can pass a custom stream listener class inheriting from StreamingObserver class (i.e., class to commit Kafka topics — StreamingObserverCommit — which will see in the upcoming examples) to be added as a listener for the Spark stream.

This class implements the StreamingQueryListenerGateway jar PythonObserver interface.

This class abstracter is StreamingObserver class. This class on_next function commits Spark batch offset (which we get from the on_next parameter value = queryProgress from Java) to a different groupid.

And finally, we add our listener and start the streaming process.

How to deploy:

We added the jar we created with — jar flag as follows:

spark-submit — master yarn — packages org.apache.spark:spark-sql-kafka-0–10_2.12:3.0.0 — jars StreamingQueryListenerGateway-1.0.jar main.py

Error Handling:

After we deployed these changes to production, we noticed while terminating and rerunning the pipeline on the same cluster that the Spark failed on this error:

ERROR:py4j.java_gateway:An error occurred while trying to start the callback server (127.0.0.1:25334)

OSError: [Errno 98] Address already in use

We needed to make sure that the Java gateway is closed on termination:

This function shut down the Java gateway.

And the function shutdown_java_gateway is called when the stream awaitAnyTermination raises an exception or exit.

Prometheus graph for lag monitoring:

Approximate lag of consumer groups on a topic/partition, i.e., the number of messages published but not consumed yet by this consumer group.

Query to execute:

sum by (topic,consumergroup) (kafka_consumergroup_lag{topic={SomeTopic}, consumergroup=”{GroupId}_offsets”})
Prometheus graph for kafka_consumergroup_lag metric

Summary

In conclusion, we can use the StreamingQueryListener class in the PySpark Streaming pipeline. This could also be applied to other Scala/Java-supported libraries for PySpark.

You could get the open-source code in this repo.

Don’t restrict this solution only to what we used it for, as it can be easily adjusted for any logic your business needs.

And never forget to close your Java gateway :)

--

--