Batch Processing with confluent-kafka-go

Bora Buyukoz
Published in Trendyol Tech
7 min read · Jul 26, 2023

Let’s discuss and demonstrate how we can boost the performance of an Apache Kafka consumer/producer in Golang by applying batch processing. We will be using Confluent’s Apache Kafka client confluent-kafka-go.

Environment

  • Golang v1.16
  • confluent-kafka-go v1.9.1

Insight

The Problem

As the Delivery Core Team at Trendyol, we have often seen our consumers lag and struggle to consume and process messages, mainly because of network or broker issues. Sometimes increasing the topic's partition count and scaling the application is not enough to improve the performance of a consumer/producer application. Our Java Apache Kafka consumers are batch processors and perform well, so we wanted our Go consumers to behave the same way.

The Go Client

Confluent-kafka-go is the Go client for Apache Kafka developed and maintained by Confluent. It internally uses librdkafka (rapid development library for kafka in C, based on librd) and exposes it as a Go library.

Consumer

When we poll events, we do not interact with the broker directly. confluent-kafka-go polls batches of events and buffers them locally in an internal queue, and our poll calls read from that queue. Committing offsets is left to the client.

What do we need to do?

If we set enable.auto.commit to true, events that have already been polled can be committed even though we have not processed them yet. This can be confusing: we may see no lag for the consumer group while there are still unprocessed events. So:

  • We need to set enable.auto.commit to false and handle committing offsets manually.
  • We can also customize how messages are processed by leveraging goroutines.

Producer

The producer operates asynchronously: it puts a message in the internal transmit queue and returns immediately. Again, messages in the transmit queue are batched before transmission. To find out which messages failed to be produced, we need to listen to the events channel and catch the failed deliveries. We did need those failed messages, so that no events are lost.

What do we need to do?

  • We need to listen to the events channel asynchronously, pick out the events of type kafka.Message, and check TopicPartition.Error to detect failed deliveries.

A Little More Context

Before we discuss how to configure the client for batch processing, we first need to describe our application-specific behavior.

Typical Consumer Behavior

A typical consumer behavior
  1. Poll message
  2. Check for errors
  3. If any error, continue polling
  4. Process message
  5. Check for errors
  6. If any error, produce error message
  7. Commit message, whether or not processing failed
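The steps above can be sketched in a few lines of Go. This is only an illustration with standard-library stand-ins: `Message`, `Result`, `handle`, and `consumeOneByOne` are hypothetical names, not part of confluent-kafka-go, and poll errors (steps 2 and 3) are left out to keep it short.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical stand-in for *kafka.Message.
type Message struct {
	Key, Value string
}

// Result counts what the loop did, so the flow is easy to inspect.
type Result struct {
	Committed, SentToErrorTopic int
}

// handle is a hypothetical business handler; it rejects empty payloads.
func handle(m Message) error {
	if m.Value == "" {
		return errors.New("empty payload")
	}
	return nil
}

// consumeOneByOne processes every polled message, counts a message as sent
// to an error topic if processing failed, and then commits it.
func consumeOneByOne(polled []Message) Result {
	var r Result
	for _, m := range polled {
		if err := handle(m); err != nil {
			r.SentToErrorTopic++ // stands in for producing to an error topic
		}
		r.Committed++ // one commit round trip per message
	}
	return r
}

func main() {
	r := consumeOneByOne([]Message{{"2222", "a"}, {"3333", ""}, {"4444", "c"}})
	fmt.Printf("committed=%d errors=%d\n", r.Committed, r.SentToErrorTopic)
}
```

The point to notice is the commit inside the loop: every message costs one commit, which is the overhead that batch commits aim to reduce.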

In the context of our applications, a consumer also acts as a producer, as it produces the messages that cannot be processed due to an internal or a business error.

How to configure and develop for batch processing?

Consumer

The library's internal mechanism consumes events in batches and buffers them in an internal queue. This mechanism is already very efficient on its own, but we can increase performance further with:

  • Batch commits, to lower the network overhead, and
  • Async processing, to boost the processing rate

Batch Commits

To commit manually, we need to disable auto committing.

enable.auto.commit: false

We need to poll a number of messages and commit once a certain batch size is reached. An example of a synchronous batch commit from the library documentation:

msgCount := 0
for run {
    ev := consumer.Poll(100) // wait up to 100 ms for the next event
    switch ev.(type) {
    case *kafka.Message:
        msgCount++
        // Commit synchronously once every BATCH_SIZE messages.
        if msgCount%BATCH_SIZE == 0 {
            consumer.Commit()
        }
    }
}
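To get a feel for how much commit traffic this saves, the counting logic can be simulated without a broker. The sketch below is a hypothetical helper (`commitCalls` is not a library function) that only mirrors the modulo check in the loop above.

```go
package main

import "fmt"

// commitCalls simulates the batch commit loop: it returns how many commits
// are issued for total messages, and how many messages are still waiting
// for a commit when the loop ends.
func commitCalls(total, batchSize int) (commits, pending int) {
	for i := 1; i <= total; i++ {
		pending++
		if i%batchSize == 0 {
			commits++
			pending = 0
		}
	}
	return commits, pending
}

func main() {
	commits, pending := commitCalls(1030, 50)
	fmt.Println(commits, pending) // prints "20 30"
}
```

With a batch size of 50, 1030 messages need 20 commits instead of 1030. The 30 leftover messages show why a partial batch also needs a trigger other than the counter, such as a poll timeout.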

Async Processing

Async processing requires a bit more context. The message order may be crucial in some cases.

In our case, the message order mattered for the messages with the same key. We need to process the messages with the same key in the order they arrive. Let’s say we have a batch size of 6. With the given order below, 3 messages have the key 2222, 1 message has the key 3333, and 2 messages have the key 4444. We can process the messages in the following manner:

Async processing of messages

The messages with the same key are appended to a slice, and each slice is assigned to a goroutine. Messages with the same key are processed sequentially in their own goroutine. In the worst case, where every message has a distinct key, we would have BATCH_SIZE goroutines.
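A minimal sketch of this grouping, using only the standard library, might look like the following. `Message`, `groupByKey`, and `processGroups` are hypothetical names chosen for illustration, and the order of the six messages is an assumed example, not the exact order from the figure.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for *kafka.Message.
type Message struct {
	Key   string
	Value string
}

// groupByKey appends each message to the slice of its key, keeping arrival order.
func groupByKey(batch []Message) map[string][]Message {
	groups := make(map[string][]Message)
	for _, m := range batch {
		groups[m.Key] = append(groups[m.Key], m)
	}
	return groups
}

// processGroups starts one goroutine per key. Each goroutine handles its own
// slice sequentially, so order is preserved per key while keys run concurrently.
func processGroups(groups map[string][]Message, handle func(Message)) {
	var wg sync.WaitGroup
	for _, msgs := range groups {
		wg.Add(1)
		go func(msgs []Message) {
			defer wg.Done()
			for _, m := range msgs {
				handle(m)
			}
		}(msgs)
	}
	wg.Wait() // return only when the whole batch is done, so a commit is safe
}

func main() {
	batch := []Message{
		{"2222", "a"}, {"3333", "b"}, {"2222", "c"},
		{"4444", "d"}, {"2222", "e"}, {"4444", "f"},
	}
	var mu sync.Mutex
	seen := make(map[string]string) // values per key, in processing order
	processGroups(groupByKey(batch), func(m Message) {
		mu.Lock()
		defer mu.Unlock()
		seen[m.Key] += m.Value
	})
	fmt.Println(seen["2222"], seen["3333"], seen["4444"]) // prints "ace b df"
}
```

Waiting on the `sync.WaitGroup` before returning is a design choice of this sketch: it makes sure the batch is fully processed before any offset is committed.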

The final consumer looks like this:

eventCount := 0
// Storage for messages to be batch processed. Our case requires a map of messages per key.
messages := make(map[string][]*kafka.Message)
for {
    msg, err := sc.Instance.ReadMessage(time.Second)
    if err != nil {
        if k, ok := err.(kafka.Error); ok && k.Code() == kafka.ErrTimedOut {
            // On a timeout, do not wait to reach BATCH_SIZE. Process the stored messages.
            if len(messages) > 0 {
                sc.process(&messages, &eventCount)
                messages = make(map[string][]*kafka.Message)
            }
        }
        continue // on any error there is no message to store, so poll again
    }
    put(&messages, msg) // Store the message to batch process later.
    eventCount++

    if eventCount%BATCH_SIZE == 0 {
        sc.process(&messages, &eventCount)
        messages = make(map[string][]*kafka.Message)
    }
}

// process handles the stored messages, resets the counter, and commits the batch.
func (sc *Consumer) process(messages *map[string][]*kafka.Message, eventCount *int) {
    // Async processing; assumed to return only after every goroutine has finished.
    sc.processMessages(messages)
    *eventCount = 0
    sc.commit()
}

On a poll timeout, we must not wait until BATCH_SIZE is reached; we can process the stored events right away. Otherwise, if the topic goes idle for a long time before BATCH_SIZE is reached, the stored messages would remain unprocessed.

So, after we configure and develop batch processing, the consumer behavior looks like this:

A batch processing consumer behavior
  1. Poll message
  2. Check for errors
  3. If any error, check for stored messages
  4. If there are stored messages, async process them, then batch commit
  5. If no stored messages, continue polling
  6. Store the message
  7. Check the batch size
  8. If the batch size is reached, async process the stored messages, then batch commit
  9. If not reached, continue polling
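The two flush triggers in this flow (batch size reached, or a poll timeout with stored messages) can be replayed without a broker. The sketch below is a simplified simulation: `pollResult` and `batches` are hypothetical names, and a timeout stands in for any poll error.

```go
package main

import "fmt"

// pollResult is a hypothetical stand-in for the outcome of a poll:
// either a message value or a timeout.
type pollResult struct {
	value    string
	timedOut bool
}

// batches replays a sequence of poll results and returns the batches that
// would be handed to the processing step, flushing on size or on timeout.
func batches(polls []pollResult, batchSize int) [][]string {
	var out [][]string
	var stored []string
	flush := func() {
		if len(stored) > 0 {
			out = append(out, stored) // async process and batch commit go here
			stored = nil
		}
	}
	for _, p := range polls {
		if p.timedOut {
			flush() // idle topic: do not wait for a full batch
			continue
		}
		stored = append(stored, p.value)
		if len(stored) == batchSize {
			flush()
		}
	}
	return out
}

func main() {
	polls := []pollResult{
		{value: "m1"}, {value: "m2"}, {value: "m3"}, // full batch of 3
		{value: "m4"}, {timedOut: true}, // partial batch flushed by the timeout
		{timedOut: true}, // nothing stored, keep polling
	}
	fmt.Println(batches(polls, 3)) // prints "[[m1 m2 m3] [m4]]"
}
```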

Producer

The library offers a batch producer that can be enabled with:

go.batch.producer: true

This is not about the message batches that are created before transmission. By enabling the batch producer, we enable the use of the produceBatch() interface. However, the documentation states that this interface is experimental and that timestamps and headers are not supported. We needed both, so we could not use it in our case.

Since the batch producer is off the agenda for our case, we can increase the performance by:

  • Async writes

Async Writes

We need to listen for the delivery reports in a separate goroutine. Example from the library documentation for async writes:

go func() {
    for e := range p.Events() {
        switch ev := e.(type) {
        case *kafka.Message:
            if ev.TopicPartition.Error != nil {
                fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
            } else {
                fmt.Printf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
                    *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
            }
        }
    }
}()
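Since we want to keep the failed messages rather than just print them, the same pattern can forward failures to a second channel. The sketch below uses standard-library stand-ins only: `Event`, `Message`, `errBroker`, and `collectFailed` are hypothetical names, and `Err` plays the role of `TopicPartition.Error`.

```go
package main

import (
	"errors"
	"fmt"
)

// Event and Message are hypothetical stand-ins for kafka.Event and *kafka.Message.
type Event interface{}

type Message struct {
	Value string
	Err   error // plays the role of TopicPartition.Error
}

// errBroker is a sample delivery error used by the demo below.
var errBroker = errors.New("broker unavailable")

// collectFailed drains the events channel and forwards every failed delivery
// to the failed channel, so the caller can retry or persist those messages.
func collectFailed(events <-chan Event, failed chan<- *Message) {
	defer close(failed)
	for e := range events {
		if m, ok := e.(*Message); ok && m.Err != nil {
			failed <- m
		}
	}
}

func main() {
	events := make(chan Event, 3)
	failed := make(chan *Message, 3)

	events <- &Message{Value: "ok-1"}
	events <- &Message{Value: "lost", Err: errBroker}
	events <- "some other event type" // ignored by the type check
	close(events)

	go collectFailed(events, failed)
	for m := range failed {
		fmt.Println("failed delivery:", m.Value, m.Err)
	}
}
```

What happens to a failed message afterwards (retry, dead-letter topic, persistence) is application specific and not covered by this sketch.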

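If we are only interested in failed deliveries, we can have the client emit delivery reports only for failed messages, which lets us drop the success branch. The events channel can still carry other event types, such as kafka.Error, so keeping the type check is a safe default. Enable it with: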

delivery.report.only.error: true

Results in Staging

In the staging environment, we let lag build up on a consumer and then had it process incrementally larger numbers of events. We examined processing rates and resource consumption to determine the optimal BATCH_SIZE. The staging environment runs a single consumer instance, so we tested the performance of a single consumer under lag.

Batch processing metrics in staging environment

In the staging environment, under a lag of 500k messages:

  • Single record process rate: ~12.05k rpm, CPU consumption: ~0.054M
  • Batch 50 record process rate: ~375k rpm, CPU consumption: ~0.5M
  • Batch 300 record process rate: ~714.29k rpm, CPU consumption: ~0.87M

From this data, batch processing is roughly 31 to 59 times faster (375k / 12.05k ≈ 31, 714.29k / 12.05k ≈ 59) and roughly 9 to 16 times more CPU hungry (0.5 / 0.054 ≈ 9, 0.87 / 0.054 ≈ 16).
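The ratios can be recomputed from the staging figures listed above. The snippet below is just that arithmetic; the `run` type and the `ratios` helper are hypothetical names, and the numbers are copied from the list as reported.

```go
package main

import "fmt"

// run holds one staging measurement (rate in k rpm, CPU as reported above).
type run struct {
	name string
	rate float64
	cpu  float64
}

// ratios returns how many times faster and how many times more CPU hungry
// a run is compared with the baseline.
func ratios(base, r run) (speedup, cpuFactor float64) {
	return r.rate / base.rate, r.cpu / base.cpu
}

func main() {
	single := run{"single", 12.05, 0.054}
	for _, r := range []run{{"batch 50", 375, 0.5}, {"batch 300", 714.29, 0.87}} {
		s, c := ratios(single, r)
		fmt.Printf("%s: %.1fx rate, %.1fx cpu\n", r.name, s, c)
	}
}
```

This gives about 31.1x rate and 9.3x CPU for a batch of 50, and about 59.3x rate and 16.1x CPU for a batch of 300.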

Considering CPU utilization, we chose 50 records as the optimal BATCH_SIZE.

Effects in Production

One of our consumers was lagging around 1.5m-2m messages.

  • Application is running with 12 instances
  • The topic has 12 partitions
  • One of the partitions was lagging by around 1m messages
  • The other 11 partitions were each lagging by around 50k messages

This was an unusual case, so we decided to activate batch processing at around 09:58 am. Here are the monitoring metrics.

Consumer deployment metrics (Cpu, Memory, Network Traffic)

The application had been struggling since 09:40, as seen in the Pod/Container CPU graph. When batch processing was first enabled during heavy lag around 09:58, CPU and memory utilization increased slightly. However, this only lasts until the lag is fully processed. When there is no lag, utilization is pretty much the same as with non-batch processing. So, we decided to leave batch processing enabled for good.

Consumer lag visualization

The partitions were relieved after a short period of time, and the lag was fully consumed.

Conclusion

Although confluent-kafka-go is a highly efficient and reliable consumer/producer client for Go applications, its performance under heavy traffic can sometimes be unpredictable.

In this article, I summarized how we enabled custom batch processing in a confluent-kafka-go consumer with a couple of simple adjustments, and gained significant performance and efficiency in return.

A potential drawback of this approach is that if the application is not scaled out enough, or runs on a single instance, the resource consumption may be more than it can handle. In that case we may need a lower batch size, and therefore accept lower performance. Another drawback is that if the processing order of events with different keys is also crucial, we might not be able to use async processing, which is what actually boosts the performance.

A further improvement would be a detailed comparison with other Go Kafka clients, such as Sarama and Segment's kafka-go, using their batch consuming/processing features under heavy traffic.

