You’ve got a friend LogCleaner ;)

Yannick Lambruschi
20 min readJul 3, 2019

Hi

Don’t hesitate to correct/add/remove/whatever things

All started with that

To be more precise, since the 9th of May (2019) disk capacity started to go down without specific reason on our Kafka cluster brokers.

After few tricks with du (yeee :p), we found the guilty topic, which is :
__consumer_offsets (especially the partition 20 segments)

For recall, this is the topic in which Kafka consumer will send their committed offsets ( in case of a crash or rebalance, they can resume to the last offset they did commit).

For more info about this consumer offset tracking, check this :
https://kafka.apache.org/documentation/#impl_offsettracking

If you read the doc above, you will now know that this topic has no delete retention policy but it has a compaction cleanup policy, which means that it will try to only keep the last values (for each key) in the logs (which does make sense for this kind of topic).

Ask zookeeper and see it by yourself:

biz@ubuntu:~$ /bin/kafka-topics.sh — zookeeper localhost:2181 — describe — topic __consumer_offsetsTopic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producerTopic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 1 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 2 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 3 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 4 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 5 Leader: 1 Replicas: 1 Isr: 1

So.. This is where LogCleaner came into our life :p

The LogCleaner is a Kafka thread responsible for doing this compaction:

http://cloudurable.com/blog/kafka-architecture-log-compaction/index.html

As the __consumer_offsets topic was growing in size, we decided to check the state of this log cleaner. Normally, it has a dedicated log file called log-cleaner.log

/var/log/kafka/log-cleaner.log[2019–05–09 19:01:19,985] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-20. (kafka.log.LogCleaner)[2019–05–09 19:01:19,985] INFO Cleaner 0: Building offset map for __consumer_offsets-20… (kafka.log.LogCleaner)[2019–05–09 19:01:20,004] INFO Cleaner 0: Building offset map for log __consumer_offsets-20 for 4 segments in offset range [4822038048, 4826329948). (kafka.log.LogCleaner)[2019–05–09 19:01:25,369] INFO Cleaner 0: Offset map for log __consumer_offsets-20 complete. (kafka.log.LogCleaner)[2019–05–09 19:01:25,369] INFO Cleaner 0: Cleaning log __consumer_offsets-20 (cleaning prior to Thu May 09 19:01:07 UTC 2019, discarding tombstones prior to Wed May 08 17:48:49 UTC 2019)… (kafka.log.LogCleaner)[2019–05–09 19:01:25,369] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-20 (largest timestamp Sat Mar 02 21:24:12 UTC 2019) into 0, discarding deletes. (kafka.log.LogCleaner)[2019–05–09 19:01:25,370] INFO Cleaner 0: Swapping in cleaned segment 0 for segment(s) 0 in log __consumer_offsets-20 (kafka.log.LogCleaner)[2019–05–09 19:01:25,370] INFO Cleaner 0: Cleaning segment 2146719977 in log __consumer_offsets-20 (largest timestamp Thu Apr 18 08:04:34 UTC 2019) into 2146719977, discarding deletes. (kafka.log.LogCleaner)[2019–05–09 19:01:25,389] INFO Cleaner 0: Swapping in cleaned segment 2146719977 for segment(s) 2146719977 in log __consumer_offsets-20 (kafka.log.LogCleaner)[2019–05–09 19:01:25,389] INFO Cleaner 0: Cleaning segment 4293321003 in log __consumer_offsets-20 (largest timestamp Wed May 08 17:09:24 UTC 2019) into 4293321003, discarding deletes. (kafka.log.LogCleaner)[2019–05–09 19:01:25,406] INFO Cleaner 0: Growing cleaner I/O buffers from 262144bytes to 524288 bytes. (kafka.log.LogCleaner)[2019–05–09 19:01:25,426] INFO Cleaner 0: Growing cleaner I/O buffers from 524288bytes to 1000012 bytes. (kafka.log.LogCleaner)[2019–05–09 19:01:25,504] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 1000012.
at kafka.log.Cleaner.growBuffers(LogCleaner.scala:675)
at kafka.log.Cleaner.cleanInto(LogCleaner.scala:627)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
at scala.collection.immutable.List.foreach(List.scala:392)
at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
at kafka.log.Cleaner.clean(LogCleaner.scala:438)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2019–05–09 19:01:25,505] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)

