Deleting records in Kafka (aka tombstones)

Damien Thomé Lutz
May 3, 2020

In this blog post, I explain the technical details of deleting records in Kafka using tombstones.

We may want to delete records for GDPR reasons, or because inconsistent data was published to a topic.

Audience

This tutorial is intended for intermediate Kafka users and engineers. It covers the concepts needed to successfully delete records from topics that use the compact cleanup policy.

I recently started working with big Kafka topics and faced some new challenges, so I decided to share a bit of what I learned.

Prerequisites

  • Kafka basic concepts (topic, record, distributed streaming platform)
  • familiarity and hands-on experience with Kafka clients (producer, consumer, admin)

As a starting point, I suggest the Introduction in the official documentation.

How do we delete?

To delete a record, we produce a new record with the same key to the same topic and partition, with a null payload. Such records are called tombstones.

A null payload means the record value itself is null (absent). It is not the same as an empty, zero-length value, and it is not the string “null” encoded as UTF-8; neither of those is interpreted as a tombstone.

For example, if a customer requests their data to be deleted, a tombstone is pushed to the customer topic with that customer's key.
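As a minimal sketch of the customer example, the snippet below sends a tombstone through a producer object. It assumes a producer whose `produce(topic, key=..., value=...)` method accepts `value=None`, in the style of the confluent-kafka Python client. The `RecordingProducer` class, the topic name and the key are hypothetical stand-ins so the example runs without a broker.

```python
def send_tombstone(producer, topic, key):
    """Produce a tombstone: same key as the record to delete, value=None."""
    producer.produce(topic, key=key, value=None)


def is_tombstone(value):
    # Only a missing (None) value marks a deletion; the text "null" does not.
    return value is None


class RecordingProducer:
    """Hypothetical stand-in producer: records calls instead of sending them."""

    def __init__(self):
        self.sent = []

    def produce(self, topic, key=None, value=None):
        self.sent.append((topic, key, value))


producer = RecordingProducer()
send_tombstone(producer, "customer", b"customer-42")
print(producer.sent)
print(is_tombstone(None), is_tombstone("null".encode("utf-8")))
```

With a real client, the same `send_tombstone` call would be followed by a flush so the record actually leaves the process.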

In most cases, this is all that is needed for this data on this topic; Kafka does the rest. If it does not work, though, a deeper investigation may be necessary.

What does Kafka do behind the scenes?

Kafka topics can be configured with the cleanup policy “compact”.

This tells Kafka to delete old records with the same key and keep only the most recent one in each topic partition.

The partition detail is really important to understand: if, for example, a tombstone is produced to a different partition of the same topic, it won’t delete the record even though both have the same key.
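To make the partition point concrete, here is a small simulation. It is a simplified model of compaction, not Kafka's implementation: each partition is a list of `(key, value)` pairs in offset order, and keys whose latest value is a tombstone are dropped as if the tombstone retention time had already passed. The keys and values are made up.

```python
def compact(log):
    """Keep only the latest record per key within ONE partition's log.

    Keys whose latest value is None (a tombstone) are removed entirely,
    modelling the state after the tombstone itself has been cleaned up.
    """
    latest = {}
    for key, value in log:
        latest[key] = value  # later offsets overwrite earlier ones
    return [(k, v) for k, v in latest.items() if v is not None]


# Hypothetical topic with two partitions.
partitions = {
    0: [("customer-1", "Alice"), ("customer-2", "Bob"), ("customer-1", None)],
    1: [("customer-3", "Carol")],
}
# Mistake: the tombstone for customer-3 lands on partition 0 instead of 1.
partitions[0].append(("customer-3", None))

compacted = {p: compact(log) for p, log in partitions.items()}
print(compacted)
```

In this model customer-1 disappears because its tombstone is in the same partition, while customer-3 survives in partition 1 because compaction never looks across partitions.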

For further details, please check the official documentation.

Log compaction does not happen immediately. How long it takes depends on how the topic is configured and on the processing power of the cluster.

Simple scenario

Let's suppose the following:

  • we have an old record for a key, and 2 months later a tombstone was sent for that key
  • the old record has a payload
  • our consumer will start consuming the topic 25 hours after the tombstone has been sent
  • we want the old record to be deleted at most 24 hours after the tombstone has been sent
  • the topic has 1000 different keys
  • more records are being produced to the topic (1 record / hour)
  • this is the only topic in the broker

In practice, we can check that the old record has been compacted by consuming the topic from the beginning and verifying that we receive the record key only once.

For this scenario, we should set the following topic-level configs:
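The check can be sketched as a small helper that counts how often each key appears in what was consumed. The input here is a plain list of `(key, value)` pairs standing in for the records read from the beginning of the topic; the sample data is hypothetical.

```python
from collections import Counter


def keys_seen_more_than_once(records):
    """Return the keys that appear more than once in the consumed records."""
    counts = Counter(key for key, _value in records)
    return sorted(key for key, n in counts.items() if n > 1)


# Before compaction: the old record and its tombstone are both still there.
before = [("customer-1", "Alice"), ("customer-2", "Bob"), ("customer-1", None)]
# After compaction: only the tombstone remains for customer-1.
after = [("customer-2", "Bob"), ("customer-1", None)]

print(keys_seen_more_than_once(before))
print(keys_seen_more_than_once(after))
```

An empty result means every key was received only once, which is what we expect once the old record has been compacted away.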

  • segment.ms: 86400000 (24 hours)

This makes sure that the log segment rolls, which allows the record to be compacted. The default is 7 days.

  • delete.retention.ms: 172800000 (48 hours)

This is how long the tombstone is kept on the topic. It should not be too short, so that consumers have time to consume the tombstone and delete their internal records or take any other appropriate action.

The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete tombstones may be collected before they complete their scan).

  • min.cleanable.dirty.ratio: 0.1

This config is optional for this scenario, but it helps speed up log compaction. We can experiment with decreasing this number until log compaction is triggered. The default is 0.5, so if we don’t set it, in our scenario compaction will likely be triggered when 24 hours have elapsed.

This configuration controls how frequently the log compactor will attempt to clean the log (assuming log compaction is enabled). By default we will avoid cleaning a log where more than 50% of the log has been compacted. This ratio bounds the maximum space wasted in the log by duplicates (at 50% at most 50% of the log could be duplicates). A higher ratio will mean fewer, more efficient cleanings but will mean more wasted space in the log. If the max.compaction.lag.ms or the min.compaction.lag.ms configurations are also specified, then the log compactor considers the log to be eligible for compaction as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) records for at least the min.compaction.lag.ms duration, or (ii) if the log has had dirty (uncompacted) records for at most the max.compaction.lag.ms period.
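As a rough illustration of the ratio described above, the sketch below treats the dirty ratio as the dirty (uncompacted) bytes divided by the total bytes of the log, and compares it with the configured threshold. The byte counts are made up, and the lag settings are deliberately left out of this simplified model.

```python
def dirty_ratio(clean_bytes, dirty_bytes):
    """Share of the log that has not been compacted yet."""
    total = clean_bytes + dirty_bytes
    return dirty_bytes / total if total else 0.0


def exceeds_dirty_threshold(clean_bytes, dirty_bytes, min_cleanable_dirty_ratio=0.5):
    """True when the dirty ratio is above min.cleanable.dirty.ratio."""
    return dirty_ratio(clean_bytes, dirty_bytes) > min_cleanable_dirty_ratio


# Hypothetical log: 900 bytes already compacted, 200 bytes of new records.
print(round(dirty_ratio(900, 200), 3))
print(exceeds_dirty_threshold(900, 200))                                 # default 0.5
print(exceeds_dirty_threshold(900, 200, min_cleanable_dirty_ratio=0.1))  # our setting
```

With these numbers the log is about 18% dirty, so it would wait under the default threshold but qualify with the 0.1 used in this scenario.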

And keep the topic defaults for:

  • retention.ms: 604800000 (7 days)

This property has no effect when the cleanup policy is set to “compact” only.

  • segment.bytes: 1073741824

Since we are using “segment.ms”, we don’t need to change this configuration, but in some cases a size-based roll may be preferred over the time-based one.

And set the cluster configs:

  • log.cleaner.max.compaction.lag.ms: 86400000

This tells Kafka to compact the logs of all topics with a maximum lag of 24 hours. Note that this configuration should not be used in production, since it may demand a high processing load from the whole cluster. It is preferable to set the topic-specific configuration “max.compaction.lag.ms”.

If in this scenario we still receive the key twice, there may be a reason why it has not been compacted:

  • the max compaction lag is not a hard guarantee. The cluster makes its best effort to comply with it, but the result depends on the cluster performance and the resources dedicated to the log cleaner.
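To avoid typos in the millisecond values, the topic-level settings of this scenario can be derived from hours. This is only a sketch: it builds a plain dictionary and joins it into the comma-separated `key=value` form that the `kafka-configs` tool takes for `--add-config`. Using the topic-level “max.compaction.lag.ms” here instead of the cluster-wide setting is my assumption, following the recommendation above.

```python
from datetime import timedelta


def to_ms(delta):
    """Convert a timedelta to whole milliseconds."""
    return int(delta.total_seconds() * 1000)


topic_config = {
    "cleanup.policy": "compact",
    "segment.ms": to_ms(timedelta(hours=24)),
    "delete.retention.ms": to_ms(timedelta(hours=48)),
    "min.cleanable.dirty.ratio": 0.1,
    "max.compaction.lag.ms": to_ms(timedelta(hours=24)),
}

# Comma-separated key=value pairs, e.g. for kafka-configs --add-config.
add_config_arg = ",".join(f"{name}={value}" for name, value in topic_config.items())
print(add_config_arg)
```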

The following documentation is also recommended:

More details from the scenario:

more records are being produced to the topic (1 record / hour)

This premise ensures that the segment containing the tombstone is rolled. If no records are written, Kafka stops rolling segments, and the last segment remains the active segment, which is not compacted. For more details, see this article.
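The effect of this premise can be shown with a toy model of segment rolling. It assumes, following the description above, that a segment only rolls when a new record arrives and the first record of the current segment is at least “segment.ms” old. Real brokers have more rules than this, so treat it as an illustration only.

```python
def split_into_segments(timestamps_ms, segment_ms):
    """Group record timestamps into segments; the last one is the active segment."""
    segments = []
    for ts in timestamps_ms:
        # Roll only on append, and only if the current segment is old enough.
        if not segments or ts - segments[-1][0] >= segment_ms:
            segments.append([ts])
        else:
            segments[-1].append(ts)
    return segments


def in_active_segment(segments, ts):
    return ts in segments[-1]


HOUR = 3_600_000
SEGMENT_MS = 24 * HOUR
TOMBSTONE_TS = 0

# No traffic after the tombstone: nothing triggers a roll.
quiet = split_into_segments([TOMBSTONE_TS], SEGMENT_MS)
# One record per hour for 30 hours after the tombstone.
busy = split_into_segments([TOMBSTONE_TS] + [h * HOUR for h in range(1, 31)], SEGMENT_MS)

print(in_active_segment(quiet, TOMBSTONE_TS))
print(in_active_segment(busy, TOMBSTONE_TS))
```

In the quiet topic the tombstone stays in the active segment, so it cannot be compacted. With one record per hour, the record arriving at hour 24 starts a new segment and the tombstone's segment becomes eligible.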

this is the only topic in the broker

This premise ensures that no other topic can be prioritised by the log cleaner. The dirtiest log is prioritised, according to the dirty ratio.
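A simplified picture of that prioritisation: among the logs whose dirty ratio is above the threshold, the one with the highest ratio is picked first. The log names and byte counts below are hypothetical, and the real log cleaner takes more factors into account.

```python
def pick_dirtiest(logs, min_cleanable_dirty_ratio=0.5):
    """Return the name of the log with the highest dirty ratio, or None.

    `logs` maps a log name to a (clean_bytes, dirty_bytes) tuple.
    """
    def ratio(sizes):
        clean, dirty = sizes
        return dirty / (clean + dirty) if clean + dirty else 0.0

    candidates = {
        name: ratio(sizes)
        for name, sizes in logs.items()
        if ratio(sizes) > min_cleanable_dirty_ratio
    }
    return max(candidates, key=candidates.get) if candidates else None


shared_broker = {"customer-0": (400, 600), "orders-0": (100, 900)}
dedicated_broker = {"customer-0": (400, 600)}

print(pick_dirtiest(shared_broker))
print(pick_dirtiest(dedicated_broker))
```

On the shared broker the dirtier orders log goes first, which is why the scenario assumes our topic is alone on the broker.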

When is deletion not necessary?

  • the cleanup policy is set to delete with a short retention period
  • In some architectures, records carry a version header. This may be the case when implementing a Kappa Architecture. In this case, consumers can filter the records by version, so it is not necessary to delete records from old versions.
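As a sketch of the version-header approach, the helper below keeps only records whose header matches the wanted version. The header name `version`, the record shape (a dict with a `headers` list of `(name, bytes)` pairs) and the sample data are all assumptions made for illustration.

```python
def only_version(records, wanted, header_name="version"):
    """Keep records whose `header_name` header equals `wanted` (bytes)."""
    kept = []
    for record in records:
        headers = dict(record.get("headers") or [])  # headers may be missing
        if headers.get(header_name) == wanted:
            kept.append(record)
    return kept


records = [
    {"key": "customer-1", "value": "old shape", "headers": [("version", b"1")]},
    {"key": "customer-1", "value": "new shape", "headers": [("version", b"2")]},
    {"key": "customer-2", "value": "no header", "headers": None},
]

current = only_version(records, b"2")
print([record["value"] for record in current])
```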

Process for deleting records on big topics

If the topic is big, you may want to send the tombstones in two steps: first a dry run, where you only save the ids that will be deleted, and then the actual run, where you produce the tombstones. You can write these scripts using Kafkacat, Kafka Quickstart or the Confluent Docker image. It’s also recommended to use “tmux” or a similar console tool so that the session stays alive while the process runs.

  1. Dry run.
  2. Compare the ids from the dry run with the warnings and errors logged by your application in the environment you will produce to.
  3. Actual run. Save the produced records to a file.
  4. Compare the results of the run with the warnings and errors you observed in your environment.
  5. Check that the tombstones have been actually consumed on the appropriate applications and that they match the output from step 3.
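The dry run and the actual run can be sketched as two small functions that communicate through files, so the ids can be reviewed in between. Everything here is illustrative: the `RecordingProducer` stands in for a real producer with a `produce(topic, key=..., value=...)` method, and the records, the `erase` flag, the topic name and the file names are hypothetical.

```python
import os
import tempfile


class RecordingProducer:
    """Hypothetical stand-in producer: records calls instead of sending them."""

    def __init__(self):
        self.sent = []

    def produce(self, topic, key=None, value=None):
        self.sent.append((topic, key, value))


def dry_run(records, should_delete, keys_path):
    """Step 1: save the ids that would be deleted, without producing anything."""
    keys = sorted({key for key, value in records
                   if value is not None and should_delete(key, value)})
    with open(keys_path, "w", encoding="utf-8") as out:
        out.write("\n".join(keys))
    return keys


def actual_run(producer, topic, keys_path, produced_path):
    """Step 3: produce one tombstone per saved id and log each id to a file."""
    with open(keys_path, encoding="utf-8") as src:
        keys = [line for line in src.read().splitlines() if line]
    with open(produced_path, "w", encoding="utf-8") as out:
        for key in keys:
            producer.produce(topic, key=key.encode("utf-8"), value=None)
            out.write(key + "\n")
    return keys


records = [("c-1", {"erase": True}), ("c-2", {"erase": False}),
           ("c-3", {"erase": True}), ("c-1", {"erase": True})]
workdir = tempfile.mkdtemp()
keys_file = os.path.join(workdir, "dry_run_ids.txt")
produced_file = os.path.join(workdir, "produced_ids.txt")

planned = dry_run(records, lambda key, value: value["erase"], keys_file)
producer = RecordingProducer()
produced = actual_run(producer, "customer", keys_file, produced_file)
print(planned == produced, producer.sent)
```

Keeping both files makes it easy to compare the planned ids with what was produced, and later with what the consuming applications report.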

Conclusion

This knowledge will help you delete records with confidence.

It took me quite some time to learn these technical details; I hope this post helps you learn them faster!

Questions and feedback are welcome.
