Retrying Kafka Streams messages with KV Stores

Timothy Ong
Ninja Van Tech
Jul 13, 2021

At Ninja Van, we use Kafka Streams extensively, from processing order requests to handling status changes. These flows encounter errors for a variety of reasons, all of which require retrying Kafka Streams messages in a timely manner.

In this post, I’ll describe some of the designs that we tried and tested (and failed at) before settling on a design built on Kafka Streams’ key-value state stores.

If at first you don’t succeed, retry

We started out by writing all messages that failed into a database table.

CREATE TABLE `retries` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `created_at` datetime DEFAULT NULL,
  `deleted_at` datetime DEFAULT NULL,
  `updated_at` datetime DEFAULT NULL,
  `request_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `error` longtext COLLATE utf8mb4_unicode_ci,
  `proto_request` longtext COLLATE utf8mb4_unicode_ci,
  `processed` bit(1) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `order_create_dlq_errors_processed_index` (`processed`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

request_id: a UUID that ties back to the original request, useful when checking our logs to see when and how the error occurred

proto_request: the JSON representation of the protobuf Kafka message (all our Kafka messages are published as protobufs)

processed: whether the message has been successfully processed

Pulling Kafka messages to retry

The retrying of the Kafka messages was done on a topic created specifically for the service, to ensure that a separate Kafka thread pool handled the retries.
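At a high level, a periodic job would query the table for unprocessed rows and re-publish them to that retry topic. A minimal sketch of this first design (RetryRepository, RetryRow and the topic name are hypothetical stand-ins, imports omitted as in the rest of this post’s snippets) might have looked like this:

// Sketch of the original table-based retry job; RetryRepository, RetryRow and
// the topic name are hypothetical stand-ins, not the actual Ninja Van code.
public class TableRetryJob {

    private static final String RETRY_TOPIC = "order-create-retry";

    private final RetryRepository repository;               // reads the `retries` table
    private final KafkaProducer<String, byte[]> producer;

    public TableRetryJob(RetryRepository repository, KafkaProducer<String, byte[]> producer) {
        this.repository = repository;
        this.producer = producer;
    }

    // Runs periodically; only a single instance of the service runs this at a time,
    // which is exactly the bottleneck described below.
    public void retryUnprocessed() {
        for (RetryRow row : repository.findUnprocessed()) {
            byte[] payload = row.getProtoRequest().getBytes(StandardCharsets.UTF_8);
            producer.send(
                new ProducerRecord<>(RETRY_TOPIC, row.getRequestId(), payload),
                (metadata, exception) -> {
                    if (exception == null) {
                        repository.markProcessed(row.getId());
                    }
                });
        }
    }
}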

This design held up for about a month before scaling issues started to show.

When an outage occurred, the service had a hard time keeping up, as only a single instance could handle re-publishing the messages at any one time. When the database started returning over a million messages to retry, the service would get overwhelmed and slow to a crawl.

Back to the drawing board

With scale in mind, we started looking for new ways of handling retries. The new design had to meet the following goals:

  • Self-contained within Kafka Streams
  • Scalable
  • Able to specify exponential backoff with jitter for when a message should be retried
  • No service-specific code required (so the same code can be shared as a library across services)
  • Fault-tolerant data storage

We eventually settled on the key-value (KV) state stores that Kafka Streams provides for interactive queries.

High-level overview of how the KV state store would be used

Setting up the store

Kafka Streams state stores let us store key-value entries. There are many different implementations that can be used to back a state store; we settled on the in-memory hash map to reduce the number of dependencies required.

This in-memory hash map would be backed by a Kafka topic, and would need to be tied to a Kafka processor.

StoreBuilder<KeyValueStore<String, byte[]>> retryBuilder =
    Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("inmemory-order-create-retry"),
        Serdes.String(),
        Serdes.ByteArray());

// Attach the store to the processor that will use it; the second argument is
// the name the processor was registered under via topology.addProcessor(...)
topology.addStateStore(retryBuilder, "order-create-retry-processor");

This will create an internal topic with the suffix -changelog to allow the in-memory state store to be fault tolerant.

You can check this via the Kafka command kafka-topics.

~ kafka-topics --zookeeper localhost:2181 --list | grep changelog

The changelog behaves like any other topic: each of its partitions is assigned to a Kafka consumer. This allows us to scale consumption by scaling up the number of Kafka consumers when there’s a spike in retry messages.

Putting it all together

Next up: setting up the Kafka Streams processor. This has to be the same processor that you associated with the state store in the section above.

@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    super.init(context);

    // kvStore is a field on this processor, e.g. private KeyValueStore<String, byte[]> kvStore;
    kvStore = (KeyValueStore<String, byte[]>) context.getStateStore("inmemory-order-create-retry");

    // Schedule the retry punctuator; more on this below
    context.schedule(100, PunctuationType.WALL_CLOCK_TIME, (ts) -> {
        // Processing code goes here
    });
}

This sets up the key-value store for use during the processing of the message. More on the context.schedule later.

@Override
public void process(String key, byte[] value) {
    try {
        OrderRequestProto.OrderRequest request = OrderRequestProto.OrderRequest.parseFrom(value);
        RequestInfo requestInfo = request.getRequestInfo();

        // Add in exponential backoff + jitter here

        // Prefix the key with a timestamp so entries can be ordered / checked later
        String storeKey = ZonedDateTime.now().toInstant().toEpochMilli() + "-" + UUID.randomUUID();

        kvStore.put(storeKey, value);
    } catch (Exception e) {
        Log.error("Error", e);
    }
}

Before we store the message into the state store, we’ll need to define the exponential backoff and jitter. We went with something along the lines of the following, but you can use anything that you want.

Math.exp(Math.log(((double) RETRY_PERIOD / INITIAL_DELAY) + 1) / RETRY_TIMES);

This ensures that retries do not all take place at once. It makes little difference for one-off failures, but during system-wide outages it ensures that downstream systems don’t get overloaded when they recover.
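To make this concrete, here’s one way the multiplier could be turned into a retry timestamp that gets baked into the store key. This is a sketch: the constant values, the 20% jitter, and the retryKey helper are illustrative, not our exact production code.

// Illustrative constants; tune these to your own retry window
private static final long INITIAL_DELAY = 1_000L;       // first retry after ~1 second
private static final long RETRY_PERIOD = 3_600_000L;    // spread retries over ~1 hour
private static final int RETRY_TIMES = 10;              // maximum number of attempts

// The multiplier from the formula above
private static final double MULTIPLIER =
    Math.exp(Math.log(((double) RETRY_PERIOD / INITIAL_DELAY) + 1) / RETRY_TIMES);

// Hypothetical helper: builds a store key that encodes when the message should next be retried
private String retryKey(int attempt) {
    long backoffMs = (long) (INITIAL_DELAY * Math.pow(MULTIPLIER, attempt));
    // Add up to 20% random jitter so retries don't all fire at the same instant
    long jitterMs = ThreadLocalRandom.current().nextLong((long) (backoffMs * 0.2) + 1);
    long retryAtMs = System.currentTimeMillis() + backoffMs + jitterMs;
    return retryAtMs + "-" + UUID.randomUUID();
}

The retry timestamp at the front of the key is what the punctuator below can check before forwarding a message.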

Punctuate

Coming back to context.schedule, this is where the code for processing the messages to be retried goes. A basic skeleton looks like this.

context.schedule(100, PunctuationType.WALL_CLOCK_TIME, (ts) -> {
    try {
        // Iterate over everything currently in the store
        try (KeyValueIterator<String, byte[]> iter = kvStore.all()) {
            while (iter.hasNext()) {
                KeyValue<String, byte[]> entry = iter.next();

                // Any checks on when to process the message
                // (e.g. due to backoff + jitter)
                // should be done here

                // You can do anything here, process the message etc.
                // For us, we're forwarding the message
                context.forward(null, entry.value, 0);
                kvStore.delete(entry.key);
            }
        }

        context.commit();
    } catch (Exception e) {
        Log.error("Error", e);
    }
});
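As an example of the due-time check mentioned in the comments above, a small helper can decide whether an entry should be retried yet, assuming the store key encodes the retry timestamp as in the earlier backoff sketch (the isDue helper is illustrative):

// Hypothetical helper: true if the entry's retry time has passed,
// assuming keys of the form "<retryAtMillis>-<uuid>"
private boolean isDue(String storeKey, long nowMillis) {
    long retryAtMillis = Long.parseLong(storeKey.substring(0, storeKey.indexOf('-')));
    return retryAtMillis <= nowMillis;
}

Entries where isDue(entry.key, ts) returns false are simply skipped and left in the store for a later punctuation.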

The context.schedule call will run the code block based on two parameters:

  • Time interval in milliseconds
  • Punctuation type

PunctuationType denotes how the scheduled run times are calculated. In the example we’re using wall clock time, which means the code block is executed every 100 ms of elapsed wall-clock time. More information about the other punctuation type, stream time, can be found in the Kafka Streams documentation.
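For reference, the same schedule call can be driven by stream (event) time instead; note that stream-time punctuations only fire when new records arrive and advance the stream time, which usually isn’t what you want for retries:

// Punctuate based on stream time instead of wall-clock time
context.schedule(100, PunctuationType.STREAM_TIME, (ts) -> {
    // retry-processing code goes here
});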

Downsides

While using Kafka state stores meets all our requirements, there are a few downsides out of the box.

  • No easy way to switch off the Kafka state store if we need to, for example when you want to redirect all failed messages into a DLQ rather than attempting to retry them
  • Changelog topics cannot be monitored with the default JMX metrics (topic lag will always show as 0)
  • Increasing the number of partitions on the changelog is complicated

We eventually added a way to drain the Kafka state store data into a DLQ, and added custom JMX metrics to monitor the changelog topic in Grafana.
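The drain itself doesn’t need to be anything fancy. A rough sketch, run inside the same processor, could look like the following; the drainToDlq flag and the "dlq-sink" child node (added to the topology with topology.addSink) are hypothetical:

// Rough sketch of draining the store into a DLQ; the drainToDlq flag and the
// "dlq-sink" child node are hypothetical and would be wired up per service
if (drainToDlq) {
    try (KeyValueIterator<String, byte[]> iter = kvStore.all()) {
        while (iter.hasNext()) {
            KeyValue<String, byte[]> entry = iter.next();
            // Forward to a sink node that publishes to the DLQ topic
            context.forward(entry.key, entry.value, To.child("dlq-sink"));
            kvStore.delete(entry.key);
        }
    }
    context.commit();
}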

As for partition management on the changelog topic, it required a multi-step process of draining the changelog, creating a new one, and pointing the consumer to it. Doable, but complicated in a production environment.

Wrap Up

We’ve rolled this out across multiple services, and have even built a dedicated retry service based on this implementation.

We hope this helps you implement your own retry mechanism with Kafka Streams.

P.S. Ninja Van is hiring! Interested in working on interesting side projects like these? If you’re from Singapore, Indonesia, or Vietnam, or are willing to relocate here, you can apply via our Lever portal!
