Key Concepts You Need to Know about Message Retention, Expiry, and Deletion in Apache Pulsar

Sherlock Xu
6 min read · Sep 19, 2022

In one of my previous blog posts, I introduced how cursors work in Apache Pulsar and briefly talked about TTL. By default, Pulsar retains all unacknowledged messages, but you can set a TTL policy so that Pulsar automatically acknowledges them once the configured threshold is reached, which also moves the cursor accordingly. In this post, I would like to extend the topic and explain some basic Pulsar concepts in message retention, expiry, and deletion.

Backlogs

A backlog refers to a collection of unacknowledged messages in a subscription. When a message is acknowledged, it is removed from the backlog. As a single topic can have multiple subscriptions with each using its own backlog to manage unacknowledged messages, it is possible that an individual topic has multiple backlogs.

It is hard to argue against storing unacknowledged messages by default. After all, ideally, we would like all messages to be consumed successfully. However, if messages are consumed too slowly due to errors on the consumer side, we need a way to react when the consumer lags far behind the producer. We also need to prevent unacknowledged messages from growing indefinitely. For this purpose, Pulsar allows users to configure backlog quotas at the namespace level to control how unacknowledged messages are handled (TTL is another way, which will be explained later).

Backlog quotas limit how much unacknowledged data is retained, either by size or by time. After the configured threshold is reached, the broker takes the corresponding action based on one of the following strategies.

  • producer_request_hold: The default message retention policy of backlog quotas. Using this policy makes the broker reject all new messages from the producer. The producer will keep the rejected messages and resend them later. Note that the "retention policy" here is only applicable to unacknowledged messages, while the "retention policy" I talk about later is another concept that is applicable to acknowledged messages.
  • producer_exception: The broker disconnects all producers forcibly by throwing an exception.
  • consumer_backlog_eviction: The broker discards the oldest unacknowledged messages in the backlog of the slowest subscriber.
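To make the three strategies more concrete, here is a minimal Python sketch of the decision made when a new message arrives on a topic whose backlog quota is exceeded. It is a toy model under my own assumptions, not Pulsar code: the Topic class, its fields, and the publish method are names I made up for illustration, only a size limit is modeled, and the "slowest" subscription is simply the one with the largest backlog.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Topic:
    """Toy model of a topic with a size-based backlog quota (hypothetical, not a Pulsar API)."""
    limit_bytes: int
    policy: str = "producer_request_hold"
    # subscription name -> sizes in bytes of its unacknowledged messages, oldest first
    # (the model assumes at least one subscription exists)
    backlogs: Dict[str, List[int]] = field(default_factory=dict)

    def publish(self, size: int) -> str:
        """Return "accepted", "held", or "exception" for a new message of `size` bytes."""
        # The slowest subscription is modeled as the one with the largest backlog.
        slowest = max(self.backlogs, key=lambda name: sum(self.backlogs[name]))
        if sum(self.backlogs[slowest]) >= self.limit_bytes:
            if self.policy == "producer_request_hold":
                return "held"  # the producer keeps the message and resends it later
            if self.policy == "producer_exception":
                return "exception"  # the producer is told about the failure
            # consumer_backlog_eviction: drop the oldest unacknowledged messages
            # of the slowest subscription until it is back under the limit.
            while self.backlogs[slowest] and sum(self.backlogs[slowest]) >= self.limit_bytes:
                self.backlogs[slowest].pop(0)
        # The message is accepted and becomes unacknowledged in every subscription.
        for backlog in self.backlogs.values():
            backlog.append(size)
        return "accepted"
```

With a limit of 100 bytes and a slow subscription holding messages of 60 and 50 bytes, the first two policies leave the backlog untouched and refuse the new message, while consumer_backlog_eviction drops the 60-byte message and then accepts the new one.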

You can configure backlog quotas through the following command.

./pulsar-admin namespaces set-backlog-quota tenant/namespace \
--limit 3G \
--limitTime 3600 \
--policy producer_request_hold

Keep in mind that once you have configured a backlog quota policy for a namespace, it applies to all topics in the namespace.

Time to live (TTL)

In addition to backlog quotas, you can also use TTL to manage unacknowledged messages in a backlog. A TTL policy determines how long messages can stay in the unacknowledged state. Once a message has stayed unacknowledged longer than the TTL, it is forcibly acknowledged and the cursor is moved accordingly.
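The effect of TTL on a cursor can be sketched in a few lines of Python. This is only an illustration under simplifying assumptions: apply_ttl is a hypothetical helper, the backlog is a list of publish timestamps in seconds, and the cursor is the index of the first unacknowledged message.

```python
from typing import List


def apply_ttl(publish_times: List[float], cursor: int, now: float, ttl_seconds: int) -> int:
    """Return the new cursor position after expiring messages older than the TTL.

    `cursor` is the index of the first unacknowledged message (hypothetical model).
    A TTL of 0 disables expiry, as with ttlDurationDefaultInSeconds=0.
    """
    if ttl_seconds <= 0:
        return cursor
    # Acknowledge messages one by one, oldest first, while they are past their TTL.
    while cursor < len(publish_times) and now - publish_times[cursor] >= ttl_seconds:
        cursor += 1
    return cursor
```

For example, with messages published at 0, 100, 200, and 300 seconds, a TTL of 150 seconds, and the current time at 310 seconds, the first two messages are expired and the cursor moves from index 0 to index 2.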

You can configure both backlog quotas and TTL policies at the same time. Use the following command to set a TTL policy at the topic level:

./pulsar-admin topics set-message-ttl -t 150 persistent://tenant/namespace/topic

To set a TTL policy at the namespace level:

./pulsar-admin namespaces set-message-ttl --messageTTL 150 tenant/namespace

To set a TTL policy at the broker level in broker.conf:

# Default ttl for namespaces if a namespace-level ttl policy is not configured. Disable the default behavior by setting it to 0.
ttlDurationDefaultInSeconds=0

We can see that both backlog quotas and TTL can be used to control the number of unacknowledged messages and forcibly discard or acknowledge them once the preset thresholds are exceeded. I reckon the reason behind this design is that consumers don’t need to receive all the messages in some cases, as the latest data may be more relevant. This is true for some location services and apps: users only need to know the latest location, while earlier location data is less important.

Retention policies

After all subscribers of a topic have confirmed the successful consumption of messages, we don’t need to retain these messages anymore. Pulsar decides whether messages should be deleted based on retention policies. Different from backlogs and TTL, retention policies are applicable to acknowledged messages. You can configure the following two properties (both default to 0, meaning no acknowledged messages are retained) in broker.conf to set broker-level retention policies.

  • defaultRetentionTimeInMinutes: The default message retention time in minutes, namely how long acknowledged messages are kept before they become eligible for deletion.
  • defaultRetentionSizeInMB: The default message retention size in MB, namely how much acknowledged data is kept before the oldest messages become eligible for deletion.

You can also configure a namespace-level retention policy so that all topics in the namespace can be controlled by it. Use the following command:

./pulsar-admin namespaces set-retention tenant/namespace \
--size 7G \
--time 2h

Whether you set the retention by time or by size, the calculation starts only after messages are acknowledged. Messages become ready for deletion as soon as either threshold is reached, whichever comes first. To store messages indefinitely, set both parameters to -1. If one of them is -1 and the other one is greater than 0, only the latter is used to determine whether messages should be deleted.
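The rules in this paragraph can be written down as a small Python function. This is a sketch of the rules as described here, not the broker's implementation; ready_for_deletion and its parameters are hypothetical names, and -1 stands for "no limit".

```python
def ready_for_deletion(acked_minutes: float, acked_mb: float,
                       time_limit_minutes: int, size_limit_mb: int) -> bool:
    """Decide whether acknowledged messages are ready for deletion.

    acked_minutes: how long the messages have been acknowledged.
    acked_mb: how much acknowledged data is currently retained.
    A limit of -1 means that dimension is ignored.
    """
    time_exceeded = time_limit_minutes != -1 and acked_minutes >= time_limit_minutes
    size_exceeded = size_limit_mb != -1 and acked_mb >= size_limit_mb
    # Whichever threshold is reached first triggers deletion.
    return time_exceeded or size_exceeded
```

With the policy from the command above (2 hours and 7G, that is 120 minutes and 7168 MB), messages acknowledged 130 minutes ago are ready for deletion even if only 100 MB is retained. With both limits set to 0, every acknowledged message is ready for deletion immediately, and with both set to -1, none ever is.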

Acknowledged messages that are kept by a retention policy can still be accessed by readers. A reader is a special type of consumer that uses a non-durable subscription. When creating a reader, you must specify the position on a topic where consumption starts. It can be the oldest or latest available message, or a message between them (the message ID is required). For more information about readers, see the Pulsar documentation.

Message deletion

In my previous blog post about cursors, I mentioned that a cursor has an attribute markDeletePosition, which tells you what messages have been acknowledged so that they can be deleted. Keep in mind that the attribute serves as a signal and that it does not mean these ready-to-be-deleted messages are deleted immediately. I list some scenarios below where acknowledged messages cannot be deleted.

  • Not all subscriptions on a topic have acknowledged the messages. A single topic may have multiple subscriptions. You can consider each subscription as a consumer group, the consumption progress of which is tracked by its own cursor. This means a message can be acknowledged in one subscription while it is still unacknowledged in others. Obviously, such messages should not be deleted.
  • A retention policy is configured and applied to some messages. Pulsar does not delete these messages before the configured threshold is reached. See the above sections for details.
  • In a ledger, some messages are acknowledged while others are not. In my BookKeeper introduction blog post, I explained that ledgers are the smallest unit for deletion: you can only delete an entire ledger, not individual entries within it. When you use Pulsar and BookKeeper, you may notice that a consumer sometimes fails to acknowledge certain messages, which results in acknowledgment holes. In this case, the corresponding ledger cannot be deleted.
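The first and third scenarios can be illustrated with a short Python sketch. It is a simplified model under my own assumptions: Ledger and can_delete are hypothetical names, a position is a (ledger ID, entry ID) pair compared in order, each subscription's markDeletePosition is the last position up to which everything has been acknowledged without holes, and retention policies are ignored.

```python
from dataclasses import dataclass
from typing import Dict, Tuple

Position = Tuple[int, int]  # (ledger ID, entry ID)


@dataclass
class Ledger:
    ledger_id: int
    last_entry_id: int
    closed: bool


def can_delete(ledger: Ledger, mark_delete_positions: Dict[str, Position]) -> bool:
    """Return True if every entry in the ledger is acknowledged by every subscription.

    mark_delete_positions maps a subscription name to its markDeletePosition.
    An acknowledgment hole keeps that position in front of the hole, so the
    ledger that contains the hole fails the check below.
    """
    if not ledger.closed:
        return False  # an open ledger is still being written to
    last_position = (ledger.ledger_id, ledger.last_entry_id)
    return all(position >= last_position for position in mark_delete_positions.values())
```

For a closed ledger 5 whose last entry is 99, a subscription whose markDeletePosition is stuck at (5, 40) blocks deletion, no matter how far the other subscriptions have progressed.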

Here, I would like to explain a little more about ledger rollover parameters, as I noticed some community members encountered a problem when configuring them. When a ledger rollover happens, the current ledger changes from the OPEN state to the CLOSED state and a new ledger is created. Only after the original ledger is closed can it be deleted.

A ledger rollover takes place after the minimum rollover time (controlled by managedLedgerMinLedgerRolloverTimeMinutes in broker.conf) is reached and one of the following conditions is met.

  1. managedLedgerMaxLedgerRolloverTimeMinutes: The maximum rollover time, which defaults to 240.
  2. managedLedgerMaxEntriesPerLedger: The maximum number of entries written to a ledger.
  3. managedLedgerMaxSizePerLedgerMbytes: The maximum size of entries written to a ledger.
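The rollover rule described above can be expressed as a small Python check. This is a sketch of the rule as stated in this article, not the broker's code: RolloverConfig and should_roll_over are hypothetical names, and apart from the 240-minute maximum mentioned above, the default values are placeholders I chose for illustration.

```python
from dataclasses import dataclass


@dataclass
class RolloverConfig:
    min_rollover_minutes: int = 10    # managedLedgerMinLedgerRolloverTimeMinutes (illustrative value)
    max_rollover_minutes: int = 240   # managedLedgerMaxLedgerRolloverTimeMinutes
    max_entries: int = 50000          # managedLedgerMaxEntriesPerLedger (illustrative value)
    max_size_mb: int = 2048           # managedLedgerMaxSizePerLedgerMbytes (illustrative value)


def should_roll_over(cfg: RolloverConfig, age_minutes: float, entries: int, size_mb: float) -> bool:
    """Return True if the current ledger should be closed and a new one created."""
    # Nothing happens before the minimum rollover time is reached.
    if age_minutes < cfg.min_rollover_minutes:
        return False
    # After that, any one of the three limits triggers a rollover.
    return (age_minutes >= cfg.max_rollover_minutes
            or entries >= cfg.max_entries
            or size_mb >= cfg.max_size_mb)
```

Note how a ledger that is only 5 minutes old is not rolled over even if it already holds 60,000 entries, while the same ledger at 15 minutes is.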

You have to be careful when setting these values, or you may run into problems with ledger rollover. For example, before the minimum rollover time is reached, a ledger may already have exceeded the allowed number of entries or size. In this case, the ledger cannot be rolled over yet, and because an open ledger cannot be deleted, it keeps growing and can become excessively large.

Garbage collection threads are responsible for the physical removal of messages from disks. They scan the entry log files in BookKeeper and follow a compaction-based strategy to delete data. I will not elaborate on it in this article as this requires a separate blog post to explain the logic.

Summary

This article walks you through some basics in message retention and expiry in Pulsar. When I was learning Pulsar (and I am still learning it 😊), I found they were some of the key building blocks behind Pulsar’s logic of processing messages. I would recommend this blog post to anyone who is new to Pulsar or wants to have a try. I hope this article can help you gain a better understanding of these concepts as you continue or start your Pulsar journey.

Reference

Message retention and expiry
Reader interface
Broker configurations
