Confluent Platform offers an open-source Schema Registry to easily integrate Apache Avro and Apache Kafka as well as an ecosystem of tooling aiming at simplifying Apache Avro schema management.
Although most of Apache Kafka users apply Apache Avro to define a strict but easy to evolve contract for their messages, only JVM developers can leverage classes automatically generated by the Apache Avro compiler.
What if your streaming platform is serving JVM and non-JVM users? How can we empower engineers using non-JVM languages such as Python, Ruby or Go to leverage auto-generated code?
In this case, the best solution is Google Protocol Buffer. Best known as a GRPC enabler, Protobuf can be used for serialising, deserialising and validating data. proto-c, the Protobuf compiler, compiles “.proto” files into native code for most of the mainstream programming languages.
The only disadvantage of using Protobuf as Kafka encoder is that you need to develop your custom Schema Registry or wait until Confluent supports Protobuf (take a look at Schema Registry v6.0.0-beta and more specifically this commit e21d751, that day is not so far 👍).
What is Google Protobuf?
Protocol buffers are a flexible, efficient, automated mechanism for serializing structured data.
The data representation, or schema, is defined in a “.proto” file: a small logical record of information, containing a series of name-value pairs. proto-c can be used for automatically compiling “.proto” files into generated source code to easily write and read Protobuf messages using a variety of languages.
For a streaming platform, supporting schema evolution is a fundamental requirement. Protobuf offers mechanisms for updating schema without breaking downstream consumers which are still using a previous schema version.
The indeterministic Protobuf serialisation issue
From the official Google Protobuf documentation
By default, repeated invocations of serialization methods on the same protocol buffer message instance may not return the same byte output; i.e. the default serialization is not deterministic.
From the release note of Protobuf 3.0.0
Added a deterministic serialization API (currently available in C++). The deterministic serialization guarantees that given a binary, equal messages will be serialized to the same bytes. This allows applications like MapReduce to group equal messages based on the serialized bytes. The deterministic serialization is, however, NOT canonical across languages; it is also unstable across different builds with schema changes due to unknown fields. Users who need canonical serialization, e.g. persistent storage in a canonical form, fingerprinting, etc, should define their own canonicalization specification and implement the serializer using reflection APIs rather than relying on this API.
From the opencontainers.com Google group
In any case, the upshot of this is that while a particular implementation of the proto library may deterministically produce the same serialized proto every time when given a particular proto message, there’s no guarantee that two different proto libraries will serialize it in the same way, nor are there any guarantees that any particular proto library serializer will be stable over time.
As a consequence, the following checks may fail for a protocol buffer message instance
foo.SerializeAsString() == foo.SerializeAsString()Hash(foo.SerializeAsString()) == Hash(foo.SerializeAsString())CRC(foo.SerializeAsString()) == CRC(foo.SerializeAsString())FingerPrint(foo.SerializeAsString()) == FingerPrint(foo.SerializeAsString())
The problem is not only cross-language, but it may affect a single library too. There is no guarantee that the same library will consistently implement a deterministic encoding over time.
The indeterminism is caused by
- the absence of guarantees on how known or unknown fields should be ordered upon writing them
- map serialisation relies on iteration, and there is no ordering guarantee while iterating over map items
How does Protobuf indeterminism impact Kafka usage?
One of the key features of Apache Kafka is log compaction. Quoting the official documentation:
Log compaction is a mechanism to give finer-grained per-record retention, rather than the coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key. This way the log is guaranteed to have at least the last state for each key.
Each message in Apache Kafka consists of a key, a value and some metadata. Log compaction
- scans each topic segment looking for duplicated keys
- collects every old record with a duplicated key
- eventually deletes them
- generates a new topic statement containing only the latest record for each key.
In Kafka, everything is a bunch of bytes. Duplicated keys are identified by comparing their byte representation: two keys are identical if and only if their bytes match.
An essential requirement for enabling log compaction is applying a deterministic encoder for Kafka keys. Serialising multiple times the same key must always generate the same output.
Hence, Kafka keys can be
- encrypted only with algorithms which do not add any randomity
- serialised only with encoders providing deterministic output
Given the indeterministic issue, Google Protobuf should not be used for serialising Kafka keys.
Flexible serialisation setup
Kafka client APIs are flexible enough to let both producer and consumer specify different types of encoding for the key and value.
On the producer side
key.serializer: Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.
value.serializer: Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
On the consumer side
key.deserializer: Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface.
value.deserializer: Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface.
Several Kafka libraries offer constructors for specifying serialisers or deserialisers upon instantiating clients. For instance, from the official Java library
Therefore, Google Protobuf can be used for encoding the value while a better solution can be applied for encoding the record key.
Google Protobuf is an excellent candidate for serialising Kafka value:
- provide a very fast serialisation algorithm
- allow disk space optimisation thanks to its compress encoder
- offer an easy way to evolve schema
- offer cross-language support for code generation
- Confluent Schema Registry will soon support it
But be aware of
- indeterministic serialisation issue which may impact log compaction
- because most of the open-source tools for Apache Kafka do not support Protobuf, you may need to reimplement a lot of things