Taking control of your Poison Pill
A poison pill in Kafka is a message produced to a Kafka topic that always fails to be consumed, no matter how many times the system tries.
I am currently working on a system where clients publish customer-related events to over 30 different Kafka topics. A Spring Boot application, running on a Kubernetes cluster, reads those events, processes them and saves them to an Elasticsearch database. The system processes an average of 50 million events per day.
Each event has a corresponding Avro schema defined in a schema registry. When a message is read from a Kafka topic, it is first deserialized using the schema obtained from the schema registry. If the application temporarily loses its connection to the schema registry, the events fail to deserialize. In these cases, the system automatically sends the messages to the dead letter queue. My task was to instruct the system to retry deserializing the messages until the connection with the schema registry could be re-established.
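For context, this kind of setup is usually wired by pointing the consumer at an Avro deserializer that knows where the schema registry lives. A minimal sketch, assuming Confluent's KafkaAvroDeserializer and a placeholder registry URL (the binding name process-in-0 matches the examples later in this article):
spring.cloud.stream.kafka.bindings.process-in-0.consumer.configuration.value.deserializer
=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.bindings.process-in-0.consumer.configuration.schema.registry.url
=http://schema-registry:8081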
Default Deserializer
When a message is read from a Kafka topic it goes through two stages:
First, Kafka tries to deserialize the message and, using the deserialized values, produces a consumer record.
Second, a Kafka consumer picks up the consumer record and begins to process it.
In a Spring Boot project you can specify the deserializer to use by setting the following property:
spring.cloud.stream.kafka.bindings.process-in-0.consumer.configuration.value.deserializer
=org.apache.kafka.common.serialization.StringDeserializer
When a project uses a basic deserializer like the StringDeserializer and the first step fails, Kafka will throw an exception and get stuck in an endless loop trying to deserialize the same message. New messages will begin to build up behind this poison message and your logs will start filling with Kafka warnings.
ErrorHandlingDeserializer
To use the ErrorHandlingDeserializer, simply change the property to:
spring.cloud.stream.kafka.bindings.process-in-0.consumer.configuration.value.deserializer
=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
When the ErrorHandlingDeserializer is in place, a message that fails to deserialize will not cause an exception to be thrown. Kafka will instead create a consumer record whose value is set to null and whose headers include a “springDeserializerExceptionValue” (or “springDeserializerExceptionKey”) header describing the failure. Kafka passes this record on to the Kafka consumer and moves on to the next message in the queue.
When the Kafka consumer reads a new message, it first checks the headers associated with the message. If the deserialization exception header is found, the message is immediately sent to the dead letter queue and the consumer moves on to the next message. This process gives the developer the ability to investigate the faulty message without negatively impacting the entire system.
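The snippet below is a minimal sketch, not taken from our codebase, of how a consumer could detect that header and divert the record. It uses a plain Spring Kafka @KafkaListener rather than our Spring Cloud Stream binding, and the topic name as well as the sendToDeadLetterQueue and process helpers are hypothetical:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.stereotype.Component;

@Component
public class CustomerEventListener {

    @KafkaListener(topics = "customer-events")
    public void onMessage(ConsumerRecord<String, Object> record) {
        //ErrorHandlingDeserializer stores the failure details in this header
        Header failure = record.headers()
                .lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
        if (failure != null) {
            //The value is null because deserialization failed; route the raw record away
            sendToDeadLetterQueue(record);
            return;
        }
        process(record.value());
    }

    private void sendToDeadLetterQueue(ConsumerRecord<String, Object> record) {
        //hypothetical helper, e.g. republish the raw record to a dead letter topic
    }

    private void process(Object value) {
        //hypothetical business logic
    }
}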
FailedDeserializationFunction
So far we have addressed the two extremes. The default deserializer will retry a message until it can deserialize it successfully. The ErrorHandlingDeserializer will send the message to the dead letter queue after the first deserialization failure. There are cases where a system would want to retry specific deserialization errors rather than automatically move past them. For example, when trying to connect to a schema registry during deserialization the system might receive a java.net.UnknownHostException. In this case, the system might have temporarily lost its connection to the schema registry and need a couple of seconds to recover it. It would then be appropriate to retry deserializing the message while the system is trying to reconnect.
In the ErrorHandlingDeserializer class, after a message fails to deserialize and the deserialization exception header is added to the message, a method named recoverFromSupplier is called:
private T recoverFromSupplier(String topic, Headers headers, byte[] data, Exception exception) {
    if (this.failedDeserializationFunction != null) {
        FailedDeserializationInfo failedDeserializationInfo =
                new FailedDeserializationInfo(topic, headers, data, this.isForKey, exception);
        return this.failedDeserializationFunction.apply(failedDeserializationInfo);
    }
    else {
        return null;
    }
}
In recoverFromSupplier(), if a failedDeserializationFunction is defined, the system runs the custom logic supplied in that function. Otherwise a null value is returned and Kafka moves on to the next message.
To define a failedDeserializationFunction, first add the following properties to the properties file:
spring.kafka.consumer.properties.spring.deserializer.key.function=path_to_a_defined_function
spring.kafka.consumer.properties.spring.deserializer.value.function=path_to_a_defined_function
The same or different functions can be assigned for deserializing the message key and message value.
Second, create a class that implements Function<FailedDeserializationInfo, Object> and override the apply() function.
public class FailedDeserializationProvider implements Function<FailedDeserializationInfo, Object> {

    @Override
    public String apply(FailedDeserializationInfo info) {
        //If the reason the message failed to deserialize was NOT
        //caused by a SerializationException then throw an exception
        //so it can be retried
        if (info.getException().getCause() != null &&
                !(info.getException().getCause() instanceof SerializationException)) {
            //Log the error
            log.info(Logger.m("failed deserializing retry",
                    Logger.f("topic", info.getTopic()),
                    Logger.f("exception", info.getException().getCause().toString()),
                    Logger.f("isForKey", info.isForKey())));
            //Remove the header springDeserializerExceptionValue
            info.getHeaders().remove("springDeserializerExceptionValue");
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //Throw a SerializationException forcing Kafka to
            //retry deserializing the message
            throw new SerializationException(
                    "Triggered FailedDeserializationProvider - retry until connection issue resolves",
                    info.getException());
        //If the message failed to deserialize because of a
        //SerializationException then return null so it can be skipped
        } else {
            //Log the error
            log.info(Logger.m("failed deserializing send to dead letter queue",
                    Logger.f("topic", info.getTopic()),
                    Logger.f("exception", info.getException().getMessage()),
                    Logger.f("isForKey", info.isForKey())));
            return null;
        }
    }
}
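With this class in place, the properties shown earlier can point at it. For example (the package name com.example is an assumption for illustration):
spring.kafka.consumer.properties.spring.deserializer.key.function=com.example.FailedDeserializationProvider
spring.kafka.consumer.properties.spring.deserializer.value.function=com.example.FailedDeserializationProvider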
In our case, when a specific type of deserialization exception is received, the system throws its own exception forcing Kafka into a retry loop.
Turning On and Off Custom Logic
A poison pill is a serious threat to any production system. In the event that the custom retry logic is not working correctly, a mechanism should be in place to turn off the custom retry logic and seek past the poison pill. Unfortunately, it is not as simple as setting the property spring.kafka.consumer.properties.spring.deserializer.value.function to an environment variable that can be easily removed and redeploying the application. With such a setup, if the environment variable exists the property is set to the failedDeserializationFunction’s fully qualified class name, and if the environment variable does not exist the class name is blank, indicating that there is no custom retry logic.
spring.kafka.consumer.properties.spring.deserializer.value.function=${FUNCTION_PATH:}
The limitation arises in the configuration process of the ErrorHandlingDeserializer. During configuration, the setupFunction(Map<String, ?> configs, String configKey) method checks whether the property spring.kafka.consumer.properties.spring.deserializer.value.function exists in the properties file. If the property does exist, the system automatically tries to create an instance of the class specified in this property. The code responsible for the class creation is ClassUtils.forName((String) value, null). The first parameter, value, represents the name of the class that is to be created. If the code cannot create an instance of the class, it throws a ClassNotFoundException and the application will not start.
private void setupFunction(Map<String, ?> configs, String configKey) {
    if (configs.containsKey(configKey)) {
        try {
            Object value = configs.get(configKey);
            Class<?> clazz = value instanceof Class ? (Class<?>) value : ClassUtils.forName((String) value, null);
            Assert.isTrue(Function.class.isAssignableFrom(clazz), "'function' must be a 'Function ', not a "
                    + clazz.getName());
            this.failedDeserializationFunction = (Function<FailedDeserializationInfo, T>)
                    clazz.getDeclaredConstructor().newInstance();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
The only way to turn off the custom retry logic and properly start the application is to make sure that the property spring.kafka.consumer.properties.spring.deserializer.value.function does not exist at all.
RETRY_UPON_DESERIALIZATION_ERRORS Flag
In order to control the presence of the property spring.kafka.consumer.properties.spring.deserializer.value.function in the configuration file, an additional environment variable is added to the system’s config file. The new variable, RETRY_UPON_DESERIALIZATION_ERRORS, is set to either true or false.
RETRY_UPON_DESERIALIZATION_ERRORS: "true"
Next, a new property is added to the system’s properties file that references the environment variable.
spring.kafka.consumer.properties.spring.deserializer.retryUponDeserializationErrors=${RETRY_UPON_DESERIALIZATION_ERRORS:false}
Lastly, a custom ErrorHandlingDeserializer class is defined that extends the default class and overrides the configure function.
public class CdpErrorHandlingDeserializer<T> extends ErrorHandlingDeserializer<T> {

    public static final String KEY_RETRY_UPON_DESERIALIZATION_ERRORS = "spring.deserializer.retryUponDeserializationErrors";
    public static final String VALUE_DESERIALIZER_FUNCTION = "spring.deserializer.value.function";
    public static final String KEY_DESERIALIZER_FUNCTION = "spring.deserializer.key.function";

    @Override
    public void configure(Map<String, ? extends Object> configs, boolean isKey) {
        if (Boolean.parseBoolean((String) configs.get(KEY_RETRY_UPON_DESERIALIZATION_ERRORS))) {
            super.configure(configs, isKey);
        } else {
            configs.remove(VALUE_DESERIALIZER_FUNCTION);
            configs.remove(KEY_DESERIALIZER_FUNCTION);
            super.configure(configs, isKey);
        }
    }
}
If the value of the property keyed by KEY_RETRY_UPON_DESERIALIZATION_ERRORS is true, the default configure() function is called as-is. If it is false, the properties defining the deserialization functions are removed from the consumer configuration before the default configure() function is called. The configure() function is what ultimately calls the setupFunction(Map<String, ?> configs, String configKey) method described earlier.
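Finally, the consumer has to be told to use the custom class instead of the stock ErrorHandlingDeserializer. A sketch of the binding property, assuming the class lives in a com.example package:
spring.cloud.stream.kafka.bindings.process-in-0.consumer.configuration.value.deserializer
=com.example.CdpErrorHandlingDeserializer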
Conclusion
It is true that one nefarious message can wreak havoc on a system, but that does not mean that all messages that fail deserialization on the first try should automatically be sent to the dead letter queue. With the ErrorHandlingDeserializer and the failedDeserializationFunction, Kafka and Spring give programmers the ability to design error handling logic that best suits their system’s needs.
About me
I am a Senior Backend Engineer at Tikal. I believe that being a good programmer means you never stop learning.