He died the 9th of May ( which is my birthday btw..) quite exactly when the segments folder began to grow with this only clue :

This log contains a message larger than maximum allowable size of 1000012.

You have few articles that deal with this issue, for example this one is a good start :
https://medium.com/@anishekagarwal/kafka-log-cleaner-issues-80a05e253b8a

Unfortunately, we were not able to find a solution/clear explanation of what was happening. I mean.. We had lot of clues, but no “100% sure” of what the problem was and what was the good way to deal with it.

In the meantime production Kafka disks kept losing capacity and we did not have so much time to think, few days max..

After reading Kafka code and asking google, we decided to play with the value max.message.bytes in order to help LogCleaner ( as a workaround). Why ?

If you look at the LogCleaner implementation, you’ll get this (I’ll go into more detail later) :

// if we read bytes but didn’t get even one complete message, our I/O buffer is too small, grow it and try again
if (readBuffer.limit() > 0 && result.messagesRead == 0)
growBuffers(maxLogMessageSize)

The maxLogMessageSize is set using max.message.bytes

So we increased it to 50MB in /etc/kafka/conf/server.properties and the infra guys did restart the cluster, hoping that it would solve LogCleaner issue and let him catch up with the compaction, thus freeing disks space.

It indeed helped to do some compaction and gain some space, but again, LogCleaner crashed after some time:

[2019–05–16 15:02:31,570] INFO Cleaner 0: Cleaning segment 4293321003 in log __consumer_offsets-20 (largest timestamp Wed May 08 17:09:24 UTC 2019) into 4293321003, discarding deletes. (kafka.log.LogCleaner)[2019–05–16 15:02:31,584] INFO Cleaner 0: Growing cleaner I/O buffers from 262144bytes to 524288 bytes. (kafka.log.LogCleaner)[2019–05–16 15:02:31,594] INFO Cleaner 0: Growing cleaner I/O buffers from 524288bytes to 1048576 bytes. (kafka.log.LogCleaner)[2019–05–16 15:02:31,603] INFO Cleaner 0: Growing cleaner I/O buffers from 1048576bytes to 2097152 bytes. (kafka.log.LogCleaner)[2019–05–16 15:02:31,712] INFO Cleaner 0: Growing cleaner I/O buffers from 2097152bytes to 4194304 bytes. (kafka.log.LogCleaner)[2019–05–16 15:02:31,733] INFO Cleaner 0: Growing cleaner I/O buffers from 4194304bytes to 5000000 bytes. (kafka.log.LogCleaner)[2019–05–16 15:02:31,772] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 5000000.
at kafka.log.Cleaner.growBuffers(LogCleaner.scala:675)
at kafka.log.Cleaner.cleanInto(LogCleaner.scala:627)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
at scala.collection.immutable.List.foreach(List.scala:392)
at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
at kafka.log.Cleaner.clean(LogCleaner.scala:438)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2019–05–16 15:02:31,772] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)

And yeah.. being original.. we tried again with higher value (100MB). But again, something was wrong, and it crashed again after a moment:

