Introducing Hot and Cold Retries on Apache Kafka®

An overview of how to build a fault-tolerant event delivery system by using non-blocking retries of Apache Kafka® in Udemy Payments Team

Berat Cankar
Udemy Tech Blog

--

With more than 46M students and 644M course enrollments on the Udemy platform, each day many users go through the checkout process to gain access to content. This generates a significant amount of traffic, and in tandem, this leads to many payment transactions. As for the integration approach, we complete the purchases by forwarding them to third-party payment processing vendors such as Stripe or Adyen. After receiving a successful response from those vendors, we deliver the courses to the students. This is a short summary of the user experience from the checkout point of view. On the other side of the coin, we listen to the vendor notifications to follow the state changes of these transactions and get informed about their updates. We need to confirm the following items about any notifications (aka webhooks):

  • Receive the notifications from the vendors
  • Process them and run necessary business logic in the receiver service
  • Transform them into vendor-agnostic messages in the Udemy ecosystem
  • Send them into downstream systems without missing or losing any of them

Given the aforementioned requirements and challenges, we choose to use Apache Kafka® for sending our messages into downstream systems. Kafka provides a high-throughput, scalable, and highly available messaging platform. Yet, it is still nearly impossible to foresee any future incidents. We need to guarantee that the vendor-originating webhooks will reach the downstream system. And that they will be able to run their respective business logic in a fault-tolerant manner. Your Kafka consumers can fail because of an exception in your code or an incident that can last for hours. The best you can do is to take the necessary measures to prevent such cases and make your system self-recover itself. In order to tackle these challenges, we have come up with an efficient solution by adding retrial logic to our consumers.

In this post, I am going to explain how you can introduce such hot and cold retries in your event delivery ecosystem in a non-blocking way. On top of that, I will also describe how you can tweak it for different configurations depending on specific use cases in Kotlin with the Spring framework.

Some Terminology

Let’s begin the discussion by defining some terms. We are going to refer to them a lot in the upcoming sections and it would be nice to clarify them beforehand.

  1. Hot Retry: It is a strategy of retrying the message right after you get an error while consuming it. As an example, you can think of a scenario where your consumer cannot connect to your data store due to a socket timeout. Or, you performed an API call to fetch some resources. But, the resource is missing, and you only need some extra seconds to refer to that resource. This can be a common cause for most of the eventually consistent distributed systems. As you can see, your consumers can recover from these types of errors thanks to immediate retrials.
  2. Cold Retry: This type of retry strategy refers to a way where you need some time (possibly more than a few seconds) to get your underlying cause resolved. As an example, let’s imagine you have a high replication delay in your MySQL cluster. Your consumer cannot get the necessary records from the replicas. As a result, your consumers will fail until the replication is completed. In such cases, you will need a cold retry mechanism and delay your topic somehow to retry it in the resolution moment.

Now, we are ready to proceed to the next steps. First I am going to define what we try to achieve in the end with a high-level flow diagram. After that, we will go through the configuration schema, usages of these configuration parameters, topic forwarding strategies for cold retrials, and a sample consumer implementation.

Flow

As depicted in the below figure, our journey starts with our producer. It publishes the message on the target topic, and the consumer starts processing. There will not be any problem while consuming in the happy path, and the consumer will be able to process the message.

But, if the consumer encounters an exception, it will first check whether any hot retry is defined or not. If so, we will try the message consumption up until maximum hot retry attempt or it succeeds in consuming the message. If it fails and reaches the maximum threshold, then it checks whether the cold retry configuration is defined or not. In the worst-case scenario where we do not have any cold retry configured, the message goes to the Dead Letter Queue (DLQ). Otherwise, we will forward the message to the cold retry topic, and make it be processed with some delay. That is why these kinds of topics are called delayed topics.

Note that the delayed topics do not differ from the normal topics in terms of topology but in terminology. You will see that they could also have a hot retry defined in them or even another cold retry topic chained next to another delay topic.

