Making sense of Avro, Kafka, Schema Registry, and Spark Streaming

Ivan Miller
Aug 27
name := "kafka-spark-streaming-example"

version := "0.1"

scalaVersion := "2.11.0"

val confluent = Seq(
  "io.confluent" % "kafka-schema-registry-client" % "5.2.1",
  "io.confluent" % "kafka-avro-serializer" % "5.2.1"
).map(_.excludeAll(
  ExclusionRule(organization = "org.codehaus.jackson"),
  ExclusionRule(organization = "com.fasterxml.jackson.core")
))

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-sql_2.11" % "2.4.0",
  "org.apache.spark" % "spark-streaming_2.11" % "2.4.0",
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.4.0",
  "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.4.0"
)
libraryDependencies ++= confluent
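One thing to note: the io.confluent artifacts are not published to Maven Central, so the build will most likely also need Confluent's Maven repository as a resolver, along these lines:

resolvers += "Confluent" at "https://packages.confluent.io/maven/"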
With the dependencies in place, the job itself is short: subscribe to the topic with Structured Streaming, pull the raw bytes out of the Kafka value column, and hand them to the Confluent Avro deserializer.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object main extends App {

  //local SparkSession for testing; the micro-batch interval is set with a trigger on the write side
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("kafka-spark-tester")
    .getOrCreate()

  //schema registry client; the Avro deserializer uses it to look up writer schemas by id
  val schemaRegUrl = "http://your_schema_registry_here:8081"
  val client = new CachedSchemaRegistryClient(schemaRegUrl, 100)
  val topic = "test.topic"

  //subscribe to the kafka topic
  val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "your_kafka_servers_here:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "latest")
    .load()

  //the confluent KafkaAvroDeserializer, needed to decode the wire format (magic byte + schema id + avro payload)
  val deserializer = new KafkaAvroDeserializer(client).asInstanceOf[Deserializer[GenericRecord]]

  //brings in the Encoders needed to select the value column as Array[Byte] and map it to a String
  import spark.implicits._

  val results = df.select(col("value").as[Array[Byte]]).map { rawBytes: Array[Byte] =>
    //take the raw bytes from spark and use the confluent deserializer to get the GenericRecord back
    val decoded = deserializer.deserialize(topic, rawBytes)
    val recordId = decoded.get("id").asInstanceOf[org.apache.avro.util.Utf8].toString

    //perform other transformations here!!
    recordId
  }


  //write each micro-batch to the console, triggering roughly every second
  results.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .trigger(Trigger.ProcessingTime("1 second"))
    .start()
    .awaitTermination()

}
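For context, here is a minimal sketch of a producer that writes records the job above can read. The broker address, registry URL, topic name, and the single string field "id" are assumptions carried over from the snippet rather than anything the libraries require:

import java.util.Properties

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestProducer extends App {

  //hypothetical schema matching what the streaming job expects: a record with a string "id" field
  val schema = SchemaBuilder.record("TestRecord").fields()
    .requiredString("id")
    .endRecord()

  val props = new Properties()
  props.put("bootstrap.servers", "your_kafka_servers_here:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  //the confluent serializer registers the schema and prepends the schema id to every message
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", "http://your_schema_registry_here:8081")

  val producer = new KafkaProducer[String, GenericRecord](props)

  val record = new GenericData.Record(schema)
  record.put("id", "test-id-1")
  producer.send(new ProducerRecord[String, GenericRecord]("test.topic", record))

  producer.flush()
  producer.close()
}

Run something like this against a local Kafka and Schema Registry, start the streaming job, and the ids should show up on the console once per trigger.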