[2019–05–18 11:52:20,317] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-20. (kafka.log.LogCleaner)[2019–05–18 11:52:20,317] INFO Cleaner 0: Building offset map for __consumer_offsets-20… (kafka.log.LogCleaner)[2019–05–18 11:52:20,335] INFO Cleaner 0: Building offset map for log __consumer_offsets-20 for 22 segments in offset range [5242799803, 5266386301). (kafka.log.LogCleaner)[2019–05–18 11:52:48,078] INFO Cleaner 0: Offset map for log __consumer_offsets-20 complete. (kafka.log.LogCleaner)[2019–05–18 11:52:48,078] INFO Cleaner 0: Cleaning log __consumer_offsets-20 (cleaning prior to Sat May 18 11:52:15 UTC 2019, discarding tombstones prior to Thu May 16 11:43:49 UTC 2019)… (kafka.log.LogCleaner)[2019–05–18 11:52:48,105] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-20 (largest timestamp Sat Mar 02 21:24:12 UTC 2019) into 0, discarding deletes. (kafka.log.LogCleaner)[2019–05–18 11:52:48,106] INFO Cleaner 0: Swapping in cleaned segment 0 for segment(s) 0 in log __consumer_offsets-20 (kafka.log.LogCleaner)[2019–05–18 11:52:48,106] INFO Cleaner 0: Cleaning segment 2146719977 in log __consumer_offsets-20 (largest timestamp Mon Apr 29 04:36:02 UTC 2019) into 2146719977, discarding deletes. (kafka.log.LogCleaner)[2019–05–18 11:52:48,107] INFO Cleaner 0: Swapping in cleaned segment 2146719977 for segment(s) 2146719977 in log __consumer_offsets-20 (kafka.log.LogCleaner)[2019–05–18 11:52:48,107] INFO Cleaner 0: Cleaning segment 4293321003 in log __consumer_offsets-20 (largest timestamp Wed May 08 13:42:18 UTC 2019) into 4293321003, discarding deletes. (kafka.log.LogCleaner)[2019–05–18 11:52:48,124] INFO Cleaner 0: Cleaning segment 4753627488 in log __consumer_offsets-20 (largest timestamp Thu May 09 01:46:21 UTC 2019) into 4293321003, discarding deletes. (kafka.log.LogCleaner)[2019–05–18 11:52:48,923] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 10000000.at kafka.log.Cleaner.growBuffers(LogCleaner.scala:675)
at kafka.log.Cleaner.cleanInto(LogCleaner.scala:627)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
at scala.collection.immutable.List.foreach(List.scala:392)
at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
at kafka.log.Cleaner.clean(LogCleaner.scala:438)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2019–05–18 11:52:48,924] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)

If you’re still reading at this point, you’ll notice here that we don’t have the “Growing Cleaner I/O buffer” messages. It’s because we also tried (in a desperate mood), at the same time to increase the log.cleaner.io.buffer.size which is nothing else than the readBuffer initial capacity, so instead of starting from the default value “524288”, it started also from 100MB and skip the growing steps.

But still, it crashed :p

Finally, we got to this moment when people should stop believe that magic exists and try to understand the problem before trying random desperate things.

The fact that it was always the __consumer_offsets-20 segments that made LogCleaner crash was a first clue.

Another clue was that, on the 8th of May (just one day before, hmm..), we ran a new service that mainly use Kafka for deduplication, which we will call dedup (yeah I’m lazy)

One thing to know, is that Kafka will always assign the __consumer_offsets partitions based only on the consumer group. It means that a same consumer group will always have its offsets committed to the same __consumer_offsets partition. It’s useful for troubleshooting

Let’s check what’s inside those __consumer_offsets segments (maybe we’ll have some clues about which consumer group is feeding them), we can use the well known DumpLogSegments tool, as follows:

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments — deep-iteration — print-data-log — offsets-decoder — files /tmp/kafka-logs/__consumer_offsets-20/00000000000011368477.log

Don’t forget the — offset-decoder because those segments are Kafka internal stuff thus meant to be specifically decoded.

We got lot of interesting stuff ( for which I’ll go in details later), but this little one is enough:

offset: 11428576 position: 5552165 CreateTime: 1559751318007 isvalid: true keysize: 25 valuesize: 24 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: {“metadata”:”dedup_consumer_group_”} payload: {“protocolType”:”consumer”,”protocol”:null,”generationId”:22,”assignment”:”{}”}

The only thing that interests us, here, is that the only consumer group we can see in those segments is the dedup consumer group. He sounds guilty ^^

(btw, this message helps the consumer coordinator understand that the consumer group dedup_consumer_group_ had no more topics/partitions assigned to it. ⇒ “assignment”:”{}”. This typically happens when we shut the service. But we don’t really care about this here. This is just for info).
[Correct me if I’m wrong please]

So dedup seems responsible, but how ?

Some information on dedup you might need to know:

  • It started one day before the tragedy
  • It uses Kafka transactions, kind of a “first time” for our team. In one transaction, it reads from Kafka, do some cache checks to avoid duplicates and write to a new deduplicated topic in Kafka.

What came to our mind is, maybe, using Kafka transactions ( in a quite massive way) might have been not cool for LogCleaner.

Ok.. so what’s under the Kafka hood..

