Retrying Kafka Streams messages with KV Stores
At Ninja Van, we use Kafka Streams extensively, from processing order requests to handling status changes. We encounter errors for a variety of reasons, all of which require retrying Kafka Streams messages in a timely manner.
In this post, I’ll describe some of the designs that we tried and tested (and failed at) before settling on a design built on the key-value state stores that Kafka Streams provides.
If at first you don’t succeed, retry
We started out by writing all messages that failed into a database table.
CREATE TABLE `retries` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`created_at` datetime DEFAULT NULL,
`deleted_at` datetime DEFAULT NULL,
`updated_at` datetime DEFAULT NULL,
`request_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`error` longtext COLLATE utf8mb4_unicode_ci,
`proto_request` longtext COLLATE utf8mb4_unicode_ci,
`processed` bit(1) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `order_create_dlq_errors_processed_index` (`processed`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
request_id
a UUID that ties back to the original request; useful when searching our logs for when or how the error occurred
proto_request
the JSON representation of the protobuf Kafka message. All our Kafka messages are published using protobufs.
processed
whether the message has been successfully processed
The retrying of the Kafka messages was done on a topic created specifically for the service, to ensure that a separate Kafka thread pool would be used.
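To make the mechanics concrete, here’s a minimal sketch of that polling loop, with an in-memory list standing in for the retries table and a callback standing in for the Kafka producer (the class and names are illustrative, not our production code):

```java
import java.util.List;
import java.util.function.Consumer;

public final class RetryPoller {
    /** In-memory stand-in for a row in the `retries` table. */
    public static final class Row {
        public final String requestId;
        public final byte[] protoRequest;
        public boolean processed;

        public Row(String requestId, byte[] protoRequest) {
            this.requestId = requestId;
            this.protoRequest = protoRequest;
        }
    }

    /** One polling pass: republish every unprocessed row, then mark it processed. */
    public static int pollOnce(List<Row> table, Consumer<byte[]> republish) {
        int retried = 0;
        for (Row row : table) {
            if (!row.processed) {
                republish.accept(row.protoRequest); // in production: produce to the retry topic
                row.processed = true;               // in production: UPDATE retries SET processed = 1
                retried++;
            }
        }
        return retried;
    }
}
```

Because a pass walks the whole set of unprocessed rows, a single instance doing this work becomes the bottleneck — which is exactly the scaling problem described below.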
This design held up for about 1 month before scaling issues started to show.
When an outage occurred, the service would have a hard time keeping up as only a single instance of the service could handle the re-publishing of the messages at any one time. When the database started returning over a million messages to retry, the service would get overwhelmed and slow to a crawl.
Back to the drawing board
With scale in mind, we started to look for new ways of handling retries. The new design had to meet the following goals:
- Be self-contained within Kafka Streams
- Scale horizontally
- Support exponential backoff with jitter for when a message should be retried
- Require no service-specific code (so the same code can be shared as a library across services)
- Store data in a fault-tolerant way
We eventually settled on the key-value (KV) state stores that Kafka Streams provides for interactive queries.
Setting up the store
Kafka state stores allow us to store key-value entries. There are many different implementations that can be used to back a state store; we settled on the in-memory hash map to reduce the number of dependencies required.
This in-memory hash map would be backed by a Kafka topic, and would need to be tied to a Kafka processor.
StoreBuilder<KeyValueStore<String, byte[]>> retryBuilder =
    Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("inmemory-order-create-retry"),
        Serdes.String(),
        Serdes.ByteArray());

topology.addStateStore(
    retryBuilder,
    "order-create-retry-processor"); // the name of the processor that will use the store
This will create an internal topic with the suffix -changelog to allow the in-memory state store to be fault tolerant. You can check this via the Kafka command kafka-topics.
~ kafka-topics --zookeeper localhost:2181 --list | grep changelog
The changelog functions similarly to how a topic does; each partition in the changelog is assigned to a Kafka consumer. This will allow us to scale the consumption by scaling up the number of Kafka consumers when there’s a spike in retry messages.
Putting it all together
Next up: setting up the Kafka Streams processor. This has to be the same processor that you associated with the state store in the section above.
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    super.init(context);
    this.context = context;
    kvStore = (KeyValueStore<String, byte[]>) context.getStateStore("inmemory-order-create-retry");
    context.schedule(100, PunctuationType.WALL_CLOCK_TIME, (ts) -> {
        // Processing code goes here
    });
}
This sets up the key-value store for use during the processing of the message. More on the context.schedule later.
@Override
public void process(String key, byte[] value) {
    try {
        OrderRequestProto.OrderRequest request = OrderRequestProto.OrderRequest.parseFrom(value);
        RequestInfo requestInfo = request.getRequestInfo();
        // Add in exponential backoff + jitter here
        String storeKey = Instant.now().toEpochMilli() + "-" + UUID.randomUUID();
        kvStore.put(storeKey, value);
    } catch (Exception e) {
        Log.error("Error", e);
    }
}
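The epoch-millis prefix on the store key is what later lets the punctuator work out whether an entry is due for a retry. A self-contained sketch of that key scheme (the helper class and its names are illustrative, not from our codebase):

```java
import java.util.UUID;

public final class RetryKeys {
    /** Builds a store key of the form "<dueEpochMillis>-<uuid>". */
    public static String build(long dueEpochMillis) {
        return dueEpochMillis + "-" + UUID.randomUUID();
    }

    /** Extracts the due timestamp back out of a store key. */
    public static long dueMillis(String key) {
        return Long.parseLong(key.substring(0, key.indexOf('-')));
    }

    /** True if the entry is due for a retry at the given wall-clock time. */
    public static boolean isDue(String key, long nowMillis) {
        return dueMillis(key) <= nowMillis;
    }
}
```

The random UUID suffix keeps keys unique even when two failures land on the same millisecond.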
Before we store the message into the state store, we’ll need to define the exponential backoff and jitter. We went with something along the lines of the following, but you can use anything that you want.
Math.exp(Math.log(((double) RETRY_PERIOD / INITIAL_DELAY) + 1) / RETRY_TIMES);
This ensures that retries do not all take place at once. It matters little for one-off failures, but during a system-wide outage it keeps downstream systems from being overloaded the moment they recover.
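As a concrete sketch, here’s how that growth factor could be combined with random jitter to compute each retry delay (the constants and class are illustrative; tune them per service):

```java
import java.util.concurrent.ThreadLocalRandom;

public final class RetryBackoff {
    // Illustrative values: first retry after 1s, retries spread over roughly 1h, 10 attempts
    private static final long INITIAL_DELAY = 1_000L;
    private static final long RETRY_PERIOD = 3_600_000L;
    private static final int RETRY_TIMES = 10;

    // Growth factor chosen so INITIAL_DELAY * FACTOR^RETRY_TIMES ≈ INITIAL_DELAY + RETRY_PERIOD
    private static final double FACTOR =
        Math.exp(Math.log(((double) RETRY_PERIOD / INITIAL_DELAY) + 1) / RETRY_TIMES);

    /** Delay in ms before the given attempt (0-based), with up to 20% random jitter added. */
    public static long delayMillis(int attempt) {
        double base = INITIAL_DELAY * Math.pow(FACTOR, attempt);
        double jitter = 1.0 + ThreadLocalRandom.current().nextDouble(0.2);
        return (long) (base * jitter);
    }
}
```

Adding `delayMillis(attempt)` to the current epoch millis yields the timestamp prefix stored in the key above.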
Punctuate
Coming back to context.schedule, this is where the code for processing messages to be retried will go. A basic skeleton would look like this.
context.schedule(100, PunctuationType.WALL_CLOCK_TIME, (ts) -> {
    try (KeyValueIterator<String, byte[]> iter = kvStore.all()) {
        while (iter.hasNext()) {
            KeyValue<String, byte[]> entry = iter.next();
            // Any checks on when to process the message
            // (e.g. due to backoff + jitter) should be done here.
            // You can do anything here: process the message, etc.
            // For us, we're forwarding the message.
            context.forward(null, entry.value, 0);
            kvStore.delete(entry.key);
        }
        context.commit();
    } catch (Exception e) {
        Log.error("Error", e);
    }
});
The context.schedule will run the code block based on 2 parameters:
- Time in milliseconds
- Punctuation type
PunctuationType denotes how the schedule run times are calculated. In the example, we’re using wall clock time; this means the code block will be executed every 100 ms of elapsed wall-clock time. More information about the other punctuation types can be found in the Kafka Streams documentation.
Downsides
While using the Kafka state stores meets all our requirements, there are a few downsides out of the box.
- No easy way to switch off the Kafka state store if we need to, for example when you want to redirect all failed messages into a DLQ rather than attempting to retry them
- Changelog topics cannot be monitored by the default JMX metrics (topic lag will always be at 0)
- Increasing number of partitions on the changelog is complicated
We eventually added a way to drain the Kafka state store data into a DLQ, and added custom JMX metrics to monitor the changelog topic on Grafana.
As for partition management on the changelog topic, it required a multi-step process of draining the changelog, re-creating a new changelog, and pointing the consumer to it. Doable, but complicated in a production environment.
Wrap Up
We’ve rolled this out across multiple services, and even built a dedicated retry service based on this implementation.
We hope this has helped you with implementing your own retry implementation via Kafka streams.
P.S. Ninja Van is hiring! Interested in working on interesting projects like these? If you’re from Singapore, Indonesia, or Vietnam, or are willing to relocate here, you can apply via our Lever portal!