How We Solved Kafka Event Loss Problem By ‘Break’ing It Down

Zuhal Polat
Trendyol Tech
Jul 10, 2023

Kafka is a scalable, fault-tolerant, real-time messaging platform that can process millions of messages per second. However, some situations cause event losses. For example, Kafka requires a stable network connection between the clients and the brokers; if the network is unstable, events can easily be lost.

In this article, I'll explain the architecture we use to prevent network-related event losses and the problems we encountered while implementing it.

· The Beginning Of The Problem
· Outbox Pattern To Prevent Event Losses
· The Duplicate Events Created By The Outbox Pattern
· The Problems That Arise By Duplicated Events
· Let’s Solve the Problem
· Results
· Conclusion

The Beginning Of The Problem

As the Delivery Core Team at Trendyol, because of our domain-specific logic, after processing an event consumed from the main topic, we produce it to a retry topic in Kafka. A scheduler application takes events from the retry topic and produces them back to the main topic. After consuming and processing the event from the main topic again, we produce it to the retry topic once more. This cycle continues until the event-related process is completed (Figure 1).

Figure 1
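To make the flow concrete, here is a minimal sketch of this loop, assuming Spring Kafka; the topic names, the DeliveryEventListener class, and the process() method are placeholders for our domain-specific logic, not the actual implementation.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Minimal sketch of the main-topic -> retry-topic loop (placeholder names).
@Component
public class DeliveryEventListener {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DeliveryEventListener(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "delivery-main-topic")
    public void consume(ConsumerRecord<String, String> record) {
        boolean completed = process(record.value()); // domain-specific processing

        // if the event-related process is not finished yet, produce the event to the
        // retry topic; a scheduler later produces it back to the main topic
        if (!completed) {
            kafkaTemplate.send("delivery-retry-topic", record.key(), record.value());
        }
    }

    private boolean process(String payload) {
        // placeholder for the domain-specific logic
        return false;
    }
}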

Outbox Pattern To Prevent Event Losses

We often got timeout exceptions in our Kafka producers due to network-related problems when producing events to the retry topic, and these timeouts caused many event losses. In general, the default Kafka config values, both for producers and brokers, are conservative enough that, under normal circumstances, we shouldn't run into any timeouts. However, even though we used the default configs, we still got timeout exceptions.

Default configs for producer
- acks = all
- enable.idempotence = true
- retries = 2147483647
- max.in.flight.requests.per.connection ≤ 5
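For reference, here is roughly how those defaults would look if set explicitly on a producer; the bootstrap address and serializers below are placeholders, not our actual configuration.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// The default values listed above, written out explicitly (placeholder broker address).
Properties producerProps() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    props.put(ProducerConfig.ACKS_CONFIG, "all");                        // acks = all
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);           // enable.idempotence = true
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);         // retries = 2147483647
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);  // max.in.flight.requests.per.connection ≤ 5
    return props;
}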

In order to solve the event loss problem, we decided to implement an Outbox Pattern (Figure 2).

Figure 2: Outbox Pattern to Prevent Event Loss

With the Outbox Pattern, the Kafka Error Handler API adds a document containing the details of the event (key, value, headers, topic) to Couchbase. We prevent event loss by later sending these events to the topic specified in the document with the Couchbase Kafka source connector.
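As an illustration, the outbox document could look roughly like the sketch below; the OutboxEvent type, its field names, and the from() helper are hypothetical and only show the kind of information (key, value, headers, topic) the document carries.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical shape of the outbox document stored in Couchbase by the error handler.
// The Couchbase Kafka source connector later reads these documents and produces each
// event to the topic specified inside the document.
public record OutboxEvent(String id, String topic, String key, String value,
                          Map<String, String> headers) {

    static OutboxEvent from(ProducerRecord<String, String> producerRecord) {
        Map<String, String> headers = new HashMap<>();
        producerRecord.headers().forEach(header ->
                headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8)));
        return new OutboxEvent(UUID.randomUUID().toString(), producerRecord.topic(),
                producerRecord.key(), producerRecord.value(), headers);
    }
}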

With this structure, we no longer lost events when an error occurred. However, the network issues that pushed us to implement the Outbox Pattern caused another problem: duplicate events.

The Duplicate Events Created By The Outbox Pattern

While the network problems in Kafka continued, we realized that we were getting timeout exceptions even though the events had actually been sent to Kafka successfully. Because of the exception, we also sent the events to Kafka with the outbox pattern, and this caused the events to be duplicated.

In the producer implementation, when sending an event to Kafka, we use the provided callback function, which gives the execution result as soon as the send completes. When the event fails to be delivered to its intended topic, Kafka automatically retries sending the event that was not fully acknowledged by the brokers.

private void sendToKafka(ProducerRecord<Object, Object> producerRecord, Object body) {
    try {
        ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(producerRecord);
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<Object, Object> result) {
                // the event was delivered and acknowledged; nothing to do
            }

            @Override
            public void onFailure(Throwable ex) {
                // delivery failed; persist the event with the outbox pattern
                sendEventToErrorHandler(producerRecord, body);
            }
        });
    } catch (Exception e) {
        sendEventToErrorHandler(producerRecord, body);
        throw new RuntimeException(e);
    }
}

At this point, in error cases, the Kafka producer attempts to send the event up to the default number of retries. During these retries, the producer may actually deliver the event to Kafka, but we still get a timeout exception without receiving the response confirming that the event was written to all in-sync replicas. In this case, since we are not sure whether the event reached the topic or not, we also send the event to Kafka via Couchbase with the outbox pattern (Figure 3).

Figure 3: The Duplicate Events Created by the Outbox Pattern

One duplicate is created for each event, so we end up with two identical events. If we get the same errors while processing those two events in the next retry cycle, there will be four identical events; after k failing cycles there are 2^k copies, so the number of duplicates grows exponentially.

The Problems That Arise By Duplicated Events

We use batch consumers while consuming events from Kafka. The default batch size (max.poll.records) is 500, and we use this default value. Instead of creating and running a thread for each event, events belonging to the same key are grouped into lists, and a thread is created and run for each unique key (domain-specific logic, Figure 4). Because of the domain-specific logic, we cannot simply take and process the first event with a given key; among the events belonging to the same key, we need to process the ones that meet a certain condition.

private <T> void processBatchEvent(List<ConsumerRecord<String, String>> consumerRecords, Consumer<T> consumerService, Class<T> eventClass) {
    // group the records of the batch by key so that all events with the same key
    // are processed together on a single thread
    Map<String, List<ConsumerRecord<String, String>>> eventMap = new HashMap<>();
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        List<ConsumerRecord<String, String>> records = eventMap.computeIfAbsent(consumerRecord.key(), val -> new ArrayList<>());
        records.add(consumerRecord);
    }

    // start one async task per unique key and wait for all of them to finish
    List<CompletableFuture<Boolean>> completableFutures = eventMap.values().stream()
            .map(eventWrappers -> kafkaBatchConsumerService.processEvent(eventWrappers, consumerService, eventClass))
            .collect(Collectors.toList());
    completableFutures.forEach(CompletableFuture::join);
}

// in KafkaBatchConsumerService
@Async("asyncExecutorForKafkaBatchConsumer")
public <T> CompletableFuture<Boolean> processEvent(List<ConsumerRecord<String, String>> records, Consumer<T> consumerService, Class<T> eventClass) {
    for (ConsumerRecord<String, String> consumerRecord : records) {
        try {
            T event = getEvent(consumerRecord, eventClass);

            // only events that satisfy the domain-specific condition are processed
            if (domainSpecificFilter(event)) {
                consumerService.accept(event);
            }
        } catch (Exception e) {
            produceErrorEvent(consumerRecord); // outbox pattern error handler
        }
    }
    return CompletableFuture.completedFuture(true);
}
Figure 4

Because duplicate events accumulate and all events with the same key are processed on a single thread, a thread sometimes had to process all 500 events of a batch. If we assume a processing time of 250 ms per event, processing 500 events on a single thread takes at least 125 seconds (about 2 minutes).

Because of the long total batch processing time:

  • High CPU, memory, and network usage on the pod (Figure 5)
  • High resource usage causes the consumer client to crash or to stop sending heartbeats for session.timeout.ms; the consumer is then considered dead, its partitions are reassigned, and a rebalance occurs (see the consumer config sketch after this list)
  • After the rebalance, the consumer can't commit its offsets, a CommitFailedException occurs, and events that were already processed are processed again
  • Timeouts in the Kubernetes cluster (Figures 6 and 7)
  • Lag in the consumers (Figures 8 and 9)
  • Restarts of the container/pod
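For context, here is a sketch of the consumer settings involved in this failure mode; the values shown are the usual Kafka defaults (they can differ by client version), not our production configuration.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Consumer settings related to the rebalance problem described above (default values).
Properties consumerProps() {
    Properties props = new Properties();
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);        // batch size per poll
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);    // no heartbeat for this long -> consumer considered dead
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);  // how often heartbeats are sent
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // max time between polls before a rebalance is triggered
    return props;
}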
Figure 5: CPU, Memory, Container CPU Cfs Throttled, and Network Usage

Between 04/28 and 05/08 (Figure 5), when the network problems started in Kafka, the CPU, Container CPU CFS Throttled, and lag size values of an application running with 32 partitions and 32 pods, which only had about 500,000 events to process per hour, increased dramatically.

The read/write latency and network traffic in the cluster also increased (Figures 6 and 7), causing many timeouts between API calls.

Figure 6: Kubernetes Cluster Node Read/Write Latency
Figure 7: Kubernetes Cluster Node Network Traffic
Figure 8: Kafka Topic Write Rate
Figure 9: Kafka Topic Lag Size

Let’s Solve the Problem

Because of the increasing timeouts, resource usage, and lag, we started thinking about how to eliminate the duplicate events created by the Outbox Pattern without waiting for improvements on the network side.

We didn't want to give up the Outbox Pattern, because without it we couldn't handle the event losses, so we tried to eliminate the duplicate events instead. In all of our projects, the common processBatchEvent and processEvent methods act as the batch consumer and process the events. We realized that, in the processEvent method, processing only the first event that passes the domainSpecificFilter() check is sufficient and the remaining events do not need to be processed. So we refactored the method.

@Async("asyncExecutorForKafkaBatchConsumer")
public <T> CompletableFuture<Boolean> processEvent(List<ConsumerRecord<String, String>> records, Consumer<T> consumerService, Class<T> eventClass) {
    for (ConsumerRecord<String, String> consumerRecord : records) {
        try {
            T event = getEvent(consumerRecord, eventClass);

            if (domainSpecificFilter(event)) {
                consumerService.accept(event);
                break; // the only change
            }
        } catch (Exception e) {
            produceErrorEvent(consumerRecord); // outbox pattern error handler
        }
    }
    return CompletableFuture.completedFuture(true);
}

After processing the first event that passes the check, the break statement prevents the remaining events from being processed. Instead of reprocessing every duplicate event that passes the check and sending each one back to the retry topic, we now process only one event per key, which solves the duplicate event problem.

Results

We solved all the problems caused by duplicate events with a small change: the break statement in the method we use.

On 05/09 (Figure 10, red box) we deployed our new method and updated the critical APIs to use it. As you can see in the metrics below, the new logic brought a clear improvement.

Between 05/11 and 05/14 we had to roll back the change because of a bug related to our domain. After fixing the bug, we deployed the feature again on 05/16 (Figure 10, green box).

  • There has been a dramatic drop in resource usage by applications. (Figure 10)
Figure 10: CPU, Memory, Container CPU Cfs Throttled, and Network Usage
  • In the cluster, the network traffic and read/write latency decreased, and with them the timeout cases also decreased. (Figures 11 and 12)
Figure 11: Kubernetes Cluster Node Network Traffic
Figure 12: Kubernetes Cluster Node Read/Write Latency
  • We no longer get CommitFailedException errors
  • Heartbeats are now sent successfully, so the rebalances, and with them the pod restarts, have stopped
  • The duplicate events had been increasing exponentially in Kafka. With the solution, the growth in the number of events in the topic stopped and the lag problem was resolved. (Figures 13 and 14)
Figure 13: Kafka Topic Write Rate
Figure 14: Kafka Topic Lag Size

Conclusion

Although Kafka is a powerful messaging system, network problems can still lead to undesirable situations. We experienced event losses due to network problems, and while trying to prevent those losses, we encountered different problems caused by the same root issues.

In this article, I explained the problems we experienced while using Kafka as both a producer and a consumer, how we solved them, and the improvements that resulted.

Now, with the architecture we have implemented, we can process events in a healthy way, without event losses, high resource usage, or timeout problems.

I would like to thank Kaan Taş and Firat Feroglu for all their help and support during the development and writing process of this article. ❤️

If you want to be part of a team that tries new technologies and want to experience a new challenge every day, come to us.
