Demystifying Spring Kafka

Garg Vatsal
The Startup
Published in
10 min readNov 2, 2020

As and when you will start migrating your products/applications to event driven architectures, the first immediate thought would be “Use Kafka!”. Kafka has really matured over time and provides ton of functionalities out of the box to realise a lot of use-cases. However, the most common one being pub-sub model.

Whats Pub-Sub model?

Its mostly that you will be pushing messages/updates on a topic and there will be consumers subscribing to it to get those updates. However, as we know, it’s not going to be conventional push from broker but a pull from consumer.

Pub-Sub Model

Usual challenges in such implementations

  • Payload serialisation strategy to use. (Avro/proto-buf/Thrift/gRPC/etc)
  • If you are using a serialisation technique, decide on how to share schemas (i.e schema-registry)
  • How would i filter the messages before the de-serialisation happens.
  • Retrying messages which we have failed to deliver.
  • Error-Handling (If we delivered the message, but the processing failed)
  • How to handle transient vs non-transient errors.
  • What will be my recovery strategy from failures.
  • Since, we have some limitations on Exactly-Once delivery schematics, they wont be ideal for every use-case, so, how can we alternately try to de-duplicate the messages at the consumer end, and drop the redundant messages.

Payload schema And registry

While choosing the serialisation technique, following should be the consideration criteria.

  • payload size
  • Binary payload compression
  • Backward/forward compatibility and extensibility.
  • serialisation and deserialisation cost.

Apart from this, it’s important to rationalise a schema registry. The presence of registry off-loads the over-heads of making compatibility checks and sharing the schemas manually. Registry can maintain multiple versions and can co-work will different versions deployed based on the compatibility level configured selected.

For all our further conversation we will assume that we are working with Avro.

How confluent embedded schema registry in Vanilla Kafka

So, vanilla Kafka doesn’t hold a concept of schema-registry. All confluent does is, exploit the Kafka configurations. In Kafka consumer and producer configuration, you can mention the serialiser/deserialiser class to use for the payload.

Confluent, provides KafkaAvroSerialiser and KafkaAvroDeserialiser classes for avro based SerDe. Also, confluent provides schema-registry, and exposes a CachedSchemaRegistry url.

The SerDe classes have binding code towards confluent schema-registry, thus vanilla Kafka doesn’t hold any knowledge on the registry by-default and is completely decoupled from this concept.

With respect to the schema-registry, there are few terminologies.

  1. subject — refers to the topic name in Kafka. (default strategy, for more info, look for subject naming strategies)
  2. id — unique id is allocated to each schema stored in registry.
  3. version — each schema can have multiple versions associated to it per subject.

Each request to serialise the data is handed over to the cached-registry-client to map it to an ID based on subject name, which is appended to the front of the messagePayload. On the consumer side, the deserialiser class queries the registry again with the id, and the subject, to get hold of the schema to use for deserialisation.

So, configure your consumer and producers with confluent SerDe classes and add on the registry url to the configuration.

schema.registry.url: http://www.myResistry.com
value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer # Consumer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer # Producer

Good, so we know some details about how to handle AVRO based SerDe.

Filtering of messages

There can be situations where ordering of messages that you produce are of utmost importance. You might not want to choose to produce different events on different topics. You might still choose to publish different message payloads adhering to different schemas on a single topic. (Fair Enough! YOLO)

But, the complications comes on the consumer side, we might want to listen to particular events only and might not be interested in listening to few events at all.

So, what do i do. Well, Spring-Kafka provides a way to filter out those messages using RecordFilteringStrategy. But, catch is that is post deserialisation filtering, which means, consumers might have to take the hit-of spending extra time and resources in doing this deserialisation.

Thanks to Confluent, there is an easy escape.

You can override the default kafkaAvroDeserialiser class and get hold of the bytes published and headers, and use those headers to check if the message has to be dropped or not.

Which means, that be careful while designing your message payloads and headers, not all the information might go in the payload, your filter identifiers must go in the headers.

