Spring Boot DeserializationException handling in Kafka

Mateusz
2 min read · May 27, 2019


Handling invalid Kafka records properly is essential for keeping consumer throughput high. An unfortunate Kafka consumer configuration combined with a lack of proper error handling can lead to a situation where our consumer tries to consume the same invalid record over and over in a loop.

First step — let’s provide an error handler

Spring Kafka offers the ErrorHandler interface, an instance of which can be passed to the KafkaListenerContainerFactory. Our implementation of ErrorHandler is then responsible for handling every exception that occurs during record consumption. One of the more precise extensions of ErrorHandler is the ContainerAwareErrorHandler interface. Implementing it gives us access to all remaining records, including those that have not yet been processed.
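A minimal sketch of such a handler might look like the following. The class name KafkaErrorHandler comes from the post; the header-checking logic, constant names, and the simplified seek-and-commit strategy are my assumptions, not code from the original article:

```java
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.ContainerAwareErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

public class KafkaErrorHandler implements ContainerAwareErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaErrorHandler.class);

    // Headers added by ErrorHandlingDeserializer2 when deserialization fails
    private static final String KEY_EXCEPTION_HEADER = "springDeserializerExceptionKey";
    private static final String VALUE_EXCEPTION_HEADER = "springDeserializerExceptionValue";

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
                       Consumer<?, ?> consumer, MessageListenerContainer container) {
        // Exception logging is good practice: this is often the only trace of a poison pill
        LOG.error("Error while consuming records", thrownException);
        if (records.isEmpty()) {
            return;
        }
        ConsumerRecord<?, ?> record = records.get(0);
        if (failedDeserialization(record)) {
            // Skip the invalid record by committing the offset just past it
            TopicPartition partition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata nextOffset = new OffsetAndMetadata(record.offset() + 1);
            consumer.commitSync(Collections.singletonMap(partition, nextOffset));
            consumer.seek(partition, record.offset() + 1);
        }
    }

    private boolean failedDeserialization(ConsumerRecord<?, ?> record) {
        return record.headers().lastHeader(KEY_EXCEPTION_HEADER) != null
                || record.headers().lastHeader(VALUE_EXCEPTION_HEADER) != null;
    }
}
```

Note that this sketch only inspects the first remaining record; a production handler would typically also re-seek the other partitions so no in-flight records are lost.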

As you can see in the code above, all we need to do is implement the handle() method. This handler catches all exceptions, so we need to check the exception type before manually committing a new offset. Of course, logging the exception is good practice. Let’s look at our configuration…
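The container factory configuration binding a custom KafkaErrorHandler might be sketched like this; the broker address, group id, and StringDeserializer choices are placeholders, not values from the original post:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Bind the custom error handler to the listener container
        factory.setErrorHandler(new KafkaErrorHandler());
        return factory;
    }
}
```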

But if you try to run the code with your error handler bound to the container, you will still be stuck in the invalid-record consumption loop. Why? Because the List&lt;ConsumerRecord&lt;?, ?&gt;&gt; records argument is empty. We need one more step.

Second step — let’s adjust our deserialization configuration

Since Spring Kafka 2.2 there is a class that provides the last piece of the puzzle: ErrorHandlingDeserializer2&lt;T&gt;. It acts as a wrapper around our deserializer: it tries to deserialize each record, and when an exception occurs it creates a ConsumerRecord carrying the exception details in a header (“springDeserializerExceptionKey” or “springDeserializerExceptionValue” — see the static Strings in the KafkaErrorHandler snippet) and the raw byte array as the value. That record then appears in the records list passed to the handle() method. Now the configuration should look like this:

After that, your invalid record will be skipped and the consumer won’t try to handle it again.

Thank you for reading :)

Mat

Edit:

On my GitHub account you’ll find a simple example of a Kafka producer and consumer: https://github.com/codegard/kafka-docker. Feel free to send me your questions or ideas ;)
