A Data Governance Tool for High-Volume Kafka Workloads

DV Engineering
DoubleVerify Engineering
6 min read · Dec 6, 2023

Written By: Andrew Malov

Collecting and analyzing data efficiently and accurately have always been core capabilities for DV. As data volumes continue to grow, data streaming platforms, like Kafka, have become an important part of our technology stack. Recently we’ve seen two dimensions of data growth:
— The number of messages, which is a natural external factor since we have more clients, activity and internet traffic; and
— The number of fields inside each message, since every product development team adds parameters to be gathered/inferred on the client side and taken all the way to our backend platform.

Our Kafka ingestion pipeline at DV handles AVRO messages with 700+ fields. Managing huge schemas has become a challenge because:

  1. Security officers want to limit access to sensitive fields.
  2. Consumers inside the company want different slices of raw data.
  3. Moving the messages downstream inside the company should be cost-effective.
  4. Each team adds new data points during the enrichment phase.
  5. Teams producing and consuming messages may work at different release tempos.

Data governance is central to addressing the security concerns that come with managing huge schemas. Companies have to set standards for how data is gathered, stored and processed. Data governance defines policies like “who can access granular pieces of data and how”.

Here, we’ll see how the Confluent Schema Registry and the AVRO open-source library helped us address specific data governance challenges. The technical solution proposed in this article allows for “schema-agnostic” and declarative ways of controlling data security.

Dealing with huge schemas

With several teams updating the message schema, we had to first address the schema compatibility issue. Developers need to come to a consensus on:
— Which version of the schema they should produce or consume and
— How to ensure message consumer applications, which didn’t update their code to accept the version being composed at producer, will not break.

Confluent Schema Registry has been the answer to compatibility issues between different versions of the raw input message by providing support for AVRO schema versioning. Requiring a compatibility level whenever a new schema is registered has been a real game changer. Any team working with Kafka via Schema Registry can now deploy new schema versions at its own pace.

Let’s see how we split the huge raw message for each specific downstream consumer.

Cross-schema compatibility check: a new compatibility dimension and an underrated API call

DV has vastly expanded the inbound schema over the years. All development teams bought into schema registry management, but our chief architect kept searching for an optimal data solution: each team should declaratively require and get only the necessary data.

The general approach was clear: each dev team wanted to define a sub-schema (for the target subject), while the cloud platform engineers wanted heavy messages from the input Kafka topic to be read only once.

This resulted in a fan-out pipeline: one input message is sent to N target topics, each in the form of its own target subject schema.

To make this pipeline work, we needed a tool that took compatibility to a whole new level: not only did versions of the same schema need to be compatible, but different schemas now had to be compatible with each other as well. Namely, the SOURCE SUBJECT SCHEMA and each TARGET SUBJECT SCHEMA had to be tested for compatibility.

Once again, the Schema Registry API came to the rescue:

POST /compatibility/subjects/(string: subjectName)/versions/(versionId: version)
request JSON object: schema to be tested
response JSON object: is_compatible: boolean
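As a rough illustration, the call above could be issued from Scala with the JDK 11+ HTTP client. This is a minimal sketch, not the tooling used at DV: the registry address, the subject name `impressions-us-value` and the helper names are hypothetical, and the response is inspected with a naive string check instead of a JSON parser.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object CompatibilityCheck {
  // Hypothetical registry address; replace with your own.
  val registryUrl = "http://localhost:8081"

  // Minimal JSON string escaping (backslash, quote, common whitespace escapes only).
  def jsonEscape(s: String): String =
    s.replace("\\", "\\\\").replace("\"", "\\\"")
      .replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")

  // The endpoint expects the candidate schema as a string inside a JSON object.
  def requestBody(avroSchemaJson: String): String =
    "{\"schema\": \"" + jsonEscape(avroSchemaJson) + "\"}"

  // Builds POST /compatibility/subjects/{subject}/versions/{version}.
  // Assumes the subject name needs no URL encoding.
  def compatibilityRequest(subject: String, version: String, avroSchemaJson: String): HttpRequest =
    HttpRequest.newBuilder()
      .uri(URI.create(s"$registryUrl/compatibility/subjects/$subject/versions/$version"))
      .header("Content-Type", "application/vnd.schemaregistry.v1+json")
      .POST(HttpRequest.BodyPublishers.ofString(requestBody(avroSchemaJson)))
      .build()

  // Sends the request and looks for "is_compatible": true in the response body.
  def isCompatible(client: HttpClient, request: HttpRequest): Boolean = {
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    response.statusCode() == 200 &&
      response.body().replaceAll("\\s", "").contains("\"is_compatible\":true")
  }
}
```

A CI job could call `isCompatible(HttpClient.newHttpClient(), compatibilityRequest("impressions-us-value", "latest", targetSchemaJson))` for every submitted target schema and fail the build on `false`.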

This API method tests a candidate schema against an existing version registered under a subject. With it, we could let teams publish their own domain-oriented subjects, provided each one passed the compatibility test against the raw input schema. In general, we were almost ready to perform the huge schema split. But how could we do it in an efficient way?

A naive approach led us to a typical sequence of steps:

  1. Reading the source message and parsing it into a complex object in memory (deserialization).
  2. Creating an empty object for the target message and performing field population based on the source object and field mapping.
  3. Writing the target message from the target object in memory (serialization).

Such operations are both CPU- and memory-intensive, as they create lots of short-lived, highly complex objects in memory and perform lots of transformations and parsing operations.
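For comparison, the naive sequence above might look roughly like the following. This is an illustrative sketch, assuming the Apache Avro Java library is on the classpath; `NaiveTransform` and its method names are hypothetical, and the field mapping is simplified to "copy fields with the same name".

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object NaiveTransform {
  // Step 3 helper: serialize a record using its own schema.
  def serialize(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](record.getSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  def transform(bytes: Array[Byte], source: Schema, target: Schema): Array[Byte] = {
    // Step 1: deserialize the WHOLE source message, including fields no target needs.
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val full = new GenericDatumReader[GenericRecord](source).read(null, decoder)

    // Step 2: create an empty target object and populate it field by field.
    val slim = new GenericData.Record(target)
    target.getFields.forEach(f => slim.put(f.name(), full.get(f.name())))

    // Step 3: serialize the target object.
    serialize(slim)
  }
}
```

Every route in the fan-out repeats the full deserialization in step 1, which is where most of the cost goes for a message with hundreds of fields.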

With this, we faced the question: is it possible to avoid costly deserialization for each fan-out pipeline route?

Deserialize or not: how to stay “schema-agnostic”

Once we were set to guarantee cross-schema compatibility, we had to think of a technical implementation. We needed an effective transformer engine at the core of the fan-out pipeline. Here is a list of requirements we put together:

  • Compatibility is a given.
  • Input raw subject schema may change at any time.
  • Target subject schema may evolve at any time.
  • New consumers with new target schemas may appear over time.
  • Transformation should be both CPU- and memory-efficient.

This list made us think of an abstract transformer that could turn any source message into a target message, provided the transformation made sense and the message schemas were compatible.

We stayed true to the AVRO format and picked up the low-level GenericDatumReader class from the open-source library. This provided great abstraction and was really instrumental in building the transformer core.

It allowed us to perform schema projections/transformations with a minimal serialization footprint. The Scala code below shows how to build a binary decoder that transforms raw byte array input messages into a set of target subject messages, held in memory as GenericRecord instances:

  1. A common decoder is instantiated for every incoming raw AVRO message. We do this for each message because any of them might carry a newer version of the writer schema.
  2. For each target subject:
    a. A target decoder is built on top of the common decoder.
    b. The latest version of the target schema is fetched, so that a newer version registered with Schema Registry is applied in the transformation.
    c. The GenericDatumReader::read method is used with the decoder from 2.a to produce a GenericRecord instance of the target subject message.

import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.{BinaryDecoder, DecoderFactory}

// Build a decoder for EVERY incoming MESSAGE, not once per schema.
val reusedDecoder: BinaryDecoder = DecoderFactory.get().binaryDecoder(bytes, startPosition, length, null)
subjectLists(sourceTopic).map { readerSubject =>
  // A decoder reads forward only ONCE (treat it like an iterator/generator), so the
  // reused instance is re-pointed at the start of the same buffer for each target subject.
  val decoder = DecoderFactory.get().binaryDecoder(bytes, startPosition, length, reusedDecoder)
  // Fetch the latest target schema per message so that newly registered versions apply immediately.
  val readerSchema = schemata.latestForSubject(readerSubject)
  // Transformer from the writer schema to consumer schema X.
  val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
  // Project the writer schema onto reader schema X using names, aliases and type compatibility.
  val record: GenericRecord = reader.read(null, decoder)
  // Pair each target subject with its projected record.
  readerSubject -> record
}

As you see, at no point in time does the transformer engine access any specific field of a message. No application or domain-specific logic is applied either. With such a core engine, we were able to turn the fan-out pipeline into an infrastructure unit.

This kind of approach finally led us to generalize huge input schema management into a declarative data governance self-service.
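To see the projection in isolation, here is a small self-contained sketch, assuming the Apache Avro Java library is on the classpath. The `Impression` schemas, the `userIp` field and the `ProjectionDemo` object are hypothetical stand-ins for a real source subject and a permitted slice of it; both schemas share the same record name so that schema resolution applies without aliases.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object ProjectionDemo {
  // Hypothetical writer (source) schema with a sensitive field.
  val writerSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Impression","namespace":"com.example","fields":[
      |{"name":"id","type":"long"},
      |{"name":"userIp","type":"string"},
      |{"name":"country","type":"string"}]}""".stripMargin)

  // Hypothetical reader (target) schema: the same record without userIp.
  val readerSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Impression","namespace":"com.example","fields":[
      |{"name":"id","type":"long"},
      |{"name":"country","type":"string"}]}""".stripMargin)

  def sampleRecord(): GenericRecord = {
    val r = new GenericData.Record(writerSchema)
    r.put("id", Long.box(42L))
    r.put("userIp", "203.0.113.7")
    r.put("country", "US")
    r
  }

  // Serializes a record with the writer schema, as a producer would.
  def encode(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](writerSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Reads the bytes straight into the target shape; fields missing from the
  // reader schema are skipped by Avro's schema resolution, not by our code.
  def project(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](writerSchema, readerSchema).read(null, decoder)
  }
}
```

Calling `ProjectionDemo.project(ProjectionDemo.encode(ProjectionDemo.sampleRecord()))` yields a record whose schema has only `id` and `country`. Removing a field from a consumer's slice is then a schema change rather than a code change.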

Putting it all together

To put the final tools together, we:

  1. Let the consumer request slices of the data. Teams declare their target subject schemas by submitting AVRO files.
  2. Built the CI/CD pipeline around the compatibility checks supported by the Schema Registry:
    a. Compatibility between versions of a schema
    b. Compatibility between different schemas
  3. Automated the schema registration process.
  4. Provided tools for declarative fan-out pipeline graph design.
    downstream-targets = [
      {
        source-topic: "impressions-us",
        subject: "com.dv.avro.Report1",
        sink-topic: "report-1"
      },
      {
        source-topic: "impressions-us",
        subject: "com.dv.avro.Report2",
        sink-topic: "report-2"
      }
    ]

  5. Implemented the fan-out pipeline application using Kafka Streams.
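One way the pipeline application might model the downstream-targets configuration from step 4 is sketched below. The case class and object names are assumptions made for illustration, and the entries are hard-coded here rather than loaded from the configuration file.

```scala
// One route of the fan-out graph: read from sourceTopic, project to subject, write to sinkTopic.
final case class DownstreamTarget(sourceTopic: String, subject: String, sinkTopic: String)

object FanOutGraph {
  // Mirrors the two entries of the configuration in step 4.
  val targets: List[DownstreamTarget] = List(
    DownstreamTarget("impressions-us", "com.dv.avro.Report1", "report-1"),
    DownstreamTarget("impressions-us", "com.dv.avro.Report2", "report-2")
  )

  // Groups routes by source topic so that each heavy input message is read once
  // and then projected into every target registered for that topic.
  def routesBySource(ts: List[DownstreamTarget]): Map[String, List[DownstreamTarget]] =
    ts.groupBy(_.sourceTopic)
}
```

With this shape, adding a consumer means adding one entry to the list. The transformer itself stays untouched because it never refers to individual fields.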

When we implemented this solution at DV, we found ourselves able to read heavy input messages once. We learned how to use GitLab templates to provide solutions to various teams out of the box. Our consumers are now able to read smaller topics with necessary (and permitted) fields only.