public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {// Custom interface PreDeserilizationFilter with filter method.
public
static List<PreDeserilizationFilter> filters;
private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaAvroDeserializer.class);public Object deserialize(String topic, Headers headers, byte[] data) {if (filters != null) {
for (PreDeserilizationFilter filter : filters) {
if (filter.filter(topic, headers, data)) {
return null;
}
}
}
return
deserialize(topic, data);
}
}

Phew! Thats a relief. And we are done.

Retry, Error-Handling And Recovery

Retry is when we have failed to deliver the messages to the consumer because of some reason, and we want to guarantee that message is not lost and must once-at-least be processed.

Error-handling is when the message was delivered to the consumer but we failed to process it for some reason.

Recovery is the most critical of all, you might end up into infinite loops if this is not correctly configured.

If all of these are not properly configured or used on the your consumer code, chances are you will at some point land up into big trouble.

First, thing first, is it possible to end into infinite retries on the consumer side. Totally! Yes!

Just to capture one common scenario.

  • Your message is failing to be successfully consumed, and you are manually going an ACK on the message for each successful consumption, each poll will re-deliver the same message to you from the head of the partition and you might end up in an infinite loop choking the whole pipeline processing. (Easy! right!)

How to handle this!

So, Spring Kafka has given a fix around this, each consumer maintains a thread-local cache of such failures based on TPO(topic-partition-offset) or the message. It increments the thread-local cache every-time the same TPO is encountered, and does a seek to this off-set so that it can be retried from the head of the queue. However, once you have expired all the attempts (default-10), it basically, auto-acks the message for you.

But, this is disastrous!

How, can Spring Kafka acknowledge that message on its own! Well, this is a recent fix and if you don’t know about this, then you for sure, don’t know about setting ErrorHandlers in Spring Kafka.

Check, out SeekToCurrentErrorHandler from SpringKafka!

So, basically, if you are using an error handler, your recoverer would be invoked before Spring-Kafka does the auto repair of your wounded Kafka-consumer/topic-partition.

What can i do in the recoverer?

So, recoverer is the code, where if your message is not processed, you can trigger some intimation or so some final critical processing for this message to let the system know, that we have skipped something important which can be dealt with later or raise a critical alert! (SOS-call)

In most such situations, people raise an alert (depending on whats the purpose of the data you are processing and what criticality it holds for you to skip the message) and also push this un-processed message to a dead-letter queue, from which you can take a peek into this message off-line and do some manual intervention.

Record based Listener processing.
For batch-record processing.

What about Non-Transient errors?

Don’t try to retry non-transient errors. A Deserialisation problem will never stop to re-occur if it happens for a payload. These errors are not auto-recoverable. So, be cautious and not land-up in a place where you end up retrying non-transient errors in a loop!

Push them directly to a DeadLetter queue. (Done-and-Dusted, for later)

How Spring Kafka does retries?

To effectively understand how Spring Kafka does retries you need to understand a little, how Spring Kafka works in the back ground. I am going to touch the over-view of it briefly to get an idea.

Spring Kafka follows a Layered-approach of designing listeners. Listeners are the root blocks of code, which push the actual message payload to the methods annotated in your code with @KafkaTemplate annotations.

Spring-Kafka Retries

@kafkaTemplate and @KafkaHandler are the two most utilised, yet not quite well understood annotations in SpringKafka

SpringKafka scans the beans for methods annotated with these annotations, and keeps the metadata of what you have asked SpringKafka to inject into your method at runTime.

The whole process is triggered when you enable SpringKafka by using annotation @EnableKafka

