Streaming evolving JSON from Kafka using Spark

ANAND R
Aug 11, 2020 · 2 min read


Background

The need: deserialize JSON messages in a streaming pipeline as the schema keeps evolving.

Search

Here are my findings:

Schema Registry + Databricks = Direct integration to deserialize

Schema Registry + ABRiS + Avro = Direct integration to deserialize

Problem

It is straightforward to apply a schema to messages coming from Kafka when the schema is static in nature. Now assume something changes, e.g. a new column is added: a static schema will silently miss the new column, with no indication at all. Seriously?! Yes.

Sample JSON

{"date":"2020-08-10","email":"a@b.com"}

Sample Schema

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("date", StringType(), False),
    StructField("email", StringType(), False)])

Sample code

from pyspark.sql import functions as F

df = df.withColumn("parsed_content", F.from_json("value", schema))

Simple: you can now query the fields as parsed_content.date and so on. Alternatively, flatten the struct type and save it as JSON or CSV for downstream consumption, as sketched below.
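For context, here is a minimal end-to-end sketch of the streaming read. The broker address and the topic name events are placeholders. Note that Kafka hands the payload over as binary, so the value column has to be cast to string before from_json can parse it.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("json-stream").getOrCreate()

# Read the raw stream; broker and topic are placeholders
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load())

# Kafka delivers the payload as binary, so cast it before parsing
df = df.withColumn("value", F.col("value").cast("string"))
df = df.withColumn("parsed_content", F.from_json("value", schema))

# Flatten the struct for downstream consumption (yields columns date, email)
flat = df.select("parsed_content.*")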

However, this will keep working without breaking even when there is an additional column in the payload. If you fail to notice, that data could be lost for good once it falls outside the Kafka retention period.

{"date":"2020-08-10","email":"a@b.com","phone":"123-123-1234"}
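You can see the silent drop with a one-row batch DataFrame, used here purely for illustration: the static schema parses the new payload without complaint, and the phone field simply disappears.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("date", StringType(), False),
    StructField("email", StringType(), False)])

payload = '{"date":"2020-08-10","email":"a@b.com","phone":"123-123-1234"}'
spark.createDataFrame([(payload,)], ["value"]) \
    .withColumn("parsed_content", F.from_json("value", schema)) \
    .select("parsed_content.*") \
    .show()

# +----------+-------+
# |      date|  email|
# +----------+-------+
# |2020-08-10|a@b.com|
# +----------+-------+
# 'phone' is gone, and nothing in the job tells you it was dropped.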

Solution

There is a trade-off, but the benefits outweigh it. Use foreachBatch, which lets you run an arbitrary function on each micro-batch. The exactly-once semantics have to be traded away, though, because Spark is unaware of what happens inside that function. Not all that bad: Spark still provides at-least-once semantics.

And it is not that hard to cope with: just track the epoch_id and check whether that batch has already been processed. That's it.

store = set()  # epoch ids that have already been processed

def foreach_batch_function(batch_df, epoch_id):
    # Transform and write the micro-batch, skipping epochs seen before
    if epoch_id not in store:
        schema = function_identify_schema(batch_df)
        parsed = batch_df.withColumn("parsed_content", F.from_json("value", schema))
        # ... write `parsed` to the sink ...
        store.add(epoch_id)

df.writeStream.foreachBatch(foreach_batch_function).start()
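
The function_identify_schema helper is left to the reader; one possible sketch, assuming the raw JSON sits in a string column named value, is to let Spark infer the schema from the strings in the micro-batch and compare it with the static schema to surface newly arrived columns.

from pyspark.sql import SparkSession, functions as F

def function_identify_schema(batch_df):
    # Infer the schema from the raw JSON strings of this micro-batch
    spark = SparkSession.builder.getOrCreate()
    json_strings = batch_df.select(F.col("value").cast("string")).rdd.map(lambda r: r[0])
    return spark.read.json(json_strings).schema

def new_fields(static_schema, inferred_schema):
    # Fields present in the incoming data but missing from the static schema
    return set(inferred_schema.fieldNames()) - set(static_schema.fieldNames())

Also keep in mind that the in-memory store set above is lost on driver restart; persisting the processed epoch ids somewhere durable (for example a table or a key-value store) lets the deduplication survive restarts.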

Conclusion

Please let me know your thoughts