Let’s have a quick overview of the log dumps to understand with what LogCleaner is playing with.

First of all, a standard topic segment dump (using DumpLogSegments), without transaction will look like this. This is a standard record dump , just one record in one batch:

baseOffset: 0 lastOffset: 2061 count: 2062 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1559775668043 size: 915916 magic: 2 compresscodec: NONE crc: 2052409758 isvalid: true| offset: 0 CreateTime: 1559775665045 keysize: 5 valuesize: 428 sequence: -1 headerKeys: [] key: myKey payload:DATAPAYLOADIDONTWANTTOSHOW

You can see the offsets, the key, the payload, some flags especially one that says that it’s not a transactional record, so Kafka won’t care about marking this record COMMITTED or ABORTED.

Btw, some doc about Kafka transaction might be useful for what will follow:
https://www.confluent.io/blog/transactions-apache-kafka/

You’ll notice a “two lines” decode format with our dump above ( and in the following examples). Starting with the 2.2.0 version of kafka-run-class, this last one print the baseOffset (can be seen as the starting batch offset) and the different offsets of each record.

Now, here is an example of a dump of the internal __consumer_offsets topic when a consumer commit its offsets, here for the consumer group fooGroupId, the topic fooTopic, and the partition 0.. Which make the following key “ fooGroupId:fooTopic:0

baseOffset: 265558841639 lastOffset: 265558841717 count: 79 baseSequence: 0 lastSequence: 78 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 57 isTransactional: false isControl: false position: 0 CreateTime: 1558021498725 size: 6718 magic: 2 compresscodec: NONE crc: 3261847630 isvalid: true| offset: 265558841639 CreateTime: 1558021498725 keysize: 45 valuesize: 28 sequence: 0 headerKeys: [] key: offset::fooGroupId:fooTopic:0 payload: 211

As you can see, the payload is the offset committed itself (which is offset 211)by the consumer.
So if the consumer crash, or if there is a rebalance or whatever, the next consumer in the consumer group fooGroupId that will be assigned partition 0 of topic fooTopic will know that it has to start from offset 211. But you might already know that.

Let’s go back to our LogCleaner problem.. We thought that it was maybe because of Kafka transactions.. So now, we’ll speak about two kind of “commit”, first the one we already talked about, that is to say, the offset commits, which has nothing to do with Kafka transaction, and then we’ll start talking about COMMIT markers that appears in the logs, which have everything to do with Kafka transactions. Don’t mix them up.

Here is a dump of a standard record ( not __consumer_offsets..) but this time with transaction enabled while producing:

baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 22 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1559763630357 size: 493 magic: 2 compresscodec: NONE crc: 273656367 isvalid: true| offset: 0 CreateTime: 1559763630357 keysize: -1 valuesize: 423 sequence: 0 headerKeys: [] payload:DATAPAYLOADISTILLDONTWANTTOSHOWbaseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 22 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 493 CreateTime: 1559763122022 size: 78 magic: 2 compresscodec: NONE crc: 3996527633 isvalid: true| offset: 1 CreateTime: 1559763122022 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0

There are two records. The first one, is the one we know, which is the standard record with the payload. But this time ( I hope you read the documentation about Kafka transactions) you have the flag isTransactional set to true. So Kafka knows that this record is transactional and will then wait a COMMIT control message to mark it as COMMITTED. That way, the consumers that will read “isolated” will be able to see this message only when this marker will be set.

You also see the famous producerId and producerEpoch ( that help identify transaction ownership and help fencing zombie producers)

Then you have the second record, which is the transaction COMMIT message. As you can see, this is a control message (flag isControl set to true, which is important for the following) and has no payload but only a COMMIT marker. This record is also part of the transaction, thus the transactional flag is set too.

This finally brings us to the point where LogCleaner has some trouble. The offsets commits are also part of a Kafka transaction. It means that, you’ll also have transaction control messages ( typically COMMIT/ABORTED markers) in the __consumer_offsets topic.

Here is a example of a consumer that reads from topicTOTO topic in a transaction and commits its offsets in this same transaction ( we’ll only see the __consumer_offsets part here):

