Reprocessing of Kafka events

Pavel Likin
Published in IT’s Tinkoff
7 min read · May 12, 2020

Today’s applications run in very complex environments. Business logic is wrapped in a modern technology stack, runs in a Docker image controlled by an orchestrator such as Kubernetes or OpenShift, and communicates with other applications or enterprise solutions through a chain of physical and virtual routers.

Something can always break in such an environment, so re-processing events when one of the external systems is unavailable is an important part of our business processes.

Before Kafka

Earlier in the project we used IBM MQ for asynchronous message delivery. If an error occurred while the service was processing a message, the message could be placed in a dead-letter queue (DLQ) for further manual processing. The DLQ was created next to the incoming queue, and the message was moved inside IBM MQ.

If the error was temporary and we could recognize it (e.g. ResourceAccessException for an HTTP call or MongoTimeoutException for a MongoDB request), a retry strategy took effect. Regardless of how the application logic branched, the original message was moved either to a system queue for delayed sending or to a separate application that had been built earlier to resend messages. The retry number was written to a message header and mapped, at the application level, either to a delay interval or to the end of the strategy. If the end of the strategy was reached and the external system was still unavailable, the message was placed in the DLQ for manual analysis.

Search for a solution

A solution can be found on the Internet. In short, it proposes creating a topic for each delay interval and implementing Consumers on the application side that read the messages with the required delay.

Despite a lot of positive feedback, this approach did not seem right for our team. First of all, in addition to implementing business requirements, we would have to spend a lot of time implementing the described mechanism.

In addition, if access control is enabled on the Kafka cluster, you have to spend time creating the topics and granting access to them. You also need to choose the correct retention.ms value for each of these topics, so that messages can be resent before they expire. The implementation and the access requests must be repeated for each existing or new service.

Let’s see what mechanisms for re-processing a message are available in general and in spring-kafka in particular. Spring-kafka has a transitive dependency on spring-retry, which provides abstractions for managing different BackOffPolicy implementations. It is quite flexible, but its significant drawback is that it keeps messages awaiting a retry in application memory. This means that restarting the application, whether for an update or because of a runtime error, loses all messages waiting to be processed again. Since this point is critical for our system, we did not consider it further.

spring-kafka itself provides several implementations of ContainerAwareErrorHandler, such as SeekToCurrentErrorHandler, which lets you process the message again later without advancing the offset when an error occurs. Since spring-kafka 2.3 it has also been possible to configure it with a BackOff that sets the delay between attempts.

This approach allows messages awaiting re-processing to survive an application restart, but there is still no DLQ mechanism. We chose this option at the beginning of 2019, optimistically believing that a DLQ would not be needed (we were lucky and really did not need one during several months of running the application with this re-processing scheme). Temporary errors triggered SeekToCurrentErrorHandler. All other errors were logged, the offset was committed, and processing continued with the next message.

The final solution

The implementation based on SeekToCurrentErrorHandler prompted us to develop our own mechanism for resending messages.

First of all, we wanted to reuse the experience we already had and extend it depending on the application logic. For an application with linear logic, the best option is to stop reading new messages for a short period of time defined by the retry strategy. For all other applications, we wanted a single point that enforces the re-processing strategy. In addition, this single point should provide DLQ functionality for both approaches.

The re-processing strategy itself should be stored in the application, which is responsible for obtaining the next interval when a temporary error occurs.

Stopping the Consumer for an application with linear logic

When working with spring-kafka, the code for stopping the Consumer may look like this:
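What follows is only a minimal sketch of the idea, not the authors' listing. To stay runnable without Spring on the classpath, it assumes a hypothetical ListenerContainer interface that mirrors the start(), stop() and isRunning() methods of spring-kafka's MessageListenerContainer, and it uses a ScheduledExecutorService in place of Spring's TaskScheduler. The names ConsumerPauser and pauseUntil are illustrative.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the lifecycle methods of MessageListenerContainer. */
interface ListenerContainer {
    void start();
    void stop();
    boolean isRunning();
}

/** Stops consumption now and restarts it at retryAt from a scheduler thread. */
final class ConsumerPauser {
    private final ListenerContainer container;
    // Plays the role that Spring's TaskScheduler has in the article.
    private final ScheduledExecutorService scheduler;

    ConsumerPauser(ListenerContainer container, ScheduledExecutorService scheduler) {
        this.container = container;
        this.scheduler = scheduler;
    }

    /** Stops the container if it is running and schedules its restart at retryAt. */
    ScheduledFuture<?> pauseUntil(Instant retryAt) {
        if (container.isRunning()) {
            container.stop();
        }
        // A retryAt in the past results in an immediate restart.
        long delayMs = Math.max(0L, Duration.between(Instant.now(), retryAt).toMillis());
        return scheduler.schedule(() -> {
            if (!container.isRunning()) {
                container.start();
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}
```

Because the offset of the failed message is not committed before the stop, the same message is read again after the restart, which is what makes this a retry rather than a skip.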

In the example, retryAt is the time at which the MessageListenerContainer should be restarted if it is still running. The restart happens in a separate thread run by a TaskScheduler, an implementation of which is also provided by Spring.

We find the value of retryAt in the following way:

  1. We look up the value of the retry counter.
  2. Using the counter value, we look up the current delay interval in the re-processing strategy. The strategy is declared in the application in JSON format.
  3. The interval found in the JSON array contains the number of seconds after which processing must be repeated. This number of seconds is added to the current time to produce retryAt.
  4. If no interval is found, retryAt is null and the message is sent to the DLQ for manual processing.
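The four steps above can be sketched as follows. This is an illustration under assumptions: the JSON array is taken to be already parsed into a list of delays in seconds, the counter is assumed to start at 0 for the first failure, and an empty Optional is used where the article uses null. RetryStrategy is a hypothetical name.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Retry strategy: element i is the delay in seconds before retry number i + 1. */
final class RetryStrategy {
    private final List<Long> delaysSeconds;

    RetryStrategy(List<Long> delaysSeconds) {
        this.delaysSeconds = Collections.unmodifiableList(new ArrayList<>(delaysSeconds));
    }

    /**
     * @param retryCounter number of retries already made for the message (0 on first failure)
     * @param now          the current time
     * @return the time of the next attempt, or empty when the strategy is exhausted
     *         and the message should go to the DLQ
     */
    Optional<Instant> retryAt(int retryCounter, Instant now) {
        if (retryCounter < 0) {
            throw new IllegalArgumentException("retryCounter must not be negative");
        }
        if (retryCounter >= delaysSeconds.size()) {
            return Optional.empty(); // no interval found: end of the strategy
        }
        return Optional.of(now.plusSeconds(delaysSeconds.get(retryCounter)));
    }
}
```

For example, with the delays 5, 30 and 300 seconds, a first failure at 10:00:00 yields a retryAt of 10:00:05, and a fourth failure yields an empty result, which sends the message to the DLQ.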

With this approach, all that remains is to store the number of retries for each message currently being processed, for example in application memory. Keeping the attempt counter in memory is not critical here, since an application with linear logic stops processing as a whole. In contrast to spring-retry, restarting the application does not lose the messages awaiting re-processing; it simply restarts the strategy.

This approach helps to reduce the load on the external system, which may be unavailable precisely because it is overloaded. In other words, in addition to re-processing, we have implemented the circuit breaker pattern.

In our case, the error threshold is only 1. To minimize system downtime caused by a temporary network failure, we use a very fine-grained retry strategy with short delay intervals. This may not suit every application, so the ratio between the error threshold and the interval value should be chosen based on the system requirements.
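A minimal sketch of such an in-memory counter, assuming that a message is identified by its topic, partition and offset. The class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Counts failed attempts per message; the state is lost on restart, as described above. */
final class AttemptCounter {
    private final ConcurrentMap<String, Integer> attempts = new ConcurrentHashMap<>();

    /** Builds a key that identifies one record within the cluster. */
    static String key(String topic, int partition, long offset) {
        return topic + "-" + partition + "@" + offset;
    }

    /** Returns the number of failed attempts recorded so far (0 if none). */
    int get(String key) {
        return attempts.getOrDefault(key, 0);
    }

    /** Records one more failed attempt and returns the new total. */
    int increment(String key) {
        return attempts.merge(key, 1, Integer::sum);
    }

    /** Forgets the message after it was processed or handed over to the DLQ. */
    void clear(String key) {
        attempts.remove(key);
    }
}
```

After a restart the map is empty, so the message that was being retried is read again with a counter of 0 and the strategy starts from its first interval.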

A separate application for handling messages from applications with non-deterministic logic

Here is code that sends a message to such an application (Retryer), which resends it to the DESTINATION topic when the RETRY_AT time is reached:
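The sketch below is an illustration rather than the authors' code. It only builds the header set, using the header names from the article, so that it runs without the Kafka client on the classpath. The header encodings (UTF-8 strings, ISO-8601 for RETRY_AT) and the SEND_TO values RETRY and DLQ are assumptions, as is the RetryerHeaders name.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

/** Builds the headers of a message handed over to Retryer. */
final class RetryerHeaders {
    private RetryerHeaders() { }

    static Map<String, byte[]> build(String destinationTopic,
                                     Instant retryAt,           // null: strategy exhausted
                                     String groupId,
                                     Integer originalPartition, // may be null
                                     int counter,
                                     String reason) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        put(headers, "DESTINATION", destinationTopic);
        put(headers, "GROUP_ID", groupId);
        put(headers, "COUNTER", Integer.toString(counter));
        put(headers, "REASON", reason);
        if (originalPartition != null) {
            put(headers, "ORIGINAL_PARTITION", originalPartition.toString());
        }
        if (retryAt != null) {
            put(headers, "RETRY_AT", retryAt.toString()); // ISO-8601, an assumed encoding
            put(headers, "SEND_TO", "RETRY");             // hypothetical constant
        } else {
            put(headers, "SEND_TO", "DLQ");               // hypothetical constant
        }
        return headers;
    }

    private static void put(Map<String, byte[]> headers, String name, String value) {
        headers.put(name, value.getBytes(StandardCharsets.UTF_8));
    }
}
```

With the Kafka client, each entry could then be copied onto the outgoing record with record.headers().add(name, value) before the record is sent to the topic that Retryer listens to.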

You can see that a lot of information is transmitted in the headers. The value of RETRY_AT is computed in the same way as retryAt for the mechanism that stops the Consumer. Apart from DESTINATION and RETRY_AT, we transmit:

  • GROUP_ID, by which we group messages for manual analysis and simplified search.
  • ORIGINAL_PARTITION, to try to keep the same Consumer for re-processing. This parameter can be null; in that case the new partition is derived from the record.key() of the original message.
  • The updated COUNTER value, to follow the re-processing strategy.
  • SEND_TO, a constant that indicates whether to send the message for re-processing when RETRY_AT is reached or to place it in the DLQ.
  • REASON, the reason why the message processing was interrupted.

Retryer saves messages for resending and for manual processing in PostgreSQL. A timer starts a task that finds messages whose RETRY_AT has been reached and sends them back to the ORIGINAL_PARTITION of the DESTINATION topic with the original record.key().

After sending, the messages are removed from PostgreSQL. Manual analysis of messages is performed in a simple UI that communicates with Retryer via a REST API. Its main features are resending or deleting messages from the DLQ, viewing the error information, and searching for messages, for example by error name.
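The timer task described above (find due messages, resend them, delete them) can be sketched as follows. This is an assumption-heavy illustration: an in-memory list stands in for the PostgreSQL table, the hypothetical Sender interface stands in for a Kafka producer, the payload is omitted, and a null retryAt is used here to mark a message that waits in the DLQ.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** A message held by Retryer; the fields mirror the headers listed earlier. */
final class StoredMessage {
    final String destination;
    final Integer originalPartition; // may be null
    final String key;
    final Instant retryAt;           // null: the message waits in the DLQ

    StoredMessage(String destination, Integer originalPartition, String key, Instant retryAt) {
        this.destination = destination;
        this.originalPartition = originalPartition;
        this.key = key;
        this.retryAt = retryAt;
    }
}

/** Hypothetical abstraction over a Kafka producer. */
interface Sender {
    void send(String topic, Integer partition, String key);
}

final class RetryerStore {
    // Stands in for the PostgreSQL table.
    private final List<StoredMessage> rows = new ArrayList<>();

    synchronized void save(StoredMessage message) {
        rows.add(message);
    }

    synchronized int size() {
        return rows.size();
    }

    /** Resends every message whose retryAt has passed, deletes it, and returns the count. */
    synchronized int resendDue(Instant now, Sender sender) {
        int sent = 0;
        for (Iterator<StoredMessage> it = rows.iterator(); it.hasNext();) {
            StoredMessage m = it.next();
            if (m.retryAt != null && !m.retryAt.isAfter(now)) {
                sender.send(m.destination, m.originalPartition, m.key);
                it.remove(); // removed only after it was handed to the sender
                sent++;
            }
        }
        return sent;
    }
}
```

Messages that are not yet due and messages in the DLQ stay in the store; the latter leave it only through a manual action.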

Since access control is enabled on our clusters, we additionally have to request access to the topic that Retryer listens to and allow Retryer to write to the DESTINATION topic. This is inconvenient, but in contrast to the topic-per-interval approach, we get a full-fledged DLQ and a UI to manage it.

There are cases when an incoming topic is read by several different consumer groups whose applications implement different logic. Re-processing a message through Retryer for one of these applications would produce a duplicate for the others. To protect against this, we create a separate topic for re-processing. The same Consumer can read both the incoming topic and the retry topic without any restrictions.

By default, this approach does not provide circuit breaker functionality, but it can be added to the application using spring-cloud-netflix or the newer Spring Cloud Circuit Breaker, by wrapping calls to external services in the appropriate abstractions. In addition, it becomes possible to choose a strategy for the bulkhead pattern, which can also be useful. For example, in spring-cloud-netflix it could be a thread pool or a semaphore.

Conclusion

As a result, we have a separate application that allows us to re-process the message if any external system is temporarily unavailable.

One of the main advantages of the application is that it can be used by external systems running on the same Kafka cluster without significant modifications on their side. Such an application only needs to get access to the retry topic, fill in a few Kafka headers, and send a message to Retryer. There is no need to set up any additional infrastructure. To reduce the number of messages transferred from the application to Retryer and back, we have singled out the applications with linear logic and re-process their messages by stopping the Consumer.
