Making sense of Avro, Kafka, Schema Registry, and Spark Streaming

Ivan Miller
Aug 27, 2019


Recently, I needed to help a team figure out how they could use Spark Streaming to consume from our Kafka cluster. At face value, this is very straightforward: Spark Streaming offers easy integration with Kafka via its high-level APIs.

At the company I work for, we use Apache Avro to structure our internal messages. Once you introduce a serialization format like Avro, things get a little harder. Fortunately, Spark added built-in support for Avro-structured data in its 2.4 release!

Unfortunately, though, this doesn't account for Avro data encoded with Confluent's Schema Registry offering. Ugh! That sucks, because this is a common way to encode/decode Avro data in the Scala/Kafka ecosystem.
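
For context, here's roughly what that built-in support looks like for plain Avro values. It lives in the separate org.apache.spark:spark-avro_2.11:2.4.0 artifact (not part of the build shown further down), and the schema string here is a made-up example. The catch: Schema-Registry-encoded messages carry a few extra framing bytes in front of the Avro payload, so from_avro on its own can't decode them.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

//a minimal sketch of spark 2.4's built-in avro support, for values NOT encoded via schema registry
object PlainAvroSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("plain-avro-sketch").getOrCreate()

  //hypothetical avro schema for the message value
  val jsonSchema =
    """{"type":"record","name":"Example","fields":[{"name":"id","type":"string"}]}"""

  val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "your_kafka_servers_here:9092")
    .option("subscribe", "test.topic")
    .load()

  //from_avro decodes the binary "value" column using the schema string above
  //(write it out with writeStream just like the full example later in this post)
  val decoded = df.select(from_avro(col("value"), jsonSchema).as("record"))
}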

Let me attempt to show you how I was able to consume Schema Registry Avro with Spark Streaming!

First things first, this example includes technologies typical of a modern data platform: Scala, Kafka, Schema Registry, and Spark all make appearances here. Let's get right into the examples. Since we're using Scala, here's what your build.sbt should look like to pull in the necessary dependencies:

name := "kafka-spark-streaming-example"

version := "0.1"

scalaVersion := "2.11.0"

//confluent artifacts are published to confluent's maven repository, not maven central
resolvers += "confluent" at "https://packages.confluent.io/maven/"

val confluent = Seq(
  "io.confluent" % "kafka-schema-registry-client" % "5.2.1",
  "io.confluent" % "kafka-avro-serializer" % "5.2.1"
).map(_.excludeAll(
  ExclusionRule(organization = "org.codehaus.jackson"),
  ExclusionRule(organization = "com.fasterxml.jackson.core")
))

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-sql_2.11" % "2.4.0",
  "org.apache.spark" % "spark-streaming_2.11" % "2.4.0",
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.4.0",
  "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.4.0"
)
libraryDependencies ++= confluent

A few things to note:

we're building with Scala 2.11 and pulling in the Spark jars compiled against that version. We add Confluent's Maven repository, since their artifacts aren't published to Maven Central. We also need to exclude the org.codehaus.jackson and com.fasterxml.jackson.core jars, because these transitive dependencies from Confluent tend to clash with the versions of those jars required by Spark (and pulled in by the Spark jars themselves).

Now that we have our dependencies, here's what our main.scala might look like to consume Avro from Kafka via Spark.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object main extends App {

  //run spark locally, using every available core
  val conf = new SparkConf().setMaster("local[*]").setAppName("kafka-spark-tester")
  val spark = SparkSession.builder()
    .config(conf)
    .getOrCreate()

  //schema registry url, used to build the deserializer
  val schemaRegUrl = "http://your_schema_registry_here:8081"
  val client = new CachedSchemaRegistryClient(schemaRegUrl, 100)
  val topic = "test.topic"

  //subscribe to kafka, starting from the latest offset
  val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "your_kafka_servers_here:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "latest")
    .load()

  //confluent kafka avro deserializer, needed to read the messages appropriately
  val deserializer = new KafkaAvroDeserializer(client).asInstanceOf[Deserializer[GenericRecord]]

  //needed to convert the column select into Array[Byte] and to encode the string results
  import spark.implicits._

  val results = df.select(col("value").as[Array[Byte]]).map { rawBytes: Array[Byte] =>
    //take the raw bytes from spark and use the confluent deserializer to get the record back
    val decoded = deserializer.deserialize(topic, rawBytes)
    val recordId = decoded.get("id").asInstanceOf[org.apache.avro.util.Utf8].toString

    //perform other transformations here!!
    recordId
  }

  results.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
    .awaitTermination()
}

Code Explanation:

At a high level, we spin up a local Spark instance, subscribe to a Kafka topic starting from the latest offset, and let an instance of KafkaAvroDeserializer do the magic of decoding the bytes streamed from Kafka back into a GenericRecord, a type we can interact with much like a Map[String, Any]. Finally, we write the results of selecting a field called "id" back to the console.
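
Since a GenericRecord behaves a lot like a loosely-typed map, pulling fields out of it looks roughly like the sketch below. Only "id" appears in the job above; the "amount" field here is a made-up example.

import org.apache.avro.generic.GenericRecord

//a minimal sketch of reading fields off a GenericRecord; "amount" is a hypothetical field
def extractFields(record: GenericRecord): (String, Double) = {
  //avro strings deserialize as org.apache.avro.util.Utf8, so convert with toString
  val id = record.get("id").toString
  //numeric fields come back as boxed java types and can be cast directly
  val amount = record.get("amount").asInstanceOf[Double]
  (id, amount)
}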

Things to keep in mind:

  1. Schema Registry adds a few extra bytes to the front of a typical Avro-encoded message: a magic byte followed by a 4-byte schema ID (Confluent calls this the wire format; see the sketch after this list). This is where a lot of the pain of reading Schema-Registry-encoded Avro comes from! If you use Schema Registry to encode the data you're storing, you'll have to use an instance of their deserializer to read it back out as well.
  2. You'll need your own instances of Kafka and Schema Registry running to run this example :)
  3. By convention (the default subject naming strategy), the KafkaAvroSerializer registers schemas under a subject named after the topic with "-value" appended to the end, and that's the subject the sketch below looks up. This is configurable; consult the Confluent docs to learn more.
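
To make points 1 and 3 a bit more concrete, here's a minimal standalone sketch (separate from the streaming job above) that peels the wire format apart by hand and looks up the latest schema registered under the topic's "-value" subject. The registry URL and topic are the same placeholders used earlier.

import java.nio.ByteBuffer
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

//a minimal sketch, assuming a schema registry running at the placeholder url below
object WireFormatSketch extends App {
  val client = new CachedSchemaRegistryClient("http://your_schema_registry_here:8081", 100)

  //point 1: the confluent wire format is a magic byte (0), a 4-byte schema id, then the avro payload
  def inspectHeader(rawBytes: Array[Byte]): Unit = {
    val buffer = ByteBuffer.wrap(rawBytes)
    val magicByte = buffer.get()   //always 0 for schema-registry-encoded messages
    val schemaId = buffer.getInt() //id of the writer schema in the registry
    val writerSchema = client.getById(schemaId)
    println(s"magic=$magicByte id=$schemaId schema=${writerSchema.getFullName}")
  }

  //point 3: with the default naming strategy, the value schema lives under the "<topic>-value" subject
  val latest = client.getLatestSchemaMetadata("test.topic-value")
  println(s"latest schema id for test.topic-value: ${latest.getId}")
}

In practice you'll rarely need to do this yourself; the KafkaAvroDeserializer handles the framing for you, which is exactly why it's the path of least resistance.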

That's all for now. The gist here is that if you're using Confluent's Schema Registry, it is highly recommended to use their provided serializers/deserializers. You can roll your own by hand, but you're gonna have a bad time if you try ;)