baseOffset: 13 lastOffset: 15 count: 3 baseSequence: 0 lastSequence: 2 producerId: 1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 1724 CreateTime: 1558430251613 size: 289 magic: 2 compresscodec: NONE crc: 3170614090 isvalid: true| offset: 13 CreateTime: 1558430251613 keysize: 40 valuesize: 28 sequence: 0 headerKeys: [] key: offset::dedup_consumer_group_:topicTOTO:2 payload: 21| offset: 14 CreateTime: 1558430251613 keysize: 40 valuesize: 28 sequence: 1 headerKeys: [] key: offset::dedup_consumer_group_:topicTOTO:1 payload: 21| offset: 15 CreateTime: 1558430251613 keysize: 40 valuesize: 28 sequence: 2 headerKeys: [] key: offset::dedup_consumer_group_:topicTOTO:0 payload: 29baseOffset: 16 lastOffset: 16 count: 1 baseSequence: -1 lastSequence: -1 producerId: 1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 2013 CreateTime: 1558430251672 size: 78 magic: 2 compresscodec: NONE crc: 2212401563 isvalid: true| offset: 16 CreateTime: 1558430251672 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0

As you can see, this works quite the same way as for the standard topic.

You’ll notice that you have 3 records (offsets 13, 14 and 15) in one batch (one for each partition, but you could have multiple ones for same partition as well), this is just the batching behavior, it has nothing to do with transactions, it’s just that in the previous examples, I put only one record for clarity..

The consumer coordinator will listen to this topic and will only put the committed offsets in cache ( to then answer to the offset fetch requests from consumers with those committed offsets) when it will receive the COMMIT marker control message for this transaction batch.

As dedup does a lot of transaction for all the topics it deduplicates, we can imagine that it sends lots of COMMIT control messages to the __consumer_offsets ( partition 20).

In fact, when we tried to look at the faulty segment file that made LogCleaner crashed, we had something like this:

baseOffset: 4310760245 lastOffset: 4310760245 count: 1 baseSequence: -1 lastSequence: -1 producerId: 66007 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 0 CreateTime: 1556544968606 size: 78 magic: 2 compresscodec: NONE crc: 2072858171 isvalid: true| offset: 4310760245 CreateTime: 1556544968606 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0baseOffset: 4310760295 lastOffset: 4310760295 count: 1 baseSequence: -1 lastSequence: -1 producerId: 65010 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 78 CreateTime: 1556544968767 size: 78 magic: 2 compresscodec: NONE crc: 2830498104 isvalid: true| offset: 4310760295 CreateTime: 1556544968767 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0baseOffset: 4310760731 lastOffset: 4310760731 count: 1 baseSequence: -1 lastSequence: -1 producerId: 64005 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 156 CreateTime: 1556544969525 size: 78 magic: 2 compresscodec: NONE crc: 3044687360 isvalid: true| offset: 4310760731 CreateTime: 1556544969525 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0baseOffset: 4310760732 lastOffset: 4310760732 count: 1 baseSequence: -1 lastSequence: -1 producerId: 66009 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 234 CreateTime: 1556544969539 size: 78 magic: 2 compresscodec: NONE crc: 1011583163 isvalid: true| offset: 4310760732 CreateTime: 1556544969539 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0baseOffset: 4310760997 lastOffset: 4310760997 count: 1 baseSequence: -1 lastSequence: -1 producerId: 64003 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 312 CreateTime: 1556544969889 size: 78 magic: 2 compresscodec: NONE crc: 3038588128 isvalid: true| offset: 4310760997 CreateTime: 1556544969889 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0baseOffset: 4310761010 lastOffset: 4310761010 count: 1 baseSequence: -1 lastSequence: -1 producerId: 66007 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 390 CreateTime: 1556544969920 size: 78 magic: 2 compresscodec: NONE crc: 2858783667 isvalid: true| offset: 4310761010 CreateTime: 1556544969920 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0baseOffset: 4310761109 lastOffset: 4310761109 count: 1 baseSequence: -1 lastSequence: -1 producerId: 65009 producerEpoch: 2 partitionLeaderEpoch: 50 isTransactional: true isControl: true position: 468 CreateTime: 1556544970302 size: 78 magic: 2 compresscodec: NONE crc: 1928456303 isvalid: true| offset: 4310761109 CreateTime: 1556544970302 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0

