Exploit Apache Kafka’s Message Format to Save Storage and Bandwidth

Seyed Morteza Mousavi · Published in The Startup · 9 min read · Oct 3, 2019
Traveling in a group can reduce costs. Image from Unsplash

One of the main problems we are encountering these days is the amount of disk space used by Apache Kafka topics. Our messages are serialized as JSON. As you know, JSON stores the name of every field alongside the data, which causes a lot of duplicated field names to be stored in the Kafka topic. To reduce disk space usage, we have several options, and I want to share some of them here.

TL;DR

In this article, I will show you how to reduce the disk space used by Kafka. First, I describe why reducing disk space is necessary. Then, I describe Kafka’s message format and how it helps us save disk storage and bandwidth. After that, I use benchmarks to show how lingering, compression, and Avro can save disk space. Finally, a comparison table can help you decide which approach is best for your use case.

Why do I need to reduce message size?

Although you may argue that storage is cheap today, there are several situations in which you want to reduce disk space and bandwidth:

  • It is not strange to store all of your data inside Kafka. The New York Times, for example, uses Kafka as its source of truth! You can keep a topic’s entire records forever by disabling retention in Kafka. With the right compression, you can reduce disk space usage significantly; if you don’t reduce message size, you might run out of disk space several times faster than you think!
  • If you reduce the size of the messages you send to the broker, you use less bandwidth and can therefore increase the total number of messages sent to the brokers.
  • Cloud platforms offer pricing that is calculated from the amount of data written to or read from the Kafka cluster and the amount of data stored in the cluster! So reducing these factors can significantly reduce your application costs.

Before showing you how we can reduce the message size, I want to describe Kafka’s message format.

Message format

The message format of Kafka is already described in its documentation, so I discuss it briefly.

For this article it is enough to define record, record batch and record batch overhead:

  • Record: each record is our familiar key and value pair (plus some small additional data).
  • Record batch: each produce request (for a topic partition) sent to a Kafka broker is wrapped in a batch. Each batch contains one or more records plus a record batch overhead section.
  • Record batch overhead: each produced batch carries metadata about its records, such as the message version (magic byte), record count, compression algorithm, transactional flags, and so on. This metadata is stored in the overhead section of the batch. Record batch overhead is 61 bytes.

The record batch overhead is constant, and we cannot reduce its size. But we can optimize the size of the record batch in three ways: lingering, compression, and using schemas for our record values and keys.
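For reference, the 61 bytes of overhead break down as follows in the current (magic v2) record batch layout described in the Kafka documentation:

baseOffset            int64    8 bytes
batchLength           int32    4 bytes
partitionLeaderEpoch  int32    4 bytes
magic                 int8     1 byte
crc                   int32    4 bytes
attributes            int16    2 bytes
lastOffsetDelta       int32    4 bytes
firstTimestamp        int64    8 bytes
maxTimestamp          int64    8 bytes
producerId            int64    8 bytes
producerEpoch         int16    2 bytes
baseSequence          int32    4 bytes
record count          int32    4 bytes
total                         61 bytes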

Lingering

How does public transportation help us reduce traffic?

Image from https://www.reddit.com/r/pics/comments/clx25f/benefits_of_public_transport/

As you can see above, if every person drives his or her own car through the city, we may face heavy traffic. Using buses to carry passengers to their destinations, on the other hand, can decrease traffic significantly, because the same number of people move in a smaller space (the buses).

Lingering in Kafka helps us do the same. Instead of sending a batch with a single record, we can wait a bit longer, gather more records at produce time, and send them together as one batch:

Use lingering to send multiple records in a single batch.

In the image above, the left producer sends each record immediately, while the right producer waits a bit longer to gather more records and sends them as one batch to the Kafka broker. This way, the right producer pays the record batch overhead only once and saves 122 bytes.

Let’s see this by running a benchmark. I produced 100 record batches to the topic json-simple, each batch containing exactly one record. We can view the total size of these 100 record batches using the kafka-log-dirs command:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list json-simple
Querying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"json-simple-0","size":18034,"offsetLag":0,"isFuture":false}]}]}]}

Note the size field in the result: these 100 batches use 18,034 bytes of our storage.

Now let’s use lingering to produce the same 100 records, but inside exactly one record batch. You enable lingering by setting the linger.ms config to the number of milliseconds your producer should wait, long enough here to gather all 100 records. I produced this record batch to the json-lingering topic:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list json-lingering
Querying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"json-lingering-0","size":12031,"offsetLag":0,"isFuture":false}]}]}]}

As you see above, we need only 12,031 bytes to store the same data, a saving of about 33 percent of disk space. We also need less network bandwidth, because fewer bytes are sent to the broker.
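For reference, here is a minimal sketch of a producer with lingering enabled (assuming a broker at localhost:9092; the JSON payload and the 100 ms value are illustrative, not the exact benchmark setup):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class LingeringProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // With the default linger.ms = 0, each record may go out in its own batch.
        // Waiting 100 ms lets the producer pack many records into one batch,
        // so the 61-byte batch overhead is paid only once.
        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("json-lingering",
                        "{\"id\":" + i + ",\"status\":\"ok\"}"));
            }
        } // close() flushes the final, possibly still lingering, batch
    }
}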

Compression

Compression reduces the amount of data we store on disk or send over the network, at the cost of more CPU usage. It is a trade-off between IO and CPU. Most web applications today spend their time waiting for IO (network, database queries, and so on) while CPU usage stays low, so in this situation it makes sense to spend CPU to reduce IO.

In the Kafka producer, before sending a record batch to the broker, we can use compression to reduce the size of the batch. The producer compresses only the records inside the batch; it does not compress the batch overhead section. When compression is enabled, a flag for the compression type is set in the batch overhead, and the consumer uses this flag during decompression.

Compression algorithms work best when there is duplication in the data. The more records you have in a batch, the higher the compression ratio you can expect; that is why the producer compresses all the records in the batch together instead of compressing each record separately. In JSON, for example, adjacent records are likely to share the same field names.
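You can see this effect outside Kafka with plain gzip. Here is a toy comparison in Java (the JSON payload is made up for illustration):

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class BatchCompressionDemo {
    static int gzipSize(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.size();
    }

    public static void main(String[] args) throws Exception {
        StringBuilder batch = new StringBuilder();
        int separately = 0;
        for (int i = 0; i < 100; i++) {
            String record = "{\"id\":" + i + ",\"status\":\"ok\"}";
            separately += gzipSize(record.getBytes(StandardCharsets.UTF_8));
            batch.append(record);
        }
        int together = gzipSize(batch.toString().getBytes(StandardCharsets.UTF_8));
        // The repeated field names only compress away when records are compressed together.
        System.out.println("100 records compressed separately: " + separately + " bytes");
        System.out.println("compressed as one batch:           " + together + " bytes");
    }
}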

Kafka supports various compression algorithms. If you want a higher compression ratio, you can use gzip at the cost of more CPU usage; if you want less CPU usage and a faster algorithm, you can choose Snappy. You set the compression algorithm with the compression.type config in the producer.

Let’s produce the same 100 records using lingering and gzip compression inside the topic json-gzip:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list json-gzip
Querying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"json-gzip-0","size":3602,"offsetLag":0,"isFuture":false}]}]}]}

As you can see above, we need only 3,602 bytes to store our original data. Alternatively, we can choose the Snappy algorithm:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list json-snappy
Querying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"json-snappy-0","size":5661,"offsetLag":0,"isFuture":false}]}]}]}

As expected, with the Snappy algorithm we need more bytes (5,661) to store our 100 records.
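In producer code, enabling compression is a one-line change to the configuration from the lingering sketch above (other values exist too, such as lz4 and, in newer Kafka versions, zstd):

// Compresses only the records section of each batch; the 61-byte overhead stays uncompressed.
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // or "snappy"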

Schema

Using a schema to store Kafka records has several benefits, and one of them is reduced record size. In JSON you have to store the name of each field together with the data in every record, which increases the total record size significantly. If you instead store your data in Avro format, you can store the schema once and produce records based on that schema many times, which shrinks each record by removing the schema from it. You just need to store the schema of the record value (or key) in Confluent Schema Registry and keep the schema id in the record.

Storing the schema in a separate place makes sense because the schema is the same for most of the records, and when you have a newer version of the schema you can register the new one in Confluent Schema Registry as well.
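As a sketch of what producing Avro records looks like (assuming Confluent’s KafkaAvroSerializer on the classpath and a Schema Registry at localhost:8081; the schema here is illustrative, not the exact one from the benchmark). The serializer registers the schema once and prepends only a small header (a magic byte plus the 4-byte schema id) to each serialized value:

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerSketch {
    private static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"int\"},"
          + "{\"name\":\"status\",\"type\":\"string\"}]}";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // assumed address
        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");

        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                GenericRecord value = new GenericData.Record(schema);
                value.put("id", i);
                value.put("status", "ok");
                // Only the Avro-encoded field values plus the schema id travel in the record;
                // the field names live in the Schema Registry.
                producer.send(new ProducerRecord<>("avro-lingering", value));
            }
        }
    }
}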

Using lingering and an Avro schema, we produced the same 100 records to the avro-lingering topic:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list avro-lingering
Querying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"avro-lingering-0","size":5559,"offsetLag":0,"isFuture":false}]}]}]}

Without compression, we need 5,559 bytes to store our 100 records. If we enable compression for the same batch, we can reduce the size even further:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list avro-gzip
Querying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"avro-gzip-0","size":3305,"offsetLag":0,"isFuture":false}]}]}]}

Using gzip, we need only 3,305 bytes to store 100 records that in the simplest case required 18,034 bytes, a saving of roughly 81 percent of our storage space. Notice that our sample records contained no duplicated values (except a status field); if your data does contain duplication (which is not rare), the reduction will be even larger!

An interesting point about storing values in Avro format is that even if you disable lingering, each record is typically smaller than the same record stored as JSON. We can examine this by viewing the size of each record value with the DumpLogSegments tool. Let’s first view the size of a record value in JSON format that we already published to json-lingering:

# kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --files /var/lib/kafka/data/json-lingering-0/00000000000000000000.log
Dumping /var/lib/kafka/data/json-lingering-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 99 count: 100 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1570112629952 size: 12031 magic: 2 compresscodec: NONE crc: 2574281250 isvalid: true
| offset: 0 CreateTime: 1570112629952 keysize: -1 valuesize: 107 sequence: -1 headerKeys: []
| offset: 1 CreateTime: 1570112629952 keysize: -1 valuesize: 107 sequence: -1 headerKeys: []
.
. 98 other records omitted
.

The output above shows the batch of 100 records; I omitted 98 of them for brevity. The important point is the valuesize field: each JSON record value takes 107 bytes. Now let’s run the same command for avro-lingering:

# kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --files /var/lib/kafka/data/avro-lingering-0/00000000000000000000.log
Dumping /var/lib/kafka/data/avro-lingering-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 99 count: 100 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1570088225966 size: 5559 magic: 2 compresscodec: NONE crc: 82210245 isvalid: true
| offset: 0 CreateTime: 1570088225966 keysize: -1 valuesize: 46 sequence: -1 headerKeys: []
| offset: 1 CreateTime: 1570088225966 keysize: -1 valuesize: 46 sequence: -1 headerKeys: []
.
. 98 other records omitted
.

We can see that with Avro we need only 46 bytes to store each record value, compared to 107 bytes with JSON. So even when the number of records per batch is small, the Avro format can reduce message size significantly. (As a sanity check: the 18,034 bytes of json-simple work out to about 180 bytes per single-record batch, which is the 61-byte overhead plus the 107-byte value plus a few bytes of per-record framing.)

Comparison

I summarize the comparison in a table (sizes are the total on-disk size of the same 100 records, taken from the benchmarks above):

Topic            Lingering   Compression   Format   Size (bytes)
json-simple      no          none          JSON     18,034
json-lingering   yes         none          JSON     12,031
json-gzip        yes         gzip          JSON     3,602
json-snappy      yes         Snappy        JSON     5,661
avro-lingering   yes         none          Avro     5,559
avro-gzip        yes         gzip          Avro     3,305

As you can see, combining gzip compression with the Avro format gives the best result.

Conclusion

We compared several approaches that can be used in Apache Kafka to reduce disk space usage. Each one has its trade-off:

  • Lingering requires you to wait a bit longer to gather more records.
  • Compression costs more CPU but reduces the amount of IO.
  • Avro imposes a dependency on the clients (consumers and producers): an extra datastore (Confluent Schema Registry) that keeps the schemas of the records.

In many cases, these trade-offs are acceptable and the savings in resource usage are worth it.

If you want to run the benchmarks, the source code is available on GitHub.

References

https://kafka.apache.org/documentation/
https://github.com/apache/kafka
https://www.confluent.io/blog/decoupling-systems-with-apache-kafka-schema-registry-and-avro/
https://medium.com/@stephane.maarek/introduction-to-schemas-in-apache-kafka-with-the-confluent-schema-registry-3bf55e401321
