How to start Spark Structured Streaming by a specific Kafka timestamp

zeev
Jul 28, 2020 · 4 min read


Photo by Jeremy Thomas on Unsplash

“Sometimes in life, a sudden situation, a moment in time, alters your whole life, forever changes the road ahead.” by Ahmad Ardalan. Spark Structured Streaming pipelines reading from Kafka are no different: one day you will have to alter a streaming job, or start a completely new one, from a specific Kafka position, which can be defined by the earliest offsets, the latest offsets, or particular offsets. In this blog post I’m going to illustrate three options for processing Kafka events from a particular timestamp, using Spark > 2.x, Kafka 0.10, Scala, and Python.

What problem are we trying to solve?

Imagine a scenario where you have a Spark Structured Streaming application that reads data from Kafka topic(s), and you encounter one of the following:

  • You have modified the streaming source job configs, such as maxOffsetsPerTrigger, the trigger interval, or the watermark.
  • You have upgraded your application, and you want to discard the previous states.
  • You have found a bug in your code, and want to re-run it from a specific timestamp.
  • Someone accidentally (or not 😥) ingested corrupt or mixed records into one of the topics.
  • You simply want to start a streaming job from a specific timestamp.

In all of the above cases we would not be able to simply restart (or start) the Spark streaming job from the existing checkpoint, and would have to create a new one. Therefore, our goal is to enable smooth and consistent processing of Kafka offsets starting from a particular Kafka timestamp.

Spark structured streaming is all about the checkpoint and offsets

To understand Kafka in depth, please visit the official Kafka documentation. In short, Kafka events are stored in topics, and each topic is divided into partitions. Each record in a partition has an offset (a sequential number that defines its order within the partition).

In layman’s terms, as it relates to Kafka and Structured Streaming: every Structured Streaming job in Spark > 2.x relies on some type of checkpoint (a place to store information needed to recover from failures) to deliver either exactly-once semantics (micro-batch mode) or at-least-once semantics (continuous mode). On every trigger (query execution), Spark saves the offsets to the offsets directory under the checkpoint location (defined with the checkpointLocation option or spark.sql.streaming.checkpointLocation). The StreamExecution (on the driver) checks which offsets have already been processed and consumes new records according to those offsets and other configs (such as maxOffsetsPerTrigger).
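For reference, here is a minimal PySpark sketch of where that checkpoint location is set; the broker address, topic name, and output path are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-stream-example").getOrCreate()

# read from Kafka (placeholder brokers and topic)
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "your-kafka-brokers:9092")
      .option("subscribe", "kafka-topic-one")
      .load())

# the offsets directory discussed above lives under this checkpoint location
query = (df.writeStream
         .format("parquet")
         .option("path", "/data/kafka-topic-one")
         .option("checkpointLocation", "/checkpointLocation")
         .start())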

Solution #1 — get recent offsets based on an existing checkpoint

Given an existing Structured Streaming job that you want to upgrade and restart from a very recent timestamp, you can simply retrieve the offsets from the old checkpoint:

List the recent offset files in your checkpoint; if it lives on HDFS, you can run:

hdfs dfs -ls /checkpointLocation/offsets
output:
... 906 2020-07-27 12:00 /checkpointLocation/offsets/295
... 906 2020-07-27 12:05 /checkpointLocation/offsets/296
... 906 2020-07-27 12:10 /checkpointLocation/offsets/297
... 906 2020-07-27 12:15 /checkpointLocation/offsets/298
... 906 2020-07-27 12:20 /checkpointLocation/offsets/299
... 906 2020-07-27 12:25 /checkpointLocation/offsets/300
... 906 2020-07-27 12:30 /checkpointLocation/offsets/301
...

The highest-numbered file corresponds to the most recent offsets the application has checkpointed (the intervals depend on the trigger and execution time). Pick the batch you want and print its contents:

hdfs dfs -cat /checkpointLocation/offsets/300 
output:
{"batchWatermarkMs":..,"batchTimestampMs":.. ,"conf":{... ","spark.sql.shuffle.partitions":"200"}}
{"kafka-topic-one":{"2":491437,"4":537712,"1":561234,"3":554189,"0":548887}}

Finally, copy the offsets into the startingOffsets option:

val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "your-kafka-brokers:9092")
.option("subscribe", "kafka-topic-one")
.option("startingOffsets", """{"kafka-topic-one":{"2":491437,"4":537712,"1":561234,"3":554189,"0":548887}}""")
.load()

Solution #2 — get the offsets by a timestamp programmatically

To get the earliest offset whose timestamp is greater than or equal to a given timestamp in each topic partition, we can retrieve it programmatically.

In this example I used the kafka-python library to output, per partition, the minimum offset whose timestamp is greater than or equal to our timestamp, using the consumer.offsets_for_times method.

Given our timestamp (in milliseconds), we iterate through our topics, map the timestamp to each partition of the topic, call offsets_for_times to get the offsets, and finally output the results as JSON, as in the sketch below.
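A minimal sketch of that logic with kafka-python; the broker address, topic name, and timestamp are placeholders:

import json
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="your-kafka-brokers:9092")
timestamp_ms = 1595882598000          # the timestamp we want to start from
topics = ["kafka-topic-one"]

result = {}
for topic in topics:
    # map every partition of the topic to the target timestamp (milliseconds)
    partitions = consumer.partitions_for_topic(topic)
    query = {TopicPartition(topic, p): timestamp_ms for p in partitions}

    # earliest offset whose timestamp >= timestamp_ms, per partition
    offsets = consumer.offsets_for_times(query)
    result[topic] = {str(tp.partition): ot.offset
                     for tp, ot in offsets.items() if ot is not None}

print(json.dumps(result))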

output:
{"kafka-topic-one": {"0": 780129, "1": 800387, "2": 903633, "3": 890202, "4": 826074}}

Solution #3 — just upgrade to Spark 3.x :)

We can always upgrade the Spark version to 3.x and, similarly to Solution #2, use the built-in startingOffsetsByTimestamp option, where you set a timestamp (in milliseconds) for each TopicPartition:

val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "your-kafka-brokers:9092")
.option("subscribe", "kafka-topic-one")
.option("startingOffsetsByTimestamp", """{"kafka-topic-one":{"0": 1595882598000, "1": 1595882598000, "2": 1595882598000, "3": 1595882598000, "4": 1595882598000}}""")
.load()

Footnotes

If you’re interested in learning more and exploring other cool stuff about Spark, Kafka, HBase, MongoDB, Scala, and other big data technologies, be sure to check out my other blogs.