As you can see, we don’t have any more “payload messages” but just control messages. It’s like LogCleaner compact the segment and get rid of the “committed offsets” payloads but the transactional control messages remain..

At this point we decided to have a look at the LogCleaner code to understand what it’s doing with this control messages..

I almost forgot to say that, we spent a lot of time trying to reproduce this issue on QA, on local machines with different version of Kafka, but without success. We generated the same amount of traffic, same kind of Data, but nothing to do, we were not able to reproduce this faulty LogCleaner behavior except in production after “some time”.

Btw we use this Kafka version in production:

[2019–06–05 16:04:13,023] INFO Kafka version : 1.1.0 (org.apache.kafka.common.utils.AppInfoParser)[2019–06–05 16:04:13,023] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)

If we dive into the LogCleaner classes, it looks like this :

I’m sorry in advance for the lack of readability of the following pieces of code ( I pasted permalink to github piece of code for each example if you need clarity)

The main entry point to understand our problem is here, in the LogCleaner class (1.1 Kafka version), in the cleanInto() method:

// if we read bytes but didn’t get even one complete message, our I/O buffer is too small, grow it and try again
if (readBuffer.limit() > 0 && result.messagesRead == 0)
growBuffers(maxLogMessageSize)
}
restoreBuffers()
}
https://github.com/apache/kafka/blob/34fad3d94dc6349634b8cb2d7c61c9d821e339fe/core/src/main/scala/kafka/log/LogCleaner.scala#L625-L631

As we can see, if the nio readBuffer has something to read and if the result.messagesRead is empty, it means we should double our buffer ( to a maximum of max.messages.bytes, as seen earlier).

Btw, here is our beloved error message btw.. In the growBuffers method :

/**
* Double the I/O buffer capacity
*/
def growBuffers(maxLogMessageSize: Int) {
val maxBufferSize = math.max(maxLogMessageSize, maxIoBufferSize)
if(readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize)
throw new IllegalStateException(“This log contains a message larger than maximum allowable size of %s.”.format(maxBufferSize))

val newSize = math.min(this.readBuffer.capacity * 2, maxBufferSize)
info(“Growing cleaner I/O buffers from “ + readBuffer.capacity + “bytes to “ + newSize + “ bytes.”)
this.readBuffer = ByteBuffer.allocate(newSize)
this.writeBuffer = ByteBuffer.allocate(newSize)
}
https://github.com/apache/kafka/blob/34fad3d94dc6349634b8cb2d7c61c9d821e339fe/core/src/main/scala/kafka/log/LogCleaner.scala#L670-L681

If we go back to our cleanInto method, result.messagesRead looks suspicious .. And in our cleanInto() method, result is populated like this:


val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier)
https://github.com/apache/kafka/blob/34fad3d94dc6349634b8cb2d7c61c9d821e339fe/core/src/main/scala/kafka/log/LogCleaner.scala#L604

Where records is a MemoryRecords

So if we look at the filterTo method of the MemoryRecords class, this is a method that is responsible for filtering the different records we have in a batch, for example “ skipping the control messages ^^”


