A Kafka pitfall when setting log.message.timestamp.type to CreateTime

Jiangtao Liu
7 min read · Apr 10, 2020


Apache Kafka is a very popular distributed messaging system, and it’s now used by one-third of the Fortune 500, including seven of the top 10 global banks.

As we know, Apache Kafka is widely adopted as a scalable, event-driven backbone for service architectures. Therefore, any pitfall in Kafka can cause serious issues.

From the system design perspective, any pitfall indicates a system design issue. In this blog, I will share a Kafka pitfall that shows up when you choose CreateTime as log.message.timestamp.type: what the issue and its impacts are, how to troubleshoot it, where the design issue lies, and how to work around it.

If reading the entire blog takes too much time, feel free to jump directly to the part that interests you.

What’s The Issue? Why Is It Serious?

A short summary of the issue is as follows:

The timestamp-based retention policy suddenly stopped being effective on a Kafka cluster that had been working well for two years.

The predictable impact is:

There will be a service outage in Apache Kafka if there is no space available to write data.

After figuring out the root cause, I realized there are more impacts than just the space shortage. I have outlined them in the section “Design review on Kafka timestamp-based retention policy”.

Immediate Operation

We did a quick check on the operating system, network traffic, and disk I/O, but found no clue about what was going wrong. So the immediate operation was to increase disk space, so that we could prevent a Kafka service outage.
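
For reference, the kind of quick checks we ran can be sketched like this (the log directory path is an assumption; use whatever log.dirs points to on your brokers):

df -h /var/kafka-logs                              # overall disk usage
df -i /var/kafka-logs                              # available inodes
du -sh /var/kafka-logs/* | sort -rh | head -20     # which topic-partitions use the most space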

In the meantime, we can spend time on troubleshooting.

The 1st Round of Troubleshooting: Explore and Narrow Down the Causes

From my experience, there are several types of causes for a space shortage. As usual, I like to write them down, work through each of them, and figure out the most likely cause.

Here is my list:

  1. Was there more traffic on Kafka recently? Answer: []Yes, [x]No.
  2. Were there any hidden operating system issues, like exhausted inodes, running out of open file handles, or a full temporary file space? Answer: []Yes, [x]No.
  3. Which Kafka topics were taking more space than usual? Answer: []All Topics, []A Few Topics, [x]An Individual Topic.
  4. When did that topic begin to take more space? Answer: []1 Week Ago, [x]2 Weeks Ago, []1 Month Ago.
  5. Are there any reported Apache Kafka issues similar to my case? Answer: []Yes, [x]No.
  6. Could we free topic space by adjusting the timestamp-based retention hours (see the command sketch after this list)? Answer: []Yes, [x]No.
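
For item 6, the check was to temporarily lower the topic’s retention.ms and watch whether old segments were actually removed. A minimal sketch with placeholder broker and topic names (older brokers take --zookeeper instead of --bootstrap-server):

kafka-configs.sh --bootstrap-server $KAFKA_BROKERS --alter --entity-type topics --entity-name $KAFKA_TOPIC --add-config retention.ms=43200000

# afterwards, remove the override to fall back to the broker default
kafka-configs.sh --bootstrap-server $KAFKA_BROKERS --alter --entity-type topics --entity-name $KAFKA_TOPIC --delete-config retention.ms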

After working on each item, I came to a hypothesis about the cause:

The timestamp-based retention issue may be caused by the topic data itself, that is, by timestamps coming from Kafka clients.

There is a particular piece of source code in Apache Kafka, Log.scala, that can be used to verify my prediction.

In order to understand the following code, you may need some background on how Kafka storage works, and how the Kafka offset and time indexes work.

/**
 * Given some demoed segment data to explain the logic.
 * Now: 01/01/2020 08:10:20 PST
 * Demoed segments:
 *   00000000035952755817.index
 *   00000000035952755817.log        // The first segment
 *   00000000035952755817.timeindex  12/15/2019 10:23:30 PST
 *   00000000035957300794.index
 *   00000000035957300794.log        // The second segment
 *   00000000035957300794.timeindex  12/15/2019 23:10:09 PST
 *   00000000035957438212.index
 *   00000000035957438212.log
 *   00000000035957438212.timeindex
 *   00000000035961948428.index
 *   00000000035961948428.log
 *   00000000035961948428.snapshot
 *   00000000035961948428.timeindex
 *   00000000035961969044.index
 *   00000000035961969044.log        // The active segment
 *   00000000035961969044.snapshot
 *   00000000035961969044.timeindex  01/01/2020 00:09:40 PST
 *
 * Given a number for the high watermark:
 *   highWatermark = 35961979044
 */
private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
  if (segments.isEmpty || replicaHighWatermark.isEmpty) {
    Seq.empty
  } else {
    // Analysis:: with the sample data, highWatermark = 35961979044
    val highWatermark = replicaHighWatermark.get
    val deletable = ArrayBuffer.empty[LogSegment]
    var segmentEntry = segments.firstEntry
    while (segmentEntry != null) {
      // Analysis:: segment = 00000000035952755817.*
      val segment = segmentEntry.getValue
      // Analysis:: nextSegmentEntry = 00000000035957300794.*
      val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
      // Analysis:: nextSegment = 00000000035957300794.*, upperBoundOffset = 35957300794, isLastSegmentAndEmpty = false
      val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) =
        if (nextSegmentEntry != null)
          (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
        else
          (null, logEndOffset, segment.size == 0)
      // Analysis::
      //   True = highWatermark >= upperBoundOffset
      //   True = !isLastSegmentAndEmpty
      //   After checking the conditions, predicate(segment, Option(nextSegment)) returns False
      if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
        deletable += segment
        segmentEntry = nextSegmentEntry
      } else {
        segmentEntry = null
      }
    }
    deletable
  }
}

// For predicate(segment, Option(nextSegment)), here is the specific code
// used by the timestamp-based retention policy:
(segment, _) => startMs - segment.largestTimestamp > config.retentionMs
// Analysis::
//   segment                  = 00000000035952755817.*  (timeindex file time: 12/15/2019 10:23:30 PST)
//   startMs                  = now.timestamp            01/01/2020 08:10:20 PST
//   segment.largestTimestamp = expected between 12/15/2019 10:23:30 PST and 12/15/2019 23:10:09 PST
//   config.retentionMs       = 12 hours

With the given data, the first segment should be deletable since it already exceeds the maximum retention period (12 hours). However, it is still there, so the only possibility is that the largest timestamp recorded for the first segment is newer than now (01/01/2020 08:10:20 PST) minus the 12-hour retention window, in other words a timestamp far in the future compared with when the segment was actually written.
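
To make that deduction concrete, here is a small, self-contained Scala sketch (not Kafka source code; the values are taken from the demoed data above, plus the future timestamp discovered in the next round) that evaluates the same retention predicate:

import java.time.Instant

object RetentionPredicateDemo extends App {
  val startMs         = Instant.parse("2020-01-01T16:10:20Z").toEpochMilli // now: 01/01/2020 08:10:20 PST
  val retentionMs     = 12L * 60 * 60 * 1000                               // retention.ms = 12 hours
  val expectedLargest = Instant.parse("2019-12-15T18:23:30Z").toEpochMilli // 12/15/2019 10:23:30 PST, around when the segment was written
  val dirtyLargest    = 1741263292443L                                     // a timestamp from March 2025

  // The timestamp-based retention predicate: a segment is deletable when its
  // largest timestamp is older than (now - retention.ms).
  def deletable(largestTimestamp: Long): Boolean =
    startMs - largestTimestamp > retentionMs

  println(deletable(expectedLargest)) // true: the segment would be deleted
  println(deletable(dirtyLargest))    // false: the segment is kept forever
}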

The 2nd Round of Troubleshooting: Explore Log Segment Data

kafkacat is a handy tool for troubleshooting data issues. Let me show you what I did.

Command Line:

kafkacat -C -b $KAFKA_BROKERS -t $KAFKA_TOPIC -p 0 -c 1 -o 35952755817 -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'

Output:

Key (6 bytes): 535317
Value (284 bytes): ??????????? 1741263292 source=178.199.XX.XX ????????????????????????????????????????????
Timestamp: 1741263292443    Partition: 0    Offset: 35952755817

Output explained:

  1. 178.199.XX.XX is a Kafka client’s host address. In our message payload, there is a source field that identifies the Kafka client.
  2. 1741263292 is a timestamp in seconds; it translates to Thursday, March 6, 2025, 4:14:52 AM GMT-08:00, roughly five years in the future at the time of the incident.
  3. 1741263292443 is the record timestamp in milliseconds, also years in the future.
  4. 35952755817 is the offset in the Kafka log.
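
As a quick sanity check, this short Scala sketch (values copied from the kafkacat output above) converts both numbers to readable dates and shows how far ahead they are of “now” (early 2020):

import java.time.{Instant, ZoneId}

object TimestampCheck extends App {
  val pst = ZoneId.of("America/Los_Angeles")

  val recordTimestampMs = 1741263292443L // the record timestamp, in milliseconds
  val payloadSeconds    = 1741263292L    // the source timestamp embedded in the payload, in seconds

  println(Instant.ofEpochMilli(recordTimestampMs).atZone(pst)) // March 2025, far in the future
  println(Instant.ofEpochSecond(payloadSeconds).atZone(pst))   // same instant, second precision
}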

The 3rd Round of Troubleshooting: Explore Log TimeIndex

From kafkacat’s output, we captured the dirty data. As we know, a Kafka message’s timestamp is also used to build the log time index. We can use Kafka’s internal dump tool to inspect the time index.

Command Line:

./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files <logdir>/<topic name>-0/00000000035952755817.timeindex

Output:

timestamp: 1741263292443 offset: 35952755817
timestamp: 1741263352442 offset: 35952806331
timestamp: 1741263412443 offset: 35952864581
timestamp: 1741263472442 offset: 35952916568
timestamp: 1741263532442 offset: 35952973711
timestamp: 1741263652442 offset: 35953082472
timestamp: 1741263712443 offset: 35953131477
timestamp: 1741263772442 offset: 35953188706
timestamp: 1741263832443 offset: 35953239456
timestamp: 1741263892442 offset: 35953298105
...

Output explained:

  1. All of the timestamps are invalid: they are years in the future. I will explain why in the section “Design pitfall and improvement on Kafka time index”.
  2. The corresponding log segment can never be deleted because its largest timestamp is always greater than now.

What’s the final solution I applied to resolve this issue?

Since we are now clear about why we ran into the storage issue, the solution looks like this:

  1. Fix the system clock issue on the Kafka client hosted at 178.199.XX.XX.
  2. Switch to a size-based retention policy to clear the dirty data. (CAUTION: pay close attention to the size limit to prevent good data from being deleted.)
  3. Switch back to the timestamp-based retention policy.
  4. Adjust log.message.timestamp.type from CreateTime to LogAppendTime. (NOTE: you need to make a tradeoff between applying LogAppendTime to an individual Kafka topic or to the whole Kafka broker.)

I put the steps in this order on purpose, and we have to follow that order, otherwise we cannot resolve the issue. The design review below covers why I did it this way, and a command sketch for steps 2 to 4 follows.
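
A minimal command sketch for steps 2 to 4, with placeholder broker and topic names (older brokers take --zookeeper instead of --bootstrap-server; choose retention.bytes carefully so good data is not deleted):

# Step 2: temporarily add size-based retention on the affected topic
kafka-configs.sh --bootstrap-server $KAFKA_BROKERS --alter --entity-type topics --entity-name $KAFKA_TOPIC --add-config retention.bytes=107374182400

# Step 3: once the dirty segments are gone, remove the override so only the timestamp-based policy applies
kafka-configs.sh --bootstrap-server $KAFKA_BROKERS --alter --entity-type topics --entity-name $KAFKA_TOPIC --delete-config retention.bytes

# Step 4: switch the topic's timestamp type to LogAppendTime
kafka-configs.sh --bootstrap-server $KAFKA_BROKERS --alter --entity-type topics --entity-name $KAFKA_TOPIC --add-config message.timestamp.type=LogAppendTime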

Design review on Kafka timestamp-based retention policy

In my previous explanation, I mentioned that even one dirty timestamp will mess up the whole time index and stop the timestamp-based retention policy from ever working again. That is because of the Kafka time index’s design; let me show you:

  • If we choose `CreateTime` for `message.timestamp.type`, the timestamp written to the time index comes from the Kafka client’s (producer’s) request time.
  • With `CreateTime`, a new time index entry is only appended when its timestamp is greater than the previous entry’s timestamp; otherwise it is ignored.
  • With `CreateTime`, new time index entries are generated on a schedule and compared against the previous (largest-so-far) timestamp; that is why a single piece of dirty future data messes up the whole time index file: all later, well-behaved timestamps are ignored (a simplified sketch of this behavior follows this list).
  • The timestamp-based retention policy will not delete any of the remaining segments if the earliest one is not deleted; that is why the topic’s storage keeps increasing.
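
A simplified Scala model of that time-index behavior (illustration only, not the actual kafka.log.TimeIndex implementation):

object TimeIndexModel extends App {
  final case class Entry(timestampMs: Long, offset: Long)
  var entries = Vector.empty[Entry]

  // An entry is only appended when its timestamp is larger than the last one;
  // otherwise it is silently ignored.
  def maybeAppend(timestampMs: Long, offset: Long): Unit =
    if (entries.isEmpty || timestampMs > entries.last.timestampMs)
      entries :+= Entry(timestampMs, offset)

  maybeAppend(1576434210000L, 100L) // 12/15/2019, a normal client timestamp
  maybeAppend(1741263292443L, 101L) // one dirty timestamp from March 2025
  maybeAppend(1576434270000L, 102L) // later, well-behaved timestamps...
  maybeAppend(1576434330000L, 103L) // ...are ignored from now on

  println(entries.last.timestampMs)  // 1741263292443: largestTimestamp is stuck in the future
}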

Let’s summarize the visible and invisible impacts based on the Kafka time index’s design.

Impacts on Kafka broker:

  • There is no way to free up storage, since the segment removal condition always evaluates to False.
  • There are no metrics to monitor for dirty timestamps; we only find out when an alert warns that storage is running short.
  • By default, the broker does not reject data with a dirty timestamp (it only does so if log.message.timestamp.difference.max.ms is explicitly configured to bound the allowed difference).
  • It may not be easy to stop requests that include a dirty timestamp if we cannot identify the offending Kafka client.
  • We may not even notice a request with a dirty timestamp when it is only a few hours in the future: the timestamp-based retention policy will still work after a few hours of delay, but any timestamp-based search will miss a few hours of good data.

Impacts on Kafka client:

  • Kafka Streams processes messages in timestamp order, so dirty timestamps can cause data to be processed out of order or missed.
  • A Kafka client may query data by timestamp (for example, via the consumer’s offsetsForTimes API), and dirty timestamps will cause it to miss the expected data; see the sketch below.
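
As an illustration of that last point, here is a minimal Scala sketch using the consumer’s offsetsForTimes API; the broker address, topic name, and target timestamp are placeholders:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object OffsetsForTimesDemo extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "kafka-broker:9092")
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)

  val consumer  = new KafkaConsumer[String, String](props)
  val partition = new TopicPartition("my-topic", 0)

  // Ask for the earliest offset whose timestamp is >= 01/01/2020 00:00:00 UTC.
  val target = java.lang.Long.valueOf(1577836800000L)
  val result = consumer.offsetsForTimes(Collections.singletonMap(partition, target))

  // With CreateTime and dirty future timestamps in the index, this lookup can
  // return an offset far away from the data actually written around that time.
  println(result.get(partition))
  consumer.close()
}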

Design pitfall and Improvement on Kafka time index

It is a design pitfall because Kafka’s retention policy is built on a mix of Kafka client time and Kafka broker time, and it turns out that Kafka client time is not reliable.

As a design improvement, we should instead treat log retention as a system function, and its implementation should be fully decoupled from Kafka client logic and client-supplied data.

The following diagram shows how the timestamps in the log and time index are generated, and which Kafka system and user functions are built on top of those timestamps:

As you can see, the log/time index timestamp is critical for both Kafka system and user functions. If you do not have confidence in your Kafka clients’ clocks, you may need to set log.message.timestamp.type to LogAppendTime instead of CreateTime, as sketched below.
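
A minimal configuration sketch for the broker-wide option (the per-topic alternative was shown in the solution steps above; which one to choose is the tradeoff mentioned earlier):

# server.properties, broker level: applies to every topic on this broker
log.message.timestamp.type=LogAppendTime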

Conclusion

First, I’d like to thank you for taking a moment to read my blog. I hope it provides some useful information that helps you solve problems you may be having with your Kafka system.

Apache Kafka is my favorite open source project. There are lots of ingenious ideas to learn from, and it can teach us how to build powerful distributed systems.

I want to give credit to the Apache Kafka contributors for building such an amazing distributed message streaming platform. Meanwhile, I will try my best to share the Kafka experience I have, instead of only taking from the project. That is my inspiration for writing this blog.
