Published in Analytics Vidhya

Parse Confluent Kafka messages without Avro schema registry

Parsing Confluent's binary Avro serialization format

Use case

  • Confluent Kafka messages are pushed to Azure Event Hubs
  • MirrorMaker is used to move the messages
  • Confluent uses a binary Avro format to serialize messages
  • The schema registry ID is also embedded in the message payload
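The Confluent wire format mentioned above can be split apart with a few lines of plain Python. This is a sketch, not part of the original pipeline; the example payload and schema ID 42 are made up for illustration:

```python
import struct

def split_confluent_payload(payload: bytes):
    """Split a Confluent-serialized Kafka value into (schema_id, avro_bytes).

    Wire format: 1 magic byte (0x00), 4 big-endian bytes of schema ID,
    then the raw Avro-encoded record.
    """
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("not a Confluent-framed message")
    schema_id = struct.unpack(">I", payload[1:5])[0]  # bytes 2-5, big-endian
    return schema_id, payload[5:]                     # rest is plain Avro

# Hypothetical payload: magic byte + schema ID 42 + Avro body
example = b"\x00" + (42).to_bytes(4, "big") + b"\x02hi"
schema_id, avro_bytes = split_confluent_payload(example)
```

The same slicing is done later in Spark SQL with `substring`, which keeps the work distributed instead of per-row Python.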

Requirements

  • Azure account
  • Create an Event Hubs namespace (Standard tier)
  • Create an Event Hub
  • Install MirrorMaker 2 in the source Kafka cluster
  • Get the SAS key from the Event Hubs namespace
  • Configure MirrorMaker to send messages to Event Hubs using the SAS key
  • Select the topics to mirror
  • Each mirrored Kafka topic creates one Event Hub with the same name as the topic
  • Create an Azure Databricks workspace
  • Create a cluster with runtime 7.6
  • Create a PySpark notebook
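For the MirrorMaker-to-Event-Hubs step, one common approach is to point MirrorMaker 2 at the namespace's Kafka-compatible endpoint over SASL PLAIN, with the SAS connection string as the password. The cluster names, namespace, and topic below are placeholders, not values from this article; a sketch only:

```
# mm2.properties (sketch; replace the <...> placeholders with your own values)
clusters = source, eventhub
source.bootstrap.servers = <source-kafka-broker>:9092
eventhub.bootstrap.servers = <namespace>.servicebus.windows.net:9093

eventhub.security.protocol = SASL_SSL
eventhub.sasl.mechanism = PLAIN
eventhub.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="$ConnectionString" \
  password="<Event Hubs connection string containing the SAS key>";

source->eventhub.enabled = true
source->eventhub.topics = <topic-to-mirror>
```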

Code Part

  • Using Azure Databricks
  • Using PySpark to parse the messages
  • Load the imports
  • Create a function to separate the Confluent-specific bits, i.e. the schema ID, from the Avro value
  • Byte 1 is the Confluent magic byte
  • Bytes 2–5 hold the schema registry ID
  • Removing those first 5 bytes makes processing easier, since the remaining bytes are plain Avro
%python
from pyspark.sql.types import StringType
import pyspark.sql.functions as fn

# Convert the 4 schema-ID bytes into a decimal string
binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
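The lambda inside that UDF is plain Python, so the conversion can be sanity-checked locally before running it on the cluster. The example bytes below are illustrative:

```python
# The same conversion the UDF applies to the 4 schema-ID bytes
def bytes_to_id_string(raw: bytes) -> str:
    return str(int.from_bytes(raw, byteorder='big'))

# A hypothetical schema ID of 7, encoded as 4 big-endian bytes
print(bytes_to_id_string(b'\x00\x00\x00\x07'))  # -> '7'
```

Returning a string (rather than an int) matches the `StringType()` declared on the UDF above.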
%python

df1 = (spark.read.format("avro")
       .option("mode", "PERMISSIVE")
       .load("dbfs:/FileStore/shared_uploads/xxxx@xxxxx.com/part_00000_ae91d923_dcca_4690_xxxx_xxxxxxxxxx_c000.avro"))
%python

display(df1)
  • Split the value with substring
  • Pull out the schema ID
%python
# SQL substring is 1-indexed: position 6 onward drops the 5-byte Confluent header,
# and positions 2-5 are the 4 schema-ID bytes
df2 = df1 \
    .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)")) \
    .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
  • Now load the Avro schema file
  • Export the schema information from the Confluent Kafka schema registry
%python
from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/dbfs/FileStore/shared_uploads/xxxx@xxxxxxx.com/avro_####_schemaname.avsc", "r").read()
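The exported .avsc file is plain JSON, and `from_avro()` accepts it as a string. A minimal hypothetical schema (record and field names invented here, not from this article) shows the shape:

```python
import json

# A minimal, made-up Avro schema; the .avsc exported from your registry will differ
schema_text = """
{
  "type": "record",
  "name": "Payment",
  "namespace": "example",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
"""
schema = json.loads(schema_text)  # parses cleanly, so the file is valid JSON
```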
%python

# Deserialize the trimmed Avro bytes using the schema loaded above
output = df2 \
    .select(from_avro("fixedValue", jsonFormatSchema).alias("record"))
%python
display(output.select("record"))
%python
df3 = output.select("record.*")
display(df3)
%python
output.select("record.*").count()

Originally published at https://github.com.
