Apache Kafka can be used to solve a variety of problems. Most systems that use Kafka are distributed and involve real-time processing of messages at large scale. Think of a problem you could solve using big data, and ask yourself: "How will this solution be affected if the scale grows by a factor of 100,000?" This question always brings me to the same conclusion: the producer will keep pumping messages in, and eventually the disk will run out of space to store them.
Log compaction is a strategy for solving this problem in Apache Kafka. In this article, I will share my understanding of log compaction: how it works, how to configure it, and when to use it.
Since this is going to be a deep dive into Kafka's internals, I expect you to have some understanding of Kafka. Although I've tried to keep the entry bar for this article pretty low, you might not be able to follow everything if you're not familiar with how Kafka generally works. Proceed with that in mind.
Log Compaction: The Savior
The idea behind log compaction is to selectively remove records for which a more recent update with the same primary key exists. The log compaction strategy ensures that Kafka will always retain at least the last known value for each message key within the log of a single topic partition. It is mostly used in scenarios such as restoring state after an application crash or system failure, or reloading a cache after an application restarts. In simple terms, Apache Kafka keeps the latest version of each record and deletes the older versions with the same key.
Kafka log compaction allows consumers to rebuild their state from a compacted topic. Compaction never re-orders messages; it only deletes some of them. The partition offset of a message also never changes. The log of data consists of a head and a tail: every new message gets appended at the end of the head, and all compaction work happens on the tail of the log.
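To illustrate the state-restoration idea, here is a minimal Python sketch (a hypothetical helper, not part of Kafka) showing that replaying a compacted topic rebuilds the same state as replaying the full log, since compaction only removes stale updates:

```python
def rebuild_state(records):
    """Replay (key, value) records into a key -> latest-value map.

    A value of None models a tombstone, i.e. a deletion of the key.
    """
    state = {}
    for key, value in records:
        if value is None:
            state.pop(key, None)  # tombstone removes the key
        else:
            state[key] = value
    return state

full_log = [("a", 1), ("b", 2), ("a", 3), ("b", None)]
compacted = [("a", 3), ("b", None)]  # only the newest record per key survives
print(rebuild_state(full_log) == rebuild_state(compacted))  # True
```

Because only superseded records are dropped, a consumer replaying the compacted log from the beginning ends up with exactly the same state as one that had read every message.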
Life Without Log Compaction
Kafka offers multiple techniques to delete messages. By default, old log data is discarded after a fixed period of time or when the log reaches some predetermined size. These approaches work for some use cases. In many scenarios, however, switching back to a previous state of the data is not necessary (which is true for most cases), yet data keeps being pushed in for the same keys. This consumes a lot of storage even though the consumer only cares about the latest state of the data. So why keep all this extra data?
Apache Kafka does have log-cleaner threads that clean up old data or log messages according to its default retention strategy. But when real-time data keeps arriving for the same keys, most applications are only concerned with the latest state. By default, all the extra data is retained on disk for a week, which can be customized. Either way, storage is allocated for stale data that could otherwise hold new data. The resources are wasted!
Life with the Log Compaction Strategy
Consider a scenario where we have a topic that contains user email addresses. Each time a user updates their email address, we send a message to the topic using their user_id as the primary key. Now, as shown in the figure, if the user with user_id 999 changes their email address over a period of time, each message corresponds to one change of email address.
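A quick way to see the effect is to simulate this scenario in Python (the offsets and addresses below are made up for illustration, and the records are modeled as simple tuples):

```python
def compact(records):
    """Simulate log compaction on (offset, key, value) records:
    for each key, keep only the record with the highest offset."""
    latest = {key: offset for offset, key, _ in records}  # last write wins
    return [(o, k, v) for o, k, v in records if latest[k] == o]

topic = [
    (0, "999", "alice@oldmail.com"),
    (1, "123", "bob@mail.com"),
    (2, "999", "alice@work.com"),
    (3, "999", "alice@newmail.com"),
]
# After compaction only the latest email per user_id remains, and the
# surviving records keep their original offsets and relative order.
print(compact(topic))
# [(1, 'bob'), (3, 'alice')] -- i.e. offsets 1 and 3 survive
```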
A topic in Kafka is stored as a log, and this log is broken down into partitions. These partitions are in turn divided into segment files. Segment files store records, and each record is a key-value pair. Based on offsets and insertion order, each partition's log is divided into two areas, called the head and the tail: every new record is appended at the end of the head, and compaction happens on the tail.
Each message in Apache Kafka consists of a value, offset, timestamp, key, message size, compression codec, checksum, and message-format version. To decide what to keep, the log cleaner builds an offset map from each message key to the offset of its newest occurrence. As mentioned earlier, the compaction strategy is applied to the tail. Whenever there are duplicate records for a key, Kafka keeps the one with the newest offset. The log-cleaner thread then checks every record in the tail, and if a newer record with the same key exists elsewhere in the log (that is, the record's offset differs from the one in the offset map), the older record is removed.
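The cleaner pass described above can be sketched as follows. This is a simplified Python model, assuming records are `(offset, key, value)` tuples; it is not the actual broker implementation:

```python
def clean(tail, head):
    """One simulated cleaner pass.

    Build an offset map of each key's newest offset across the whole
    log, then drop every tail record that is not the newest for its key.
    The head is left untouched; new writes keep landing there.
    """
    newest = {}
    for offset, key, _ in tail + head:
        if offset > newest.get(key, -1):
            newest[key] = offset
    return [(o, k, v) for o, k, v in tail if newest[k] == o]

tail = [(0, "999", "old@mail.com"), (1, "123", "bob@mail.com")]
head = [(2, "999", "new@mail.com")]
print(clean(tail, head))  # [(1, '123', 'bob@mail.com')]
```

Note how the record at offset 0 is dropped because the head contains a newer record for key "999", while the record for key "123" survives untouched.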
Compaction runs periodically in the background. Cleaning does not block consumer reads, and the amount of I/O throughput it uses can be limited to avoid impacting producers and consumers.
This strategy not only deletes duplicate records but also removes keys with a null value. These records are known as tombstone records. Afterwards, the log is clean and we have a new tail and head! Please refer to the image below.
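Tombstone removal can be modeled the same way. In this hedged sketch (illustrative only, with made-up timestamps), each record carries a timestamp, and tombstones (null values) are dropped once they are older than `delete.retention.ms`:

```python
def purge_tombstones(records, now_ms, delete_retention_ms):
    """Drop tombstones whose age exceeds delete.retention.ms.

    Records are (key, value, timestamp_ms); a value of None is a
    tombstone. Regular records are always kept in this sketch.
    """
    return [
        (key, value, ts) for key, value, ts in records
        if value is not None or now_ms - ts <= delete_retention_ms
    ]

records = [
    ("999", "alice@mail.com", 1_000),
    ("123", None, 2_000),   # old tombstone: past retention, removed
    ("456", None, 90_000),  # recent tombstone: still visible to consumers
]
print(purge_tombstones(records, now_ms=100_000, delete_retention_ms=50_000))
```

Keeping tombstones around for a retention window gives slow consumers a chance to observe the deletion before the tombstone itself disappears.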
Commands and Setup
Let’s create a compacted topic and discuss its properties.
```shell
kafka-topics --zookeeper 127.0.0.1:2181 --create --topic sample-test-topic \
  --partitions 3 --replication-factor 1 \
  --config "cleanup.policy=compact" \
  --config "delete.retention.ms=100" \
  --config "segment.ms=100" \
  --config "min.cleanable.dirty.ratio=0.01"
```
1) To activate compaction, set the cleanup policy with the topic config `cleanup.policy=compact`.
2) A consumer sees all tombstones as long as it reaches the head of the log within a period shorter than the topic config `delete.retention.ms` (the default is 24 hours).
3) The number of cleaner threads is configurable through the `log.cleaner.threads` broker config.
4) The cleaner thread then chooses the log with the highest dirty ratio:
dirty ratio = number of bytes in the head / total number of bytes in the log (head + tail)
5) The topic config `min.compaction.lag.ms` guarantees a minimum period that must pass before a message can be compacted.
6) To set a broker-wide delay before records become eligible for compaction after they are written, use the broker config `log.cleaner.min.compaction.lag.ms`. Records won’t get compacted until this period has elapsed, which gives consumers time to receive every record.
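Point 4 above can be made concrete. The sketch below (illustrative Python, with made-up partition names and sizes) computes the dirty ratio and picks the next log to clean:

```python
def dirty_ratio(head_bytes, tail_bytes):
    """dirty ratio = bytes in the head / total bytes in the log."""
    return head_bytes / (head_bytes + tail_bytes)

# Hypothetical partitions with their head/tail sizes in bytes.
logs = {
    "emails-0": dirty_ratio(400, 600),  # 0.4: mostly already compacted
    "emails-1": dirty_ratio(900, 100),  # 0.9: mostly uncompacted head
}
# The cleaner picks the log with the highest dirty ratio, provided it
# exceeds the min.cleanable.dirty.ratio threshold configured on the topic.
next_to_clean = max(logs, key=logs.get)
print(next_to_clean)  # emails-1
```

A low `min.cleanable.dirty.ratio` (like the 0.01 used when creating the topic above) makes the cleaner kick in aggressively, which is handy for demos but costs more broker CPU and I/O in production.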
Compacted topics require memory and CPU resources on your brokers: log compaction needs both heap and CPU cycles to complete successfully, and a failed log compaction puts brokers at risk from a partition that grows unbounded.
· You can tune `log.cleaner.dedupe.buffer.size` and `log.cleaner.threads` on your brokers, but keep in mind that these values affect heap usage on the brokers.
· If a broker throws an OutOfMemoryError exception, it will shut down and potentially lose data.
· The right buffer size and thread count depend on the number of topic partitions to be cleaned, as well as the data rate and key size of the messages in those partitions.
In this article, I shared my understanding of the log compaction strategy in Apache Kafka. I want to point out that log compaction is a good option for caching scenarios, where you can simply read the latest state of each key from the compacted topic.
I will publish a series of posts on a well-defined use case showcasing the power and capability of log compaction. You can reach out to me at firstname.lastname@example.org.
Thanks & Happy Learning. :)