Kafka Konsumer Two Years Journey 🚀

Abdulsamet İLERİ · Trendyol Tech · Jul 18, 2024

Last year, we introduced the kafka-konsumer and kafka-cronsumer libraries in their own articles. After another year of development, we are thrilled to write a new blog post introducing more useful features and our use cases. Over this two-year journey, the two libraries have been adopted by 100+ projects at Trendyol. 🚀

First of all, let’s remember what these libraries are. Basically,

  • Kafka Konsumer: makes it easy to implement a Kafka consumer, with a built-in exception manager (kafka-cronsumer).
  • Kafka Cronsumer: a cron-based Kafka exception consumer with the power of auto retry & concurrency.
Figure: Logo

As the Seller Product Indexing team, we have used these libraries in every consumer project. They have been running in production for a very long time and have performed well 💪. For example,

Figure: Lag Graph, 16 pods and 16 partitions for the international category
Figure: Lag Graph, 16 pods and 16 partitions for the content features
Figure: TP Graph, Lots of different types of messages under the heavy load

Last year, we also wrote New Winner of Kafka Consumers: Scala to Go Journey 350 Million Messages per Day, where you can find our re-platforming journey and more performance insights!

Let’s take a look at what we developed and our use cases. 👀

Table of Contents

  • Distributed Tracing (OpenTelemetry) Support
  • Pause/Resume the consumer at any time
  • Run pre-batch function for batch mode
  • Error-Only Retry for batch consuming
  • Skip Message By Header Feature
  • Exposing Prometheus Metrics
  • What’s new in Kafka Cronsumer?
  • Conclusion

#1 — Distributed Tracing (OpenTelemetry) Support

Kafka Konsumer uses the segmentio/kafka-go library under the hood, but unfortunately, kafka-go does not support OpenTelemetry; there is an ongoing issue about it. Based on some work on that issue, we created the otel-kafka-konsumer project.

By integrating the otel-kafka-konsumer project with the kafka-konsumer, we successfully implemented distributed tracing in consuming and producing operations. 💃

You can run the demo.

In simple terms, only two settings are significant:

  • trace.TracerProvider (you can set Jaeger, Zipkin, etc.)
  • propagation.TextMapPropagator (please refer to here)

Enable with DistributedTracingEnabled: true; kafka-konsumer does the rest.

consumerCfg := &kafka.ConsumerConfig{
    DistributedTracingEnabled: true,
}
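
For reference, here is a minimal sketch of wiring those two settings with the OpenTelemetry Go SDK. The exporter choice is left open, and how the settings reach kafka-konsumer is what the demo shows; treat this as an illustration under those assumptions, not the canonical setup.

package main

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/propagation"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
    // Build a tracer provider; plug in your exporter of choice
    // (Jaeger, Zipkin, OTLP, ...) via sdktrace.WithBatcher.
    tp := sdktrace.NewTracerProvider()
    otel.SetTracerProvider(tp)

    // Register a propagator so the trace context can travel in Kafka message headers.
    otel.SetTextMapPropagator(propagation.TraceContext{})
}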
Figure: Demo Jaeger dashboard example

Our Use Case: Distributed tracing is crucial for identifying which part of your architecture is the bottleneck. We followed an input Kafka message (via its traceId) through every step and saw how our architecture performed.

Figure: A trace captured from production

#2 — Pause/Resume the consumer at any time

Pause/resume functionality has several benefits. For example,

  • It helps with processing long-running messages and implementing message delaying.
  • It makes it easy to implement a backpressure or flow-control mechanism based on processing time or a buffer limit.

You can call the Pause and Resume functions at any time. The consumer group state is unaffected because the consumer still sends heartbeats under the hood. You can read how it works.

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()
fmt.Println("Consumer started...!")

consumer.Pause()
// ..
consumer.Resume()

You can run the demo.

Our Use Case: We process some types of messages (brand, category, etc.) in the evening. Why? A single category can contain more than 50 million products, so a category change means updating the category field of more than 50 million products. It’s really heavy work, so we need to do it in the evening, and this feature is a great fit for such use cases.

Figure: Lag graph of Brand messages processed in the evening
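
As an illustration, such time-based control could look like the following minimal sketch. The evening window and the ticker loop are assumptions made for the example, not our production code; the Consumer type follows the snippet above.

// Hypothetical scheduler: keep the consumer paused during the day and
// let it run only in the evening window (uses the time package).
func scheduleEveningWindow(consumer kafka.Consumer) {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        hour := time.Now().Hour()
        if hour >= 20 || hour < 6 { // assumed evening window: 20:00–06:00
            consumer.Resume()
        } else {
            consumer.Pause()
        }
    }
}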

#3 — Run pre-batch function for batch mode

The pre-batch function has several benefits. For example,

  • Same-Key Compaction: Retain only the latest message for each key to reduce the number of processed messages.
  • Merging: Combine related messages into one message to simplify downstream processing and reduce message volume.
  • Pre-Processing: Apply transformations or filtering to the data before processing the batch.

You can quickly implement a pre-batch function for batch-mode consumption. For example, suppose we have a version-based pre-batch function that keeps only the latest version of each message:

{ "id": 1, "version": 1, "payload": "foo" }
{ "id": 1, "version": 2, "payload": "foobar" }
{ "id": 1, "version": 3, "payload": "foobarfoobar" }
{ "id": 2, "version": 1, "payload": "x" }
{ "id": 3, "version": 2, "payload": "y" }

After preBatch:
{ "id": 1, "version": 3, "payload": "foobarfoobar" }
{ "id": 2, "version": 1, "payload": "x" }
{ "id": 3, "version": 2, "payload": "y" }

As a result, the number of messages decreases from 5 to 3.
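
A minimal sketch of such a version-based pre-batch function might look like this; the Message fields and the way the function is wired into the batch configuration are assumptions here, so see the demo for the real API.

type event struct {
    ID      int    `json:"id"`
    Version int    `json:"version"`
    Payload string `json:"payload"`
}

// preBatch keeps only the highest-version message per id
// (uses encoding/json from the standard library).
func preBatch(messages []*kafka.Message) []*kafka.Message {
    latest := make(map[int]*kafka.Message)
    versions := make(map[int]int)

    for _, msg := range messages {
        var e event
        if err := json.Unmarshal(msg.Value, &e); err != nil {
            continue // sketch only: skip malformed payloads
        }
        if v, seen := versions[e.ID]; !seen || e.Version > v {
            versions[e.ID] = e.Version
            latest[e.ID] = msg
        }
    }

    compacted := make([]*kafka.Message, 0, len(latest))
    for _, msg := range latest {
        compacted = append(compacted, msg)
    }
    return compacted
}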

You can run the demo.

Our Use Case: We merged some category messages to decrease the number of messages processed. Without merging, we would create extra work for the system; with it, we save ourselves from processing millions of extra events.

Figure: Category messages need to be merged

We couldn’t have handled the high load if we hadn’t merged these messages. As you can see below, the number of produced events was about 250M+ before we shut the consumer down. (Yes, we couldn’t let it continue like this.)

Figure: Category messages without merging

After merging, we decreased the number of category messages from 12,330 to 571 💪, eliminating the extra messages entirely.

Figure: Category messages enabling merging

#4 — Error-Only Retry for batch consuming

Assume you are using batch-consuming mode and polling 100 messages from the topic, and some of them fail while the group of messages is being processed. If you want to retry only the failed messages (not all of them), you can easily disable the transactional-retry feature.

consumerCfg := &kafka.ConsumerConfig{
    // ..
    RetryEnabled:       true,
    TransactionalRetry: kafka.NewBoolPtr(false),
    // ...
}

func batchConsumeFn(messages []*kafka.Message) error {
    for i := range messages {
        if i < 2 {
            messages[i].IsFailed = true
            messages[i].ErrDescription = fmt.Sprintf("Key = %s error", string(messages[i].Key))
        }
    }

    // you must return an error here so that only the failed messages are retried
    return errors.New("err")
}

You can run the demo.

Our Use Case: Let’s assume we take 100 messages from the topic and query Elasticsearch for each of them. While some of these are available in Elasticsearch, some are not. We produce the unavailable ones to the retry topic and retry them after a fixed cron interval.
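
A hedged sketch of that flow, where documentExists is a hypothetical Elasticsearch lookup rather than anything provided by kafka-konsumer:

// Mark only the messages whose documents are missing in Elasticsearch,
// so that only those end up in the retry topic.
func batchConsumeFn(messages []*kafka.Message) error {
    anyFailed := false

    for i := range messages {
        exists, err := documentExists(string(messages[i].Key)) // hypothetical helper
        if err != nil || !exists {
            messages[i].IsFailed = true
            messages[i].ErrDescription = "document not available in Elasticsearch yet"
            anyFailed = true
        }
    }

    if anyFailed {
        // returning an error triggers the retry flow for the failed messages only
        return errors.New("some documents are not available yet")
    }
    return nil
}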

Figure: Query every message, send to retry topic unavailable ones
Figure: Three messages in the same batch have different errors

#5 — Skip Message By Header Feature

In the event-streaming world, you normally process every message step by step, but sometimes you need to skip a message and continue with the others. Ideally, the value used for this decision should be specified in a message’s header, not its body, so the function accepts the message headers. [Ref]

consumerCfg := &kafka.ConsumerConfig{
    // ..
    RetryEnabled:          true,
    SkipMessageByHeaderFn: skipMessageByHeaderFn,
    // ...
}

func skipMessageByHeaderFn(headers []kafka.Header) bool {
    for _, header := range headers {
        if header.Key == "SkipMessage" {
            return true
        }
    }
    return false
}

You can run the demo.

Our Use Case: Let’s return to our category example. We said above that we process category events at night, but there are exceptional category event types we need to process immediately. For this, we created two consumers in the same app, gave each its own SkipMessageByHeaderFn, and ran them in production flawlessly.

Figure: SkipMessageByHeader Func
Figure: Lag Graph of the instant and night consumers
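
As an illustration, the two skip functions could be mirror images of each other. The X-Instant-Event header name below is hypothetical; our actual function is shown in the figure above.

// The night consumer skips messages flagged for instant processing;
// the instant consumer skips everything else.
func skipForNightConsumer(headers []kafka.Header) bool {
    return hasHeader(headers, "X-Instant-Event")
}

func skipForInstantConsumer(headers []kafka.Header) bool {
    return !hasHeader(headers, "X-Instant-Event")
}

func hasHeader(headers []kafka.Header, key string) bool {
    for _, header := range headers {
        if header.Key == key {
            return true
        }
    }
    return false
}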

#6 — Exposing Prometheus Metrics

Kafka Konsumer exposes three metrics of its own, plus the exception metrics coming from kafka-cronsumer. You can easily create a Grafana dashboard and Prometheus alerts based on these.

Figure: Sample dashboard based on the below metrics
kafka_konsumer_processed_messages_total_current
kafka_konsumer_unprocessed_messages_total_current
kafka_konsumer_error_count_during_fetching_message_total_current
kafka_cronsumer_retried_messages_total
kafka_cronsumer_discarded_messages_total

You can set APIEnabled: true if you want to use the Kafka Konsumer internal API:

consumerCfg := &kafka.ConsumerConfig{
    APIEnabled: true,
    // ..
}

You can run the demo.

(If you want to change the prefix of the exposed metrics, you can set the MetricPrefix field.)
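
For example, in a hedged sketch where the prefix value is purely illustrative:

consumerCfg := &kafka.ConsumerConfig{
    APIEnabled:   true,
    MetricPrefix: "my_app", // illustrative; the default metrics above start with kafka_konsumer
}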

Or you can get the metric collectors and register them on your own API:

firstConsumer, _ := kafka.NewConsumer(&kafka.ConsumerConfig{})
secondConsumer, _ := kafka.NewConsumer(&kafka.ConsumerConfig{})

allCollectors := append(firstConsumer.GetMetricCollectors(), secondConsumer.GetMetricCollectors()...)

StartAPI(allCollectors...) // your own API
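
For completeness, here is a minimal sketch of what such a StartAPI could look like with the standard Prometheus client; the function is yours to write, not something the library provides.

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// StartAPI registers the given collectors on a dedicated registry
// and serves them on /metrics.
func StartAPI(collectors ...prometheus.Collector) {
    registry := prometheus.NewRegistry()
    registry.MustRegister(collectors...)

    http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
    log.Fatal(http.ListenAndServe(":8080", nil))
}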

You can run the demo.

What’s new in Kafka Cronsumer?

  • Exception management now supports back-off strategies, with linear, fixed, and exponential options. Details are here.
  • Added an x-error-message header to show which error a message hit during processing. To learn why a message is in an exception/retry topic, look at its x-error-message header (see the sketch below).
Figure: See why this message in the exception by looking at x-error-message header
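
For example, a minimal sketch for reading that header, following the same header types as the skip-by-header example above:

// errorReason returns the failure reason recorded in the
// x-error-message header, or "unknown" if the header is absent.
func errorReason(msg *kafka.Message) string {
    for _, header := range msg.Headers {
        if header.Key == "x-error-message" {
            return string(header.Value)
        }
    }
    return "unknown"
}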

Conclusion

Thank you to all the contributors (20+) who have participated in our two-year journey. I especially want to thank Emre Odabaş for all his support.

We are open to hearing your voice and feedback. Don’t hesitate to contact us and give a ⭐️ 😄

About Us

Want to be a part of our growing company? We’re hiring! Check out our open positions and other media pages from the links below.
