Published in Analytics Vidhya

Parse Confluent Kafka messages without Avro schema registry

Parse Binary Serializer

Use case

  • Confluent Kafka messages are pushed to Azure Event Hubs
  • MirrorMaker is used to move the messages
  • Confluent uses a binary Avro format to serialize the messages
  • The schema registry ID is also embedded in the message payload
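Concretely, Confluent's serializer frames each message as one magic byte (0), a 4-byte big-endian schema ID, and then the Avro binary body. The following is a minimal pure-Python sketch of splitting such a payload, assuming exactly that layout; `split_confluent_message` is a hypothetical helper, not part of any Confluent library. Python indexes from 0, so the schema ID bytes appear here as `payload[1:5]`.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_message(payload: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_body).

    Assumed layout: byte 0 = magic byte, bytes 1-4 = schema ID as a
    big-endian 32-bit integer, bytes 5..end = Avro binary data.
    """
    if len(payload) < 5:
        raise ValueError("payload is shorter than the 5-byte Confluent header")
    magic, schema_id = struct.unpack(">BI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, payload[5:]


# Build a framed message by hand: magic byte, schema ID 42, then a body.
framed = bytes([MAGIC_BYTE]) + (42).to_bytes(4, "big") + b"avro-body"
schema_id, body = split_confluent_message(framed)
print(schema_id, body)  # 42 b'avro-body'
```

The header is fixed-size, so splitting it off never requires knowing the Avro schema; only the remaining body does.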


  • An Azure account
  • Create an Event Hubs namespace (Standard tier)
  • Create an event hub
  • Install MirrorMaker 2 on the source Kafka cluster
  • Get the SAS key from Event Hubs
  • Configure MirrorMaker to send messages to Event Hubs using the SAS key
  • Select the topic to send
  • In Event Hubs, each mirrored Kafka topic creates one event hub with the same name as the topic
  • Create an Azure Databricks workspace
  • Create a cluster with Databricks Runtime 7.6
  • Create a PySpark notebook
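For the MirrorMaker configuration step, Event Hubs exposes a Kafka endpoint at `<namespace>.servicebus.windows.net:9093` that uses SASL_SSL with the PLAIN mechanism, where the username is the literal string `$ConnectionString` and the password is the SAS connection string. The sketch below renders a minimal MirrorMaker 2 properties file under those assumptions. The cluster aliases `source` and `destination`, the helper name `render_mm2_properties`, and all example values are hypothetical, and a real deployment will likely need more settings (replication factors, offset and config sync options).

```python
def render_mm2_properties(source_bootstrap: str, eventhubs_namespace: str,
                          connection_string: str, topics: str) -> str:
    """Render a minimal MirrorMaker 2 config mirroring Kafka topics to Event Hubs.

    Hypothetical helper: cluster aliases and layout are illustrative only.
    """
    eh_bootstrap = f"{eventhubs_namespace}.servicebus.windows.net:9093"
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{connection_string}";'
    )
    lines = [
        "clusters = source, destination",
        f"source.bootstrap.servers = {source_bootstrap}",
        f"destination.bootstrap.servers = {eh_bootstrap}",
        # Enable the source -> destination replication flow for the chosen topics
        "source->destination.enabled = true",
        f"source->destination.topics = {topics}",
        # Event Hubs' Kafka endpoint authenticates with SASL PLAIN over TLS
        "destination.security.protocol = SASL_SSL",
        "destination.sasl.mechanism = PLAIN",
        f"destination.sasl.jaas.config = {jaas}",
    ]
    return "\n".join(lines) + "\n"


print(render_mm2_properties("kafka-1:9092", "my-namespace",
                            "<SAS connection string>", "orders"))
```

With MirrorMaker 2's default replication policy, mirrored topics are named with the source alias as a prefix (for example `source.orders`), so it is worth checking which event hub names actually appear in the namespace.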

Code Part

  • Use Azure Databricks
  • Use PySpark to parse the messages
  • Load the imports
  • Create a function to separate the Confluent framing bytes, i.e. the schema ID, from the Avro value
  • Byte 1 is the Confluent magic byte
  • Bytes 2–5 are the schema ID value (a 4-byte big-endian integer)
  • Removing the first 5 bytes makes the rest easier to process, since everything that remains is plain Avro
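from pyspark.sql.types import StringType
import pyspark.sql.functions as fn

# Convert the 4 schema-ID bytes (big-endian) to a decimal string; pass nulls through
binary_to_string = fn.udf(
    lambda x: str(int.from_bytes(x, byteorder='big')) if x is not None else None,
    StringType())

# `spark` is the SparkSession that every Databricks notebook provides
df1 = spark.read.format("avro") \
    .option("mode", "PERMISSIVE") \
    .option("header", "true") \
    .load("dbfs:/FileStore/shared_uploads/")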

  • Split the payload with substring
  • Pull out the schema ID
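# Spark SQL substring() is 1-based on binary data:
#   fixedValue = bytes 6..end (Avro body), valueSchemaId = bytes 2-5 (schema ID)
df2 = df1 \
    .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)")) \
    .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))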
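Because Spark SQL's `substring(str, pos, len)` counts positions from 1, `substring(value, 2, 4)` corresponds to the Python slice `value[1:5]` and `substring(value, 6, length(value)-5)` to `value[5:]`. The pure-Python sketch below makes that mapping explicit; `spark_substring` is a hypothetical stand-in that only handles a positive `pos`, not Spark's full semantics (negative positions, nulls).

```python
def spark_substring(data: bytes, pos: int, length: int) -> bytes:
    """Mimic Spark SQL substring() for a positive, 1-based pos (simplified)."""
    start = pos - 1  # Spark positions are 1-based, Python slices are 0-based
    return data[start:start + length]


# Magic byte, schema ID 7, then a stand-in Avro body
value = b"\x00" + (7).to_bytes(4, "big") + b"avro-bytes"

schema_id_bytes = spark_substring(value, 2, 4)           # bytes 2-5
fixed_value = spark_substring(value, 6, len(value) - 5)  # bytes 6..end

print(int.from_bytes(schema_id_bytes, byteorder="big"))  # 7
print(fixed_value)                                       # b'avro-bytes'
```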
  • Now load the Avro schema file
  • The schema file is exported from the Confluent Kafka schema registry
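One way to do that export is the Schema Registry REST API: `GET /schemas/ids/{id}` returns a JSON object whose `schema` field holds the Avro schema as a string, and the ID to look up is the one found in `valueSchemaId`. The sketch below assumes the registry is reachable without authentication from wherever it runs; the helper names, host, and output file name are hypothetical. The resulting `.avsc` file would then be uploaded to DBFS.

```python
import json
import urllib.request


def schema_url(registry_url: str, schema_id: int) -> str:
    """Build the Schema Registry REST URL for one schema ID."""
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"


def extract_schema(response_body: str) -> str:
    """Return the Avro schema JSON string from a /schemas/ids/{id} response."""
    return json.loads(response_body)["schema"]


def export_schema(registry_url: str, schema_id: int, out_path: str) -> None:
    """Download one schema by ID and save it as an .avsc file (needs network access)."""
    with urllib.request.urlopen(schema_url(registry_url, schema_id)) as resp:
        schema = extract_schema(resp.read().decode("utf-8"))
    with open(out_path, "w") as f:
        f.write(schema)


# Hypothetical usage; host and schema ID are placeholders:
# export_schema("http://schema-registry:8081", 42, "schema-42.avsc")
```

Registries that require credentials (for example with basic auth) would need an `Authorization` header added to the request.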
from pyspark.sql.avro.functions import from_avro

# Avro schema (JSON) exported from the schema registry and uploaded to DBFS
with open("/dbfs/FileStore/shared_uploads/", "r") as schema_file:
    jsonFormatSchema = schema_file.read()

# Decode the Avro body into a struct column, then flatten its fields
output = df2 \
    .select(from_avro("fixedValue", jsonFormatSchema).alias("record"))
df3 = output.select("record.*")
display(df3)  # Databricks notebook helper for rendering a DataFrame
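To see what `from_avro` is doing with `fixedValue`, here is a minimal hand-rolled decoder for one invented record schema with a `long` field and a `string` field. Avro binary encodes a `long` as a zig-zag varint and a `string` as a `long` length followed by UTF-8 bytes. This is only an illustration of the encoding under that hypothetical schema, not a replacement for `from_avro` or a full Avro library.

```python
import io

# Hypothetical schema, for illustration only:
# {"type": "record", "name": "Order",
#  "fields": [{"name": "id", "type": "long"}, {"name": "name", "type": "string"}]}


def encode_long(n: int) -> bytes:
    """Avro long: zig-zag map to unsigned, then 7-bit little-endian varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits plus continuation flag
        z >>= 7
    out.append(z)
    return bytes(out)


def read_long(buf: io.BytesIO) -> int:
    """Read one zig-zag varint from the stream and undo the zig-zag mapping."""
    shift, z = 0, 0
    while True:
        b = buf.read(1)
        if not b:
            raise EOFError("truncated varint")
        z |= (b[0] & 0x7F) << shift
        if not b[0] & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1)


def read_string(buf: io.BytesIO) -> str:
    """Avro string: length as a long, then that many UTF-8 bytes."""
    length = read_long(buf)
    data = buf.read(length)
    if len(data) != length:
        raise EOFError("truncated string")
    return data.decode("utf-8")


def encode_order(record: dict) -> bytes:
    """Encode the hypothetical Order record (fields in schema order)."""
    name = record["name"].encode("utf-8")
    return encode_long(record["id"]) + encode_long(len(name)) + name


def decode_order(avro_body: bytes) -> dict:
    """Decode the hypothetical Order record from its Avro binary body."""
    buf = io.BytesIO(avro_body)
    return {"id": read_long(buf), "name": read_string(buf)}


# Frame it like Confluent does, then strip the 5-byte header before decoding.
framed = b"\x00" + (42).to_bytes(4, "big") + encode_order({"id": 7, "name": "abc"})
print(decode_order(framed[5:]))  # {'id': 7, 'name': 'abc'}
```

Decoding `framed` without removing the header would misread the magic byte and schema ID as record fields, which is why the 5 bytes are stripped first.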

Originally published at


