Retrying Apache Kafka Messages With A Dedicated Java Micro-service

Baron Chan
Ninja Van Tech
Published in
8 min readJun 2, 2021

How Ninja Van uses a dedicated micro-service to facilitate retrying of Kafka messages at scale.

Introduction

Failures from consuming Kafka messages are inevitable. Fortunately, Kafka handles most causes like network failures, uncommitted messages with its fault-tolerant architecture.

Unfortunately, errors can happen on the application side — things like database constraints being violated or dirty data could cause a message consumption to fail. These errors are terminal, and we should not bother retrying them as they are doomed to fail again.

Some errors are “fixable”. For example, a temporary outage of a dependency, or concurrency issues, can cause an error. These issues will be naturally resolved if we retry it at a later time.

If you are using Java with Apache Kafka, there are unfortunately no in-built solution to help you retry these failed messages with a delay. We had the pleasure of tackling this difficult problem, and will share our strategy in this article.

Our strategy builds upon a previous Medium post. Here, the author writes about implementing retry mechanism with delayed retry attempts, which we thought the approach could be improved. Their approach involves using a single retry queue, and blocking the thread until it is time to process the next message in line. We think this approach can be improved upon, as blocking the thread results in wasting of resources.

Assumptions

  1. You are using Apache Kafka and Java
    (if not, scroll down to the “Alternatives” section!)
  2. Your messages are not overly time-sensitive. This retry system uses a polling system to check for due retries. Furthermore, a delayed consumption of retry requests could cause additional delays. However, this delay is typically only a couple of seconds at most (depending on polling frequency).
  3. Your messages are idempotent. This retry system guarantees an at-least-once delivery, which means there’s a minute possibility of a duplicate retry being triggered.
  4. You have a way to persist the data in the Kafka messages (e.g storing it in a database either on your source application, or a dedicated micro-service just for retries).

Strategy

Key Idea

The key idea for this strategy is to persist retry requests somewhere, then regularly poll them to trigger those that are due. In our case, we will be using a database for this purpose.

Overall Flow

  1. Original message gets consumed in original_topic which results in an exception
  2. We wrap the original message into a retry_wrapper, and specify retry_queue as [00H01M, 00H05M], which means it will retry at T = 1m, then another time at T = 6m
  3. We dequeue the first element of retry_queue to get 00H01M, so we publish it to retry_00H01M_topic
  4. We consume the retry_wrapper from retry_00H01M_topic; we calculate the retry_at field by adding 1 minute to the current time (since it was consumed from 00H01M), then store this in the database
  5. Roughly a minute later, a regularly-running CRON job triggers and we query our database for due requests; we find that the request from (4) is due, so we publish the entire retry_wrapper to the target_topic
  6. We consume the retry_wrapper from target_topic, and try to process it
    - If the processing fails and the retry_queue is empty, it means all retry attempts were exhausted; we can publish it to a dlq_topic
    - If the processing fails but the retry_queue is not empty, return to step 3

Note that the processing logic for target_topic would be the same as the one for original_topic. The only difference is that target_topic takes in a retry_wrapper, because it needs the retry_queue field in it to know how many more times to retry.

Wrapper Proto

We use a wrapper protobuf to encapsulate the original message, and also all other metadata needed to facilitate the retry

This message will be published to the retry topics, and then later back to the target topics after the specified delay in retry_queue.

Do note that the wrapper introduces some overhead to the final protobuf’s size, but its quite minimal. Just ensure that your original_message protobuf isn’t too close to the maximum protobuf size.

message RetryWrapper {
// Retry delay strategy
// e.g. [00H01M, 00H01M, 00H05M, 00H30M, 01H00M]
repeated RetryOption retry_queue = 1;

// original message in bytes
bytes original_message = 2;

// request id of the original request
string original_request_id = 3;

// system id of the original request
string original_system_id = 4;

// topic name to publish to during retry
string target_topic_name = 5;

enum RetryOption {
DEFAULT_RETRY_OPTION = 0;
_00H01M = 1;
_00H05M = 2;
_00H30M = 3;
_01H00M = 4;
}
}

Database Storage

This table is used to persist the retry attempts. We regularly poll it for unprocessed requests that are due, publish them to their target_topic, then mark them as processed.

Database Schema for retry_requests