Now, we have forwarded the message to our cold retry topic. One of the main aims here is to chain delay topics with themselves or other ones without blocking the other messages in the same partition. To do that, we will implement the topic forwarding logic for cold retries. Hence, a failure on a cold retry topic will lead to publishing the same message into the same topic again and skipping the current offset. Also, to have a retry threshold, we put custom headers on the topics and check the attempt value in the next consumption. By doing so, the current offset’s message will not block the next offset, and the current failed message will get its new offset at the end of the same partition after incrementing the attempt count (Yay!). Still, it is possible that we can reach the maximum attempt count limit on the cold retry. As a result, this will cause either publishing the message to the next cold retry topic if defined or the dead letter queue as a sink point.

As you can see from the overall flow, the consumers will handle each forwarded message as a fresh message and apply the hot and cold retrials on them again and again in case of failure.

Configuration Schema

As shown in the below code block, I choose to structure the configuration to support both hot and cold retries. In the topology, we have the topics section to refer to the configuration of each topic. Here, each topic has its qualifier (which will be used for reference in the codebase) and configuration details.

  • Topic name: Topic name to send messages via Kafka template
  • Consumer count: Consumer count to decide the number of concurrent consumers
  • Hot retry
    – Count: Number of attempts before proceeding with the cold retry topic
    – Interval: Duration gap between each hot retrial in milliseconds
  • Cold retry
    – Count: Number of attempts before proceeding with the next cold retry topic
    – Interval: Duration gap between each cold retrial of the same topic in milliseconds
    – Next topic: Topic qualifier to be forwarded next if any exists
  • Manually acknowledgment flag: Boolean flag to decide whether the message needs manual acknowledgment or not

You can also see how I read the configuration in the code along with a bit more detailed explanation for the corresponding fields:

Consumer Factory

Now, it is time to define the consumer factory. We are going to use ConcurrentKafkaListenerContainerFactory and configure it with the above parameters. Using their qualifiers, I defined a common method to revisit and create different beans for each topic. At this point, we have an important point to pay attention to: We are going to configure the hot retry logic in the factory creation moment. As you can see, I set the error handler on the factory with SeekToCurrentErrorHandler. This handler expects a dead letter publishing recoverer and some back-off strategy. We embed the hot retry logic by using the fixed back-off logic and make the consumers operate the immediate retrials from the configuration above.

Another critical point that we need to keep an eye on is the acknowledgment mode setting of the factory. We need to control the acknowledgment mode as we try to enable non-blocking delayed cold retrials, we need control over the acknowledgment mode. For the cold retriable topics, I set the acknowledgment mode as MANUAL_IMMEDIATE. I will give more details about its usage while exemplifying the sample consumer. Now, let’s move on to the DeadLetterPublishingRecoverer implementation. Here, we want to get a strategy to forward a message on the cold retry procedure. I will explain the details of how you can create such a factory and corresponding forwarding strategies in the next section. The only remaining item in the recoverer is that we forward the message to the topic that the strategy suggests on the same partition. You can change the partition decision logic by your use cases.

Topic Forwarding

Finally, we come to the part where all the flow explained in the first figure embodies itself in the codebase. First, we are going to define strategy decision logic in the DeadLetterPublishingRecoverer. Here, I will list what is done in the below strategy factory implementation line by line.

  1. If a deserialization exception occurs, there is no need to retry the message again. We can directly forward it into the dead letter topic in L19-L22.
  2. We do a safety check whether the current topic exists in the configuration and whether it has a cold retry. If not, we can forward it into a dead letter topic again in L24-L28.
  3. We have a simple statement to fetch how many times we attempt to process the topic from a custom header. If it is not in the header, then we can assume that it was the first trial in L30-L33.
  4. We need to check whether the attempt limit reaches the maximum attempt count of that cold retry. If not reached yet, then we will publish the message on the same topic in L35-L42.
  5. If it reaches the maximum limit, then we will forward the message to the next topic in L43-L47.