public FilterResult filterTo(TopicPartition partition, RecordFilter filter, ByteBuffer destinationBuffer,
int maxRecordBatchSize, BufferSupplier decompressionBufferSupplier) {
return filterTo(partition, batches(), filter, destinationBuffer, maxRecordBatchSize, decompressionBufferSupplier);
}
private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
BufferSupplier decompressionBufferSupplier) {
long maxTimestamp = RecordBatch.NO_TIMESTAMP;
long maxOffset = -1L;
long shallowOffsetOfMaxTimestamp = -1L;
int messagesRead = 0;
int bytesRead = 0;
int messagesRetained = 0;
int bytesRetained = 0;
ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);for (MutableRecordBatch batch : batches) {
bytesRead += batch.sizeInBytes();
BatchRetention batchRetention = filter.checkBatchRetention(batch);
if (batchRetention == BatchRetention.DELETE)
continue;
...byte batchMagic = batch.magic();
boolean writeOriginalBatch = true;
List<Record> retainedRecords = new ArrayList<>();
try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
while (iterator.hasNext()) {
Record record = iterator.next();
messagesRead += 1;
return new FilterResult(destinationBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
https://github.com/apache/kafka/blob/34fad3d94dc6349634b8cb2d7c61c9d821e339fe/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L120-L173

We can see something super interesting, the messagesRead is not incremented if the checkBatchRetention() method returns BatchRetention.DELETE

The checkBatchRetention() is implemented in the LogCleaner class :

override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
// we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
// note that we will never delete a marker until all the records from that transaction are removed.
discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletes)
// check if the batch contains the last sequence number for the producer. if so, we cannot
// remove the batch just yet or the producer may see an out of sequence error.
if (batch.hasProducerId && activeProducers.get(batch.producerId).contains(batch.lastSequence))
BatchRetention.RETAIN_EMPTY
else if (discardBatchRecords)
BatchRetention.DELETE
else
BatchRetention.DELETE_EMPTY
}
https://github.com/apache/kafka/blob/34fad3d94dc6349634b8cb2d7c61c9d821e339fe/core/src/main/scala/kafka/log/LogCleaner.scala#L570-L583

So.. Here there is one interesting thing:

Transaction markers should be deleted according to the same policy applied for tombstones. For recall, tombstones are null values for a specific key, letting the LogCleaner know that it can delete this record. But there is a retention period before deleting those records (mainly to be sure consumers will receive it, indeed tombstones can also be useful for consumers).
This delete retention time is defined by delete.retention.ms and is by default of 1 day.

checkBatchRetention() should return BatchRetention.DELETE if shouldDiscardBatch returns true..

When we look at the shouldDiscardBatch method :

private def shouldDiscardBatch(batch: RecordBatch,
transactionMetadata: CleanedTransactionMetadata,
retainTxnMarkers: Boolean): Boolean = {
if (batch.isControlBatch) {
val canDiscardControlBatch = transactionMetadata.onControlBatchRead(batch)
canDiscardControlBatch && !retainTxnMarkers
} else {
val canDiscardBatch = transactionMetadata.onBatchRead(batch)
canDiscardBatch
}
}
https://github.com/apache/kafka/blob/34fad3d94dc6349634b8cb2d7c61c9d821e339fe/core/src/main/scala/kafka/log/LogCleaner.scala#L633-L643

We know our log segment is full of control message batches, for which the isControlBatch will return true.

And it’s like, if retainTxnMarkers is set to false (means LogCleaner will have to discard markers) then this method will always return true and we know that if it returns true, we will skip all the control markers without incrementing messagesRead that will lead LogCleaner to try doubling its buffer.

This last clue helped us understand why we were not able to reproduce the problem, we needed this “retainTxnMarkers” to be set to false to make LogCleaner crash.

If we look at how this method is called by LogCleaner :

discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletes)
https://github.com/apache/kafka/blob/34fad3d94dc6349634b8cb2d7c61c9d821e339fe/core/src/main/scala/kafka/log/LogCleaner.scala#L573

retainTxnMarkers is just an alias of retainDeletes (piggy-backed policy)

Then we tried to understand what could be the meaning of the retainDeletes attribute in LogCleaner

val retainDeletes = currentSegment.lastModified > deleteHorizonMshttps://github.com/apache/kafka/blob/34fad3d94dc6349634b8cb2d7c61c9d821e339fe/core/src/main/scala/kafka/log/LogCleaner.scala#L515

If we want this retainsDeletes to be set to false, we need the last modification time of the segment to be older than deleteHorizonMs.

val deleteHorizonMs = cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
case None => 0L
case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
}

doClean(cleanable, deleteHorizonMs)
https://github.com/apache/kafka/blob/34fad3d94dc6349634b8cb2d7c61c9d821e339fe/core/src/main/scala/kafka/log/LogCleaner.scala#L432-L438

According to those last lines, we know that we did not succeed to make LogCleaner crash because we had to wait for a certain amount of time that a compacted log contains mainly trx control markers ( enough consecutive ones to maximise the LogCleaner buffer capacity).

This amount of time is defined by the DELETE_RETENTION_MS_CONFIG value. As we can see in the TopicConfig class :

val DeleteRetentionMsProp = TopicConfig.DELETE_RETENTION_MS_CONFIG

