Making sense of Avro, Kafka, Schema Registry, and Spark Streaming

Ivan Miller
Aug 27 · 3 min read
// sbt build definition for the Kafka + Schema Registry + Spark Structured Streaming example.
name := "kafka-spark-streaming-example"

version := "0.1"

// Spark 2.4.0 is built against the Scala 2.11.12 patch release; stay on that 2.11 line.
scalaVersion := "2.11.12"

// Confluent artifacts (schema-registry client, Avro serializer) are not published to Maven Central;
// they are served from Confluent's own Maven repository.
resolvers += "confluent" at "https://packages.confluent.io/maven/"

// Schema Registry client and Avro serializer. Jackson is excluded, presumably so the Jackson version
// shipped with Spark is the only one on the classpath — confirm if upgrading either side.
val confluent = Seq(
  "io.confluent" % "kafka-schema-registry-client" % "5.2.1",
  "io.confluent" % "kafka-avro-serializer" % "5.2.1"
).map(_.excludeAll(
  ExclusionRule(organization = "org.codehaus.jackson"),
  ExclusionRule(organization = "com.fasterxml.jackson.core")))

val sparkVersion = "2.4.0"

// `%%` appends the Scala binary suffix (_2.11) derived from scalaVersion, so the artifact names
// cannot drift out of sync with the compiler version.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)

libraryDependencies ++= confluent
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object main {

  /**
   * Entry point: reads Avro-encoded messages from a Kafka topic with Spark Structured Streaming,
   * decodes each message through the Confluent Schema Registry, and prints the decoded `id`
   * field to the console.
   *
   * A plain `main` method is used instead of `extends App`. Spark's documentation warns that
   * subclasses of `scala.App` may not work correctly, because their fields are initialised via
   * `DelayedInit`.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Default to a local master, but let spark-submit's --master take precedence.
    val sc = new SparkConf()
      .setIfMissing("spark.master", "local[*]")
      .setAppName("kafka-spark-tester")
    // Structured Streaming only needs a SparkSession; the DStream StreamingContext the original
    // created was never started or used, so it is not built here.
    val spark = SparkSession.builder().config(sc).getOrCreate()

    // Schema Registry URL, used by the Avro deserializer to look up writer schemas.
    val schemaRegUrl = "http://your_schema_registry_here:8081"
    val topic = "test.topic"

    // Subscribe to Kafka.
    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "your_kafka_servers_here:9092")
      .option("subscribe", topic)
      .option("startingOffsets", "latest")
      .load()

    // Needed for the Array[Byte] / String encoders used below.
    import spark.implicits._

    val results = df.select(col("value").as[Array[Byte]]).mapPartitions { rows =>
      // The schema-registry client and deserializer are not Serializable, so build them on the
      // executor, once per partition, instead of capturing driver-side instances.
      val client = new CachedSchemaRegistryClient(schemaRegUrl, 100)
      val deser = new KafkaAvroDeserializer(client).asInstanceOf[Deserializer[GenericRecord]]

      rows
        // Read the raw bytes from Spark and use the Confluent deserializer to get the record back.
        .map((rawBytes: Array[Byte]) => deser.deserialize(topic, rawBytes))
        // The deserializer may yield null for a null payload (e.g. a tombstone); skip those.
        .filter(_ != null)
        .map { decoded =>
          // Avro strings may surface as Utf8 or java.lang.String depending on reader settings;
          // toString works for both, and a missing/null id stays null.
          val recordId = Option(decoded.get("id")).map(_.toString).orNull

          // Perform other transformations here!

          recordId
        }
    }

    val query = results.writeStream
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}


Written by Ivan Miller

Software Engineer, Adrenaline Seeker, Bootstrapper.