As you can see from the strategy factory, we have three types of topic forwarding strategies implemented. All these strategies have a common interface named TopicForwardingStrategy with a single method returning the forwarded topic name.

You will see that we can modify any header value in the strategy implementation of the interface method. For example, we will use a header field to keep a count of the retrials. When the current attempt reaches the limit defined for that cold retry, then we will forward the message to the next topic. Also, we will insert information about when the failed message should be processed by our consumers. As we aim to delay the failed message processing for some time, we need to suspend the processing of these messages until their next processing time. You will see that we also compute and put the next processing time information into the message headers and check it inside our consumers. We will dive into its details in the sample consumer section later. For now, let’s move on to the implementation of each strategy.

  • ColdRetrySameTopicForwardingStrategy: This strategy performs the forwarding of the failed message into the same topic if we do not exceed the retry limit on that cold retry configuration. We increment the current attempt number in the strategy method and set the next processing time. Since it is the same topic forwarding, we delegate the current topic name as a result.
  • ColdRetryNextTopicForwardingStrategy: This strategy handles the redirection of the failed message to the next cold retry topic. In this context, we need a fresh start of retrials, so we reset the attempt count. We also set the next processing time, and return the name of the next retry topic.
  • DeadLetterTopicForwardingStrategy: This strategy manages to forward the failed messages to the sink point named Dead Letter Queue. It only returns the name of what is given as the DLQ name.

Up to this point, we have implemented the hot retry logic in the consumer factory and configured our DeadLetterPublishingRecoverer to support the different kinds of cold retry topics to be chained one after the other. The forwarding mechanism will utilize the current attempt count to determine the next topic. As we re-publish the failed message to our queues, we will not cause any blocking to other messages in the queue. The only missing part in the puzzle is how we will schedule delayed topics.

Sample Consumer

We have reached the final point of our journey. In this section, we address how to suspend the failed messages until their next processing time. As you may remember, we embed the next processing time information in the strategies. Now it is time to use this information. In the Consumer Factory section, we set the acknowledgment mode of the consumer, enabling us to pass the Acknowledgment parameter to our consumer methods. We use this parameter to suspend the message consumption until their processing time. As you can see from the below code segment, we directly return from the method if we do not reach the processing time. There are two key points we need to pay attention to:

  1. If the next processing time of the current offset is ahead from now, then we should call acknowledgment.nack(). By doing so, we will force our consumer to re-seek the same offset again and continue so until we reach the next processing time. Here, you may want to determine how many milliseconds you want to suspend your consumer
  2. If we reach the next processing time, then we should execute our business logic. But we should call the acknowledgment.acknowledge() method in any case. We should acknowledge both success and failure scenarios. That is why I cover the business logic in the try-finally block. In this context, the acknowledgment will have a meaning that we try to process the current message, and it completes its flow. This statement will make our consumer commit that offset and continue with the successive records in the same partition.

Conclusion

I hope you enjoyed our explanation of how you can introduce hot and cold retries on Apache Kafka® to build a fault-tolerant event delivery system. It was a journey that included preparing a baseline to extend and configure different types of retries for each topic. Examining how you can utilize immediate retries in the consumer factory. And last but not least, we also implemented several topic forwarding strategies to support cold retries in a non-blocking manner. Thank you for reading, and I hope that you find this useful for your own use cases and requirements. To find a more concrete application, kindly check out my repository.

Acknowledgment

This huge effort would not have been possible without the significant contributions of my teammates. I would like to thank everyone who has supported us through our journey, who reviewed and assisted in improving this article.

--

--

Berat Cankar
Udemy Tech Blog

Software engineer @toolio, curious learner, over-thinker, cat lover, sudoku solver, your smiling neighbor https://github.com/yakuza8