Integrating Confluent Schema Registry with Apache Spark applications

Dominik Liebler
Published in YAZIO Engineering
Jan 24, 2022

At YAZIO, we believe in making decisions backed by data to help people live healthier lives through better nutrition. For each new and existing feature we want to evaluate how well it performs and how our users interact with it. In order to do so, we need a lot of data and we need to handle backpressure in our systems.

To cope with that, we use a Kafka cluster managed by Strimzi operators running in Kubernetes. The data is ingested from our mobile and web apps via HTTP or TCP endpoints, serialized as JSON, and stored in Kafka by a small application written in Kotlin/JVM.

Overview of our data pipeline architecture

At the other end of the pipeline, different Spark Structured Streaming applications (also written in Kotlin) dump this information into our data lake residing in a Ceph bucket. They read data from Kafka, deserialize it, transform some of the fields and write Parquet files into the data lake using a new schema.

Why schemas?

Schemas play an important role in data pipelines because they give meaning and context to data. Without schemas, we would have to guess at the context and meaning of data every time we use it. As you might have guessed already, this would lead to a lot of bugs and misunderstandings.

Photo by EJ Strat https://unsplash.com/photos/VjWi56AWQ9k

Similar to a legal contract that binds you to certain limits, a schema binds data to certain limits and a defined meaning, which reduces the need for interpretation.

Choice of serialization formats

At the time of writing, Confluent Schema Registry supports three serialization formats: Apache Avro, JSON Schema, and Protocol Buffers (Protobuf).

Of those choices, only two really provide more than just validation of the data that is ingested and transmitted through our data pipelines. Avro and Protobuf also allow us to shrink our topics because a message contains only the payload, while the schema, which would otherwise repeat in every record, is not stored with it. In the case of Avro, this shrinks a record to about 30% of its original size!

So to reduce message size (and, in sum, Kafka topic size), we wanted to switch from JSON with an implicit schema to Apache Avro with an explicit but externally stored schema. This ensures that all records in a topic can be read using the same schema while offloading recurring information into the registry.
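To make the "externally stored schema" idea concrete: serializers that work with Confluent Schema Registry do not embed the schema in a message but prefix the payload with a small header, one magic byte `0` followed by a 4-byte big-endian schema ID. The following is a minimal sketch of reading that header. `ConfluentHeader` and `parseHeader` are illustrative names and not part of any library.

```kotlin
import java.nio.ByteBuffer

// Illustrative type (not from any library): what the 5-byte header tells us.
data class ConfluentHeader(val schemaId: Int, val payloadOffset: Int)

// Reads the header: magic byte 0x0, then the schema ID as a big-endian Int.
fun parseHeader(message: ByteArray): ConfluentHeader {
    require(message.size >= 5) { "message too short for a Confluent header" }
    val buffer = ByteBuffer.wrap(message) // ByteBuffer reads big-endian by default
    val magic = buffer.get()
    require(magic == 0.toByte()) { "unknown magic byte: $magic" }
    // The serialized payload starts right after the 5 header bytes.
    return ConfluentHeader(schemaId = buffer.getInt(), payloadOffset = 5)
}
```

For a message starting with the bytes `00 00 00 00 2A`, this yields schema ID 42, and the Avro payload begins at offset 5. The consumer uses that ID to look up the writer schema in the registry.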

All of this has very little impact on performance, as schemas are cached in both the producer and the consumer and are fetched only once for as long as the application is running. The only overhead is serialization itself, but that was the case before anyway: after all, JSON needs to be (de-)serialized as well.
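The caching behaviour can be pictured with a small sketch. This is not the code of Confluent's client (which ships its own `CachedSchemaRegistryClient`); `SchemaCache` and its `fetch` parameter are hypothetical and only illustrate that each schema ID triggers at most one lookup per running application.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical cache: `fetch` stands in for the HTTP call to the registry.
class SchemaCache(private val fetch: (Int) -> String) {
    private val schemas = ConcurrentHashMap<Int, String>()

    // Returns the cached schema; `fetch` is only called on the first lookup of an ID.
    fun schemaFor(id: Int): String = schemas.computeIfAbsent(id) { fetch(it) }
}
```

Looking up the same ID twice calls `fetch` once; every later lookup is served from memory, which is why the registry adds almost no latency once an application has warmed up.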

Using the schema registry with Kotlin consumers

Integrating the schema registry in a Kotlin consumer isn’t that big of a deal as Confluent provides deserializers for JVM-based languages in a Maven package that you can just use and configure to use your schema registry instance.
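A minimal sketch of such a consumer configuration could look like the following. The function name `avroConsumerProperties` and all URLs and group IDs passed to it are placeholders; the config keys and class names are the ones used by the Kafka client and Confluent's Avro deserializer. String keys are an assumption of this sketch.

```kotlin
import java.util.Properties

// Builds consumer settings; the servers, registry URL and group ID are placeholders.
fun avroConsumerProperties(bootstrapServers: String, registryUrl: String, groupId: String): Properties =
    Properties().apply {
        put("bootstrap.servers", bootstrapServers)
        put("group.id", groupId)
        // Keys are plain strings in this sketch; values are Avro, resolved via the registry.
        put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
        put("schema.registry.url", registryUrl)
        // true: return generated SpecificRecord classes; false: return GenericRecord.
        put("specific.avro.reader", "true")
    }
```

The resulting properties can be passed to a `KafkaConsumer<String, Cat>`, where `Cat` would be a class generated from an Avro schema (hypothetical here). This requires the `kafka-clients` and `kafka-avro-serializer` dependencies on the classpath.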

We only need to instruct the consumer to use the respective deserializers with the key.deserializer and value.deserializer config parameters, while all the magic happens behind the scenes. If you generate Java classes from the Avro schemas, you can use those specific types on the generic KafkaConsumer<K, V> class; otherwise you have to use the GenericRecord interface provided by the Avro library.

Spark, however, is a bit different here. To process data efficiently, Spark uses the Catalyst optimizer to apply optimization strategies and distribute the workload across a cluster of workers. Part of that is the Catalog, which also contains the schema of the data being read and written.

Long story short: Structured Streaming doesn’t support schemas being fetched from a registry out of the box and integration is tricky due to the fact that Spark code simply does not run sequentially.

ABRiS — Avro Bridge for Spark

The ABRiS project solves this problem by deriving a Catalyst schema from the one provided by the registry.

Photo by David Martin https://unsplash.com/photos/p9vBVq_-nXY

To integrate it with multiple applications, we implemented a reusable helper function that also handles errors that might occur when setting up ABRiS and downloading schemas from the registry.
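A sketch of what such a helper might look like, assuming the `AbrisConfig` builder API of a recent ABRiS release. The function name `readerConfigFor` and the error handling shown are assumptions for illustration, not necessarily what runs in production.

```kotlin
import za.co.absa.abris.config.AbrisConfig
import za.co.absa.abris.config.FromAvroConfig

// Hypothetical helper: builds a reader config for the value schema of `topic`.
fun readerConfigFor(topic: String, registryUrl: String): FromAvroConfig =
    try {
        AbrisConfig
            .fromConfluentAvro()
            .downloadReaderSchemaByLatestVersion()
            // Scala default arguments are not visible from Kotlin, so isKey is passed explicitly.
            .andTopicNameStrategy(topic, false)
            .usingSchemaRegistry(registryUrl)
    } catch (e: Exception) {
        // Fail fast with context instead of surfacing a bare client exception at startup.
        throw IllegalStateException(
            "Could not load reader schema for topic '$topic' from $registryUrl", e
        )
    }
```

Wrapping the failure in an exception that names the topic and the registry makes a misconfigured URL or a missing subject much easier to spot in the driver logs.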

It configures ABRiS to always download the latest version of the reader schema for a given topic, which is then used to deserialize the data from Kafka, and it uses the topic name strategy to construct the subject name.
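With the topic name strategy, the subject under which a schema is registered is derived from the topic name alone: the suffix `-value` is appended for record values and `-key` for record keys. The following function is an illustrative re-implementation of that rule (`subjectFor` is not a library call):

```kotlin
// Illustrative re-implementation of the topic name strategy; not a library call.
fun subjectFor(topic: String, isKey: Boolean = false): String =
    if (isKey) "$topic-key" else "$topic-value"
```

For a topic called `cats`, the value schema therefore lives under the subject `cats-value`.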

Then we plug that into our Spark session like this:
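A hedged sketch of that wiring, assuming ABRiS's `from_avro` column function and Spark's Kafka source. All hostnames, the `s3a://` paths and the application name are placeholders, and the reader config is built inline here only to keep the example self-contained.

```kotlin
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

fun main() {
    val spark = SparkSession.builder().appName("cats-to-parquet").getOrCreate()

    // Reader schema for the value of the "cats" topic (placeholder registry URL).
    val catsConfig = AbrisConfig
        .fromConfluentAvro()
        .downloadReaderSchemaByLatestVersion()
        .andTopicNameStrategy("cats", false)
        .usingSchemaRegistry("http://schema-registry:8081")

    spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092") // placeholder
        .option("subscribe", "cats")
        .load()
        // Decode the binary Kafka value into a struct column, then flatten it.
        .select(from_avro(col("value"), catsConfig).alias("cat"))
        .select("cat.*")
        .writeStream()
        .format("parquet")
        .option("path", "s3a://data-lake/cats") // placeholder bucket
        .option("checkpointLocation", "s3a://data-lake/_checkpoints/cats")
        .start()
        .awaitTermination()
}
```

Because the config is created on the driver before the query starts, the schema is resolved once while the query plan is built and does not change while the query is running.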

This will download the schema once on startup and then begin to consume from the cats topic and write Parquet files with a schema derived from the registry.

Change is inevitable: schema evolutions

As we all know very well, software changes over time and this also applies to data pipelines of course. So, what happens when the schema needs to be changed?

Photo by Chris Lawton https://unsplash.com/photos/5IHz5WhosQE

First of all, you have to make sure that schema changes won’t break the whole pipeline on deploy. By default, the producer automatically registers a new schema version with the registry when a new version of it has been deployed and it is about to produce data in the new format.

This can be dangerous, as downstream applications may not yet be compatible with the new format, especially if fields have been removed that a consumer still depends on. For better resiliency, an explicit compatibility level should be set in the registry to enforce backward and forward compatibility between schema versions. This can be set globally as well as per subject. Then you can validate schema evolutions before they go into production, e.g. by using schema-registry-gitops in your CI/CD pipeline.
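As an illustration, the per-subject compatibility level can be set through the registry's REST API with a `PUT` to `/config/{subject}`. The sketch below only builds the request with the JDK's HTTP client types; `compatibilityRequest` is a hypothetical helper, and the registry URL and subject are placeholders.

```kotlin
import java.net.URI
import java.net.http.HttpRequest

// Builds (but does not send) a request that sets the compatibility level of one subject.
fun compatibilityRequest(registryUrl: String, subject: String, level: String): HttpRequest =
    HttpRequest.newBuilder(URI.create("$registryUrl/config/$subject"))
        .header("Content-Type", "application/vnd.schemaregistry.v1+json")
        .PUT(HttpRequest.BodyPublishers.ofString("""{"compatibility": "$level"}"""))
        .build()
```

Calling `compatibilityRequest("http://localhost:8081", "cats-value", "FULL")` and sending the result with `HttpClient.newHttpClient().send(...)` would require every new version of `cats-value` to be both backward and forward compatible with the previous one. Omitting the subject and sending the same body to `/config` sets the global default instead.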

While this applies to all types of consumers, you should take special care when changing schemas that are used by Spark writers as they also will change the structure of the data being written into your data lake.

Drawing a conclusion

Overall our data pipelines benefited greatly from the switch from JSON to Avro. Not only did we reduce costs by means of more efficient storage usage, we also were able to lower complexity in our code by offloading schemas and removing custom serialization code.

Also: We are hiring! If you’d like to work on a well architected Nutrition & Health App in a fully remote position and integrate interesting technologies, we’d love to receive your application 😄

Dominik Liebler is a Senior Data Engineer at YAZIO who likes nature, Kotlin, Rust, Apache Kafka, TypeScript and raisins in cheesecake.