@EnableKafka
@KafkaListener(topics = "test", groupId = "hello123")
@Component
public class TestConsumer {
@KafkaHandler
public void listen(A record, Acknowledgment ack) {
try {
System.out.println(record);
ack.acknowledge();
}catch(Exception e) {
ack.acknowledge();
}
}
@KafkaHandler
public void listen1(B record, Acknowledgment ack) throws Exception {
try {
System.out.println(record);
ack.acknowledge();
}catch(Exception e) {
ack.acknowledge();
}
}

@KafkaHandler(isDefault = true)
public void listen9(Object record, Acknowledgment ack) throws Exception {
try {
System.out.println(record);
ack.acknowledge();
}catch(Exception e) {
throw e; //it is important that you throw the exception back for Spring Kafka to retry
}
}
}

//---------------***** OR *****------------
@EnableKafka
@Component
public class TestConsumer {
@KafkaListener(topics = "test", groupId = "hello123")
public void listen(ConsumerRecord<A> record, Acknowledgment ack) throws Exception {
try {
System.out.println(record);
ack.acknowledge();
}catch(Exception e) {
throw e; //it is important that you throw the exception back for Spring Kafka to retry
}
}

So, as shown above the RecordListener is the one which invokes your method with ConsumerRecord<T>, Acknowledgement, Headers, etc.

But, this Listener gets wrapped in a RetryingListener, incase an exception is thrown on processing of your record the RetryingListener redelivers the same message to your method again a specified number of times.
If, you are still unable to handle or process those errors, the SeekToCurrentErrorHandler comes into play and re-delivers the same message to the RetryingListener by re-winding the Topic partition of the failed offset and attempt re-delivery.

In case, you exhaust those retries, the head is removed/skipped/auto-ack’ed and your recoverer is invoked.

So, where is the catch!

The catch is that you might end-up processing the message a lot of times, if you haven’t configured the retry values ideally.

Lets consider below values 
Retry handler threshold - 3
SeekToCurrentErrorHandlerRetry - 3
Total invokes to the method - 9

So, carefully plan the retry thresholds to save on exhaustive retries.

Recovery Strategy

You can opt for multiple strategies based on requirements and use-cases to recover from the failures. In case, you want in-order processing guarantees, it’s not an easy problem to solve. You can’t stop the processing of messages if one of the message is not consumed properly/raises error.

  • Based on Recoverer in the ErrorHandler — If ordering is an issue, you can pass on the message to dead-letter and mark some state, based on the header information. The consumer can later refer this information for subsequent messages, and forward any subsequent messages/updates for the failed transaction to a dead-letter queue by default and take them off from the main processing queue.
  • If ordering is not an issue, just start pushing the failed messages to the dead-letter queue.

Deduplication of messages

Again, this is not an easy problem to solve. There can be many situations that can give rise to duplication situation.

  • Consider, you have multiple Consumer Groups processing events from same topic. Few of them successfully processed an event however, the others fail to process an event. How to selectively retry the processing on the failed consumers only. For this problem, we will use the mix of earlier discussed solutions.
    1. We will use header based pre-deserialisation filters, and
    2. SeekToCurrentErrorHandler.
    Add a “Retry” key to header with the consumer-name mapped to it, to later assert and process the message only if pre-deserialisation filter confirms its a retry message and the consumer-name equals to the name of the current consumer.
  • There could be a chance that your consumer successfully processed the message and before it could commit the message, the consumer failed. This will eventually lead to your message getting re-delivered on restart. Solution has not been tried or tested. It’s a hypothesis.
    You can use scalable bloom filters for this. You can use pre-deserialisation filters again to identify if the message ID(a key that can be part of header) was previously consumed or not. Bloom filter can be used to assert this, if the bloom filter says false, its confirmed that message was not processed, however, if the BloomFilter says true, it could be a false-positive, for this a persistence store based assertion can be done, if the message has been consumed earlier or not.
    However, for this work, you will have to maintain two states, one as soon as the message processing starts, which tells message was received but not processed, and another after the processing has been done, which confirms the message was processed. Make sure, you commit your processing and the after-processing-state ad part of a transaction in the persistence store for this to work.
    Also, scalable bloom filters brings added cost, resource and troubles. You might have to tune the False-Positive percentages.

--

--