CREATE TABLE `retry_jobs` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
/* Time which we should trigger the retry */
`retry_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
/* Boolean to denote if request has been processed */
`processed` bit(1) DEFAULT NULL,
/* Time of which request was processed */
`processed_at` timestamp NULL DEFAULT NULL,
/* Original message and target topic in bytes */
`retry_wrapper` text,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Logic to SELECT Due Requests from DB

SELECT * FROM retry_jobs
WHERE processed = 0
AND retry_at <= NOW();

Retry (retry_X_topic) Topic Pseudocode

The delayMinutes is dependent on which topic this was consumed from. (e.g If it was consumed from retry_00H01M_topic, then the delayMinutes is 1. If it was consumed from retry_00H30M_topic, then the delayMinutes is 30)

We simply set the retryAt to the calculated time, then persist it in the database.

private void handleIncomingRetryRequest(RetryWrapper retryWrapper, int delayMinutes) {
Date retryAt = new Date().addMinutes(delayMinutes);
retryWrapper.retryAt = retryAt;
database.persist(retryWrapper);
}

Retry CRON Job Pseudocode

This CRON job will run regularly (e.g every 10 seconds). It queries all due retries from the database, publishes them to their respective target topics, marks them as processed, then persists the changes.

private void triggerDueRetries() {
List<RetryWrapper> dueRetries = database.queryAllDueRetries();
for(RetryWrapper retryWrapper : dueRetries) {

// Get targetTopic and publish it there
String targetTopic = retryWrapper.targetTopic;
publish(retryWrapper, targetTopic);

// Update retryWrapper and persist changes
retryWrapper.processed = true;
retryWrapper.processedAt = new Date();
database.update(retryWrapper);
}
}

Original Message/Topic Logic Pseudocode

Upon an exception on processing the original message, we wrap the original message into a retry_wrapper, and specify the retry strategy using the retry_queue field.

We then pop the first element in the retry_queue to figure out which retry topic (retry_00H01M_topic) to publish to.

private void processOriginalMessage(Message originalMessage) {
try {
process(originalMessage);
} catch (Exception e) {
List<RetryOption> retryQueue = Arrays.asList(
RetryOption._00H01M,
RetryOption._00H05M,
RetryOption._00H30M
);
RetryWrapper retryWrapper = convertToRetryWrapper(originalMessage, retryQueue);
RetryOption nextOption = popNextRetryOption(retryWrapper);
switch(nextOption) {
case RetryOption._00H01M:
publishTo00H01M(retryWrapper);
break;
case RetryOption._00H05M:
publishTo00H05M(retryWrapper);
break;
case RetryOption._00H30M:
default:
publishTo00H30M(retryWrapper);
break;
}
}
}

Target Topic Pseudocode

We extract the original message from the retry_wrapper, and process it. If the processing results in an error:

If the retry_queue is empty, it means all retries have been exhausted — publish it to the DLQ.

Else pop the first element of the retry_queue to figure out which retry topic to publish to.

private void processRetryMessage(RetryWrapper retryWrapper) {
Message originalMessage = retryWrapper.originalMessage;
try {
process(originalMessage);
} catch (Exception e) {
if (isRetryQueueEmpty(retryWrapper)) {
publishToDlq(originalMessage);
return;
}
RetryOption nextOption = popNextRetryOption(retryWrapper);
switch(nextOption) {
case RetryOption._00H01M:
publishTo00H01M(retryWrapper);
break;
case RetryOption._00H05M:
publishTo00H05M(retryWrapper);
break;
case RetryOption._00H30M:
default:
publishTo00H30M(retryWrapper);
break;
}
}
}

Using a Dedicated Micro-service for Retries

portion that can be segregated into a separate micro-service

Handling the logic for consuming from the retry topics require additional code and the usage of a database. You may not want them on all your different micro-services, as you’d have to introduce a retry_request table to all of them, and write the same processing logic code for all of them.

At Ninja Van, we isolated these logic to a dedicated micro-service called Retry-Service. This micro-service has these really simple tasks:

  1. Consume and process messages from the various retry topics (e.g retry_00H01M_topic).
  2. Persist the retry_wrapper in its own database.
  3. Regularly query for due retry_wrappers, publish them into their respective target topics, and mark them as processed.

This is highly recommended as it absolves your other micro-services from incurring the additional overheads from extra code, database operations, and CRON jobs.

Example Scenario

  1. Original message gets consumed by client service via original topic
original message gets consumed by your micro-service

2. Client service runs into exception while processing message;
We wrap it up into a retry-wrapper and publish to retry-service’s topic (e.g retry_00H01M_topic)

original message gets wrapped and sent to retry-service

3. Retry-service persists the retry_wrapper in its database (retry_request table) and waits for the correct moment to fire the retry

retry-service stores request in database

4. Time’s up! Retry-service has waited for the correct moment and publishes the retry-wrapper to the client service’s target topic (their own retry topic)

retry-service fires due retry_wrapper to target topic

5. If the attempted retry fails, but there are still more pending attempts; repeat from step 2 onwards

retry attempt fails, push it again to retry-service

6. If all retry attempts have been exhausted, and a DLQ topic is specified, publish the message directly to the DLQ topic

if all retry attempts failed, instead push to DLQ topic

Benefits

  1. Non-blocking implementation — we do not force any consumer threads to sleep while waiting for the next retry to be due, preventing the possibility of the Kafka broker assuming the consumer died (when it was just waiting).
  2. Observability of retry attempts — retry requests are persisted in a database, allowing us to easily gauge the frequency. Since the topics and metadata (e.g request ID) are also stored, you can also debug easily.
  3. Ordering of messages are still maintained, as we can query and publish the retry requests in the order of retry_at.

Drawbacks

  1. You need a database to persist retries, and incur read/write costs from using it.
  2. You have to introduce multiple additional topics just for retrying purposes.

Alternatives

If your architecture isn’t heavily entrenched in using Java, you could explore using Golang. You can consider the Watermill library, which has in-built functionality to let you specify delayed retries with exponential backoffs.

If your architecture isn’t already on Apache Kafka, you can ActiveMQ, which has in-built functionality to efficiently schedule or delay message delivery.

Conclusion

Although implementing this retry strategy involves additional parts to your stack, it is a reliable way to introduce non-blocking retries. Although it brings some drawbacks, there are definite benefits — the most obvious one being non-blocking.

p.s Ninja Van is hiring! Interested in working on interesting side projects like these? If you’re from Singapore, Indonesia, or Vietnam, or are willing to relocate here, you can apply via our Lever portal!

p.p.s Thank you Timothy and Thung Han for the immense help with this project!

--

--