Sometimes you want to delay the delivery of your messages so that subscribers don’t see them immediately. RabbitMQ has a plugin for this, while
ActiveMQ has the feature built into the message broker.
The usage of Apache Kafka is growing tremendously due to its unique design and high performance, but it lacks built-in support for delay queues and dead letter queues. This does not mean you can’t implement them yourself.
Why do we need a Delay Queue?
There are many use cases for delaying message consumption, but I would like
to stress the most important one:
Use Case: Retry message consumption in the case of processing failure
Kafka provides consumer APIs to poll messages from the broker so that they can be
processed. Message processing might fail because a dependent system is
unavailable or returns errors. If the failure is recoverable, you want a
retry mechanism to reprocess that message. If the failure is unrecoverable,
you can just dump the message to a permanent failure topic, which can then be consumed to report such errors.
Let’s see how we can implement a retry mechanism in Kafka!
Strategy and Implementation
Simple retry logic
A Kafka consumer reads a partition in offset order, so you cannot simply skip a
message and come back to it later. A simple retry logic would therefore be to wait for some time in the catch block and then reprocess that message. The delay gives the application time to heal and recover from
its failure state.
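As a rough illustration, the blocking retry described above could be sketched like this. It is a minimal sketch, not a Kafka client: `RecoverableError`, `process_with_blocking_retry`, `handler`, `max_attempts` and `delay_seconds` are hypothetical names chosen for this example, and the `sleep` parameter exists only so the wait can be replaced in tests.

```python
import time


class RecoverableError(Exception):
    """Hypothetical marker for failures worth retrying (e.g. a dependency is down)."""


def process_with_blocking_retry(message, handler, max_attempts=3,
                                delay_seconds=5.0, sleep=time.sleep):
    """Call handler(message); on a recoverable failure, wait and try again.

    Returns True if the message was processed, False once attempts run out.
    While this function sleeps, nothing else is consumed by the caller.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            handler(message)
            return True
        except RecoverableError:
            # Wait in the "catch block", but not after the final attempt.
            if attempt < max_attempts:
                sleep(delay_seconds)
    return False
```

The weakness is visible in the loop itself: the consumer does nothing else while it waits, so one failing message holds up every message behind it.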
What can we do to improve this naive implementation?
Non-blocking retry logic
A better approach would be to push the failed message to a delay topic if the
number of retries is not exhausted, and then continue consuming messages. The consumer of the delay topic will publish the message back to the application topic (the topic whose message failed) after a certain delay.
Note: Add metadata such as the retry count, the application topic and the failed consumer
group id to the message. This helps in deciding whether the message should be
retried or not.
To better understand why the metadata is required, let’s assume the message
failed. If the retry count extracted from the metadata is greater than zero, then
we should retry message processing. Otherwise we should just dump the
message to a permanent failure topic, since the number of retries for this
message is exhausted.
Now let’s assume the message comes back to the application topic. We should reprocess it only if the failed consumer group id extracted from the metadata is equal to the id of the consumer group that is reading it; otherwise we should discard it. The reason is that every time the message comes back from the delay topic to the application topic, it is delivered to all consumer groups of that topic, and a group does not need to reprocess a message that failed in some other consumer group.
The application topic is required by the consumer of the delay topic to route the message back to the application topic.
Repeat the following steps indefinitely:
1. poll message from application topic
2. pre-process message
2.1 if (failed consumer group id metadata exists and it does not match the consumer group id of the message) then discard the message
3. process message (as per your logic)
3.1 in the case of exception
3.1.1 if (retry count metadata is greater than 0) then decrement retry count, add failed consumer group id and send that message to delay topic, else send that message to permanent failure topic
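The steps above could be sketched roughly as follows. This is an illustration under assumptions rather than a definitive implementation: the `Message` class, the header names (`retry-count`, `failed-consumer-group-id`, `application-topic`), the topic names, the default of 3 retries and the `send` callback are all hypothetical, and catching every `Exception` stands in for whatever "recoverable failure" means in your application.

```python
from dataclasses import dataclass, field


@dataclass
class Message:
    payload: str
    headers: dict = field(default_factory=dict)  # carries the retry metadata


DELAY_TOPIC = "delay-minutes-5"        # follows the delay-minutes-x pattern
FAILURE_TOPIC = "permanent-failure"    # hypothetical topic name
DEFAULT_RETRIES = 3                    # assumed budget for a first-time failure


def handle(message, app_topic, group_id, process, send):
    """Handle one polled message and return what happened to it."""
    # Step 2.1: ignore retries that were requested by another consumer group.
    failed_group = message.headers.get("failed-consumer-group-id")
    if failed_group is not None and failed_group != group_id:
        return "discarded"
    try:
        process(message.payload)                      # step 3
        return "processed"
    except Exception:                                 # step 3.1
        retries_left = message.headers.get("retry-count", DEFAULT_RETRIES)
        if retries_left > 0:                          # step 3.1.1
            headers = dict(message.headers)
            headers.update({
                "retry-count": retries_left - 1,
                "failed-consumer-group-id": group_id,
                "application-topic": app_topic,
            })
            send(DELAY_TOPIC, Message(message.payload, headers))
            return "delayed"
        send(FAILURE_TOPIC, message)
        return "failed"
```

Because `handle` returns right after handing the message to `send`, the caller can poll the next message immediately, which is what makes this variant non-blocking.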
Delay queue logic
A delay queue consists of two elements: a delay topic and its consumer. To put
it simply, the consumer of a delay topic (pattern: delay-minutes-x, where x is any
number) reads messages from the delay topic, introduces a delay of x
minutes, and then publishes the message back to the application topic.
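Since the delay is encoded in the topic name, the consumer has to parse it. A small sketch of that step, assuming topic names follow the delay-minutes-x pattern exactly (the function name `delay_ms_from_topic` is hypothetical):

```python
import re

# Matches names such as "delay-minutes-5" and captures the number of minutes.
_DELAY_TOPIC = re.compile(r"^delay-minutes-(\d+)$")


def delay_ms_from_topic(topic):
    """Return the delay encoded in a delay topic name, in milliseconds."""
    match = _DELAY_TOPIC.match(topic)
    if match is None:
        raise ValueError(f"not a delay topic: {topic!r}")
    return int(match.group(1)) * 60 * 1000
```

For example, `delay_ms_from_topic("delay-minutes-10")` gives 600000, while any name outside the pattern raises a `ValueError` instead of silently yielding a zero delay.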
Now the question arises… How will you introduce this delay?
It’s simple! By sleeping the consumer thread till the resume time (the time when the
message was published to the delay topic + the delay time, where the former is given by the consumer record timestamp and the latter can be parsed from the topic name).
This approach will fail… Why?
Because you cannot sleep the consumer thread during processing for longer
than a configured time, as Kafka will assume that the consumer is dead. Kafka will then
reassign its partitions and hand that message to another consumer.
More specifically: when you poll Kafka for messages, it returns a
batch of messages whose maximum size is set by the max.poll.records configuration. You can’t
take indefinitely long to process them. If you do not call poll again within the time set by the max.poll.interval.ms configuration, your consumer will be considered dead. In other words, this config sets the maximum delay between client calls to poll.
To better understand, let’s assume that the delay topic provides a delay of 10
minutes. Also assume that there are 2 consumers consuming from the
delay topic. Consumer 1 consumes the message and sleeps for 10 minutes (which is greater than max.poll.interval.ms). The broker will assume that it is dead and provide the message to consumer 2. Consumer 2 will consume it and also sleep for 10 minutes (again greater than max.poll.interval.ms), so the broker will think consumer 2 is dead as well. Now the broker will wait until any consumer is alive again. That is not what we intended.
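The arithmetic behind this example is easy to check. The sketch below assumes the consumer keeps max.poll.interval.ms at 300000 ms (5 minutes), which to my knowledge is the default in recent Java clients; verify it against your client version. The function name is hypothetical.

```python
# Assumed default of max.poll.interval.ms (5 minutes); check your client version.
DEFAULT_MAX_POLL_INTERVAL_MS = 300_000


def sleep_exceeds_poll_interval(delay_minutes,
                                max_poll_interval_ms=DEFAULT_MAX_POLL_INTERVAL_MS):
    """True if sleeping for the whole delay would outlast the allowed gap between polls."""
    return delay_minutes * 60 * 1000 > max_poll_interval_ms
```

With these numbers a 10 minute sleep (600000 ms) exceeds the limit, so the sleeping consumer would be removed from the group. Raising max.poll.interval.ms above the delay would avoid that, but it also slows down the detection of consumers that are genuinely stuck.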
We need another approach.
Another approach is to leverage the pause, resume and paused methods
provided by the KafkaConsumer API. First, poll a message from the delay topic.
Next, pause the consumer till the resume time. Finally, resume the consumer and
send the message back to the application topic.
Note: You will not receive any messages from the poll operation while the consumer
is paused. Resuming the consumer resumes fetching of messages. Also, the
ordering of messages is critical, so route all messages to a single partition by
using the same routing key.
In this approach the consumer thread never sleeps; instead, the consumer is
paused and the poll operation is called again. Since poll is called again
within the time specified by the max.poll.interval.ms configuration, the consumer retains its liveness.
Repeat the following steps indefinitely:
1. poll message from delay topic
2. read message
2.1 read consumer record timestamp
2.2 calculate resume time (consumer record timestamp + delay time)
2.3 if (current time < resume time) then pause consumer, if not
2.3.1 resume consumer if paused or
2.3.2 push that message back to application topic and commit
the message if not paused
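A rough sketch of this loop is shown below. `FakeConsumer` is an in-memory stand-in written for this example, not a Kafka client: the real KafkaConsumer pause and resume methods take a collection of partitions, and paused returns the set of paused partitions. The sketch also makes two assumptions that the steps leave open: poll returns at most one record at a time (as with max.poll.records set to 1), and a record that is not yet due is held in memory until its resume time. `DelayForwarder`, `forward` and `now_ms` are hypothetical names.

```python
class FakeConsumer:
    """In-memory stand-in for a consumer; returns at most one record per poll."""

    def __init__(self, records):
        self._records = list(records)
        self._paused = False
        self.committed = 0

    def poll(self):
        # A paused consumer fetches nothing, but polling still counts as "alive".
        if self._paused or not self._records:
            return []
        return [self._records.pop(0)]

    def pause(self):
        self._paused = True

    def resume(self):
        self._paused = False

    def paused(self):
        return self._paused

    def commit(self):
        self.committed += 1


class DelayForwarder:
    def __init__(self, consumer, forward, delay_ms, now_ms):
        self.consumer = consumer
        self.forward = forward      # publishes back to the application topic
        self.delay_ms = delay_ms
        self.now_ms = now_ms        # clock function, injectable for tests
        self.pending = None         # record waiting for its resume time

    def step(self):
        """One loop iteration; poll is called every time, even while paused."""
        records = self.consumer.poll()
        if self.pending is None and records:
            self.pending = records[0]
        if self.pending is None:
            return "idle"
        resume_at = self.pending["timestamp_ms"] + self.delay_ms
        if self.now_ms() < resume_at:
            if not self.consumer.paused():
                self.consumer.pause()
            return "waiting"
        if self.consumer.paused():
            self.consumer.resume()
        self.forward(self.pending)
        self.consumer.commit()
        self.pending = None
        return "forwarded"
```

In a real consumer, `step` would be called in a loop with a short poll timeout, so the gap between polls stays far below max.poll.interval.ms regardless of how long the delay is. Holding the record in memory is only one option; seeking back to the record's offset before pausing would be another.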
Flexible non-blocking retry logic
Create multiple delay topics (delay-minutes-5, delay-minutes-10, delay-minutes-15) to provide a delay that increases with each retry. This is better than using just one delay topic, as it provides more graceful reprocessing.
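One way to choose among these topics is to index them by how many times the message has already failed. A minimal sketch, assuming the failure count can be derived from the retry metadata (the function name `delay_topic_for_attempt` is hypothetical):

```python
DELAY_TOPICS = ["delay-minutes-5", "delay-minutes-10", "delay-minutes-15"]


def delay_topic_for_attempt(failed_attempts, topics=DELAY_TOPICS):
    """Return the delay topic for the Nth failure (1-based).

    Returns None once every delay topic has been used, which signals that
    the message belongs on the permanent failure topic.
    """
    if failed_attempts < 1:
        raise ValueError("failed_attempts must be at least 1")
    if failed_attempts > len(topics):
        return None
    return topics[failed_attempts - 1]
```

The first failure goes to delay-minutes-5, the second to delay-minutes-10, the third to delay-minutes-15, and a fourth failure returns `None`.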