So to be sure to understand our issue and to point out that it’s a Kafka bug, we reduce quite a lot this delete retention period, which is 24 hours by default (the delete retention period we talked about for tombstones) :

Which is coherent with the fact that dedup started the 8th of May, and LogCleaner crashed during the 9th of May, by the way.

We set it as follows :

/etc/kafka/conf/server.properties

log.cleaner.delete.retention.ms=1000000

And we rerun our tests with same kind of traffic as in production.

After some LogCleaner compactions, we had a segment file of 29MB containing almost only COMMIT control messages.

And then here what we got:

/var/log/kafka/log-cleaner.log[2019–06–05 16:10:00,425] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-20. (kafka.log.LogCleaner)[2019–06–05 16:10:00,425] INFO Cleaner 0: Building offset map for __consumer_offsets-20… (kafka.log.LogCleaner)[2019–06–05 16:10:00,445] INFO Cleaner 0: Building offset map for log __consumer_offsets-20 for 1 segments in offset range [9098518, 10233952). (kafka.log.LogCleaner)[2019–06–05 16:10:01,062] INFO Cleaner 0: Offset map for log __consumer_offsets-20 complete. (kafka.log.LogCleaner)[2019–06–05 16:10:01,062] INFO Cleaner 0: Cleaning log __consumer_offsets-20 (cleaning prior to Wed Jun 05 16:09:46 UTC 2019, discarding tombstones prior to Wed Jun 05 15:58:36 UTC 2019)… (kafka.log.LogCleaner)[2019–06–05 16:10:01,063] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-20 (largest timestamp Wed Jun 05 15:54:50 UTC 2019) into 0, discarding deletes. (kafka.log.LogCleaner)[2019–06–05 16:10:01,068] INFO Cleaner 0: Growing cleaner I/O buffers from 262144bytes to 524288 bytes. (kafka.log.LogCleaner)[2019–06–05 16:10:01,073] INFO Cleaner 0: Growing cleaner I/O buffers from 524288bytes to 1000012 bytes. (kafka.log.LogCleaner)[2019–06–05 16:10:01,085] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 1000012.
at kafka.log.Cleaner.growBuffers(LogCleaner.scala:675)
at kafka.log.Cleaner.cleanInto(LogCleaner.scala:627)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
at scala.collection.immutable.List.foreach(List.scala:392)
at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
at kafka.log.Cleaner.clean(LogCleaner.scala:438)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2019–06–05 16:10:01,088] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)

Yeeeee : D

To bring more clues, if we look at all the logs we had in production, the LogCleaner crashes only happen when Logcleaner try to discard deletes, with this kind of message before crashing :

[2019–05–18 11:52:48,124] INFO Cleaner 0: Cleaning segment 4753627488 in log __consumer_offsets-20 (largest timestamp Thu May 09 01:46:21 UTC 2019) into 4293321003, discarding deletes. (kafka.log.LogCleaner)

This happens when the retainsDelete variable is set to false.

We never had the LogCleaner crashing when this variable was set to true:, with this kind of message while compacting :

[2019–06–05 16:02:01,954] INFO Cleaner 0: Cleaning segment 0 in log __transaction_state-32 (largest timestamp Wed Jun 05 16:01:46 UTC 2019) into 0, retaining deletes. (kafka.log.LogCleaner)

Solution:

Discussion about this kind of problem:

https://www.mail-archive.com/search?l=jira@kafka.apache.org&q=subject:%22%5C%5Bjira%5C%5D+%5C%5BCommented%5C%5D+%5C%28KAFKA%5C-6762%5C%29+log%5C-cleaner+thread+terminates+due+to+java.lang.IllegalStateException%22&o=newest&f=1

JIRA about LogCleaner issue :

https://issues.apache.org/jira/browse/KAFKA-6854

PR that correct this issue :
https://github.com/apache/kafka/pull/4962

With this PR, the growBuffer() call does not use messagesRead anymore but use bytesRead, which solves our issue

So, it’s time to upgrade.

To which version ?

Best would be 2.2.1

https://www.apache.org/dist/kafka/2.2.1/RELEASE_NOTES.html

Mainly because it solves also the “non deletion of trx markers” problem:
[KAFKA-8335] — Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

Please any feedback is welcomed

--

--