How We Do Monitoring in Trendyol with Kafka Lag Checker

Mert Bulut
Published in Trendyol Tech
Feb 23, 2021

At Trendyol, we have more than 20 million active and sellable products. Each product has properties such as brand, category, description, price, stock, and variants, and any of these properties can change at any time. We call each such change an “invalidation”. Invalidations have to be processed quickly, so we manage them with an event-driven architecture and use Apache Kafka to distribute the event streams. If you want to use Kafka or any other event-streaming tool in your system, you must configure monitoring and evaluation properly. Furthermore, the better your monitoring is, the stronger you can make capabilities such as:

● measuring producing and consuming rates,

● scaling resources,

● reacting quickly to alerts,

● building self-healing systems.

As I have just mentioned, we have many producers, and we have developed a number of strategies for consuming these invalidations. The topics can be classified as momentary, daily, hourly, or minutely according to our business strategies. We aim to observe all topics precisely and avoid false alerts; another aim is to scale resources by measuring consumption rates. Our initial, naive implementation looks like this:

Figure 1: Summary of the lag checking process

Kafka Lag Checker is built to run as a cronjob. In fact, it has several cronjobs aligned with our business requirements (minutely, hourly, daily, etc.). It can set a threshold for each of these topics, and it also tracks the lag trend of each topic per consumer group.

When a consumer has not finished consuming all events as expected, the lag checker catches this and triggers the consumer again. For example, suppose the consumer has not finished consuming Hourly1Queue by 02:00 am. The lag checker queries the topic's lag count through Burrow, and if the lag exceeds the given threshold, it sends a message containing the topic id to a dedicated retrigger topic, as illustrated here and sketched in the example below:

● message : {"topicId":"Hourly1Queue"}
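
A minimal sketch of that check, assuming Jackson and the plain Kafka Java client rather than our production code: the cronjob asks Burrow's v3 lag endpoint for the group's total lag and, if it exceeds the threshold, publishes the message above to the retrigger topic. The endpoint host and port follow the Burrow config below; the consumer group name, the threshold, and the "lag-checker-retrigger" topic are illustrative assumptions.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class LagChecker {
    public static void main(String[] args) throws Exception {
        // 1. Ask Burrow for the lag of the consumer group that reads Hourly1Queue.
        HttpRequest request = HttpRequest.newBuilder(URI.create(
                "http://localhost:8000/v3/kafka/default/consumer/example-consumer-group/lag"))
            .GET().build();
        String body = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString()).body();

        // 2. Read the total lag reported by Burrow for this group.
        long totalLag = new ObjectMapper().readTree(body).at("/status/totallag").asLong();

        // 3. If the lag is above the topic's threshold, ask for the consumer to be retriggered.
        long threshold = 1000L; // illustrative per-topic threshold
        if (totalLag > threshold) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("lag-checker-retrigger", "{\"topicId\":\"Hourly1Queue\"}")).get();
            }
        }
    }
}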

After this message, our Akka consumer creates a new ConsumerActor following the semantics of the Actor Model; it then starts consuming within the related ActorContext, with AUTO_OFFSET_RESET_CONFIG set to "latest" and the other ConsumerSettings kept the same as those of the old consumer actor.
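
For illustration only, and not our exact actor code: with Alpakka Kafka's ConsumerSettings, the retriggered consumer's settings could be built along these lines, where the bootstrap servers and group id are placeholders.

import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RetriggeredConsumer {
    // Builds settings identical to the old consumer actor's, except that the
    // retriggered consumer starts from the latest offset when none is committed.
    public static ConsumerSettings<String, String> settings(ActorSystem system) {
        return ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
            .withBootstrapServers("localhost:9092")   // placeholder broker address
            .withGroupId("example-consumer-group")    // placeholder consumer group
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    }
}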

Burrow:

Burrow is an open-source, configurable application developed by LinkedIn (https://github.com/linkedin/Burrow). It exposes a set of RESTful endpoints; when you send a request, Burrow returns data from the related Kafka clusters and ZooKeeper servers. To stand up a Burrow instance, we fill in burrow.toml, the configuration file Burrow uses. A basic implementation of this as a config map might look like the following:

{
"burrow.toml": "[zookeeper]
servers=[ \"localhost:2181\" ]
timeout=6
root-path=\"/burrow\"

[cluster.default]
class-name=\"kafka\"
servers=[ \"localhost:9092\" ]
topic-refresh=60
offset-refresh=10

[consumer.consumer_kafka]
class-name=\"kafka\"
cluster=\"default\"
servers=[ \"localhost:9092\" ]

[consumer.zookeeper]
class-name=\"kafka_zk\"
cluster=\"default\"
servers=[ \"localhost:2181\" ]
zookeeper-timeout=30

[httpserver.default]
address=\":8000\"

[client-profile.myclient]
kafka-version=\"2.0.0\"
client-id=\"burrow-myclient\"

[logging]
filename=\"logs/burrow.log\"
level=\"info\"
maxsize=100
maxbackups=30
maxage=10
use-localtime=false
use-compression=true"
}

This is fairly simple. After applying a configuration like the one above and building the Burrow Docker image, we can use the Burrow API to get information about our Kafka clusters. Its endpoints provide useful information such as Kafka topics, cluster details, consumers, and lag counts per consumer group, with no extra requirements. We can also retrieve the status of partitions or the maximum lag on a topic. Based on this, we generate alerts and push notifications to Slack channels or send an email. If you are curious about the response for a consumer group's lag counts per partition, let's take a look:

// http://localhost:8000/v3/kafka/default/consumer/example-consumer-group/lag
{
  "error": false,
  "message": "consumer status returned",
  "status": {
    "cluster": "default",
    "group": "example-consumer-group",
    "status": "ERR",
    "complete": 0.7419355,
    "partitions": [
      {
        "topic": "example-topic",
        "partition": 0,
        "owner": "127.0.0.1",
        "client_id": "The Consumer",
        "status": "OK",
        "start": {
          "offset": 263375,
          "timestamp": 1607537410620,
          "lag": 0
        },
        "end": {
          "offset": 263375,
          "timestamp": 1607537455859,
          "lag": 0
        },
        "current_lag": 0,
        "complete": 1
      }
    ],
    "partition_count": 15,
    "maxlag": {
      "topic": "example-topic",
      "partition": 0,
      "owner": "",
      "client_id": "",
      "status": "STOP",
      "start": {
        "offset": 0,
        "timestamp": 1603195232505,
        "lag": 0
      },
      "end": {
        "offset": 0,
        "timestamp": 1603195262598,
        "lag": 0
      },
      "current_lag": 1,
      "complete": 0.7
    },
    "totallag": 4
  }
}
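
To tie this back to alerting, here is a rough sketch, assuming Jackson for parsing and a Slack incoming webhook for delivery, of how such a response could be turned into a notification; the webhook URL and message wording are placeholders, while the fields read (group, status, totallag, maxlag) come from the response above.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SlackLagNotifier {

    // Turns a Burrow lag response (like the JSON above) into a short alert line.
    static String buildAlertText(String burrowResponse) throws Exception {
        JsonNode status = new ObjectMapper().readTree(burrowResponse).path("status");
        JsonNode maxlag = status.path("maxlag");
        return String.format("Consumer group %s is %s: total lag %d, worst partition %s-%d",
                status.path("group").asText(),
                status.path("status").asText(),
                status.path("totallag").asLong(),
                maxlag.path("topic").asText(),
                maxlag.path("partition").asInt());
    }

    // Pushes the alert to a Slack incoming webhook; the webhook URL is a placeholder.
    static void notifySlack(String text) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create("https://hooks.slack.com/services/REPLACE_ME"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"text\":\"" + text + "\"}"))
                .build();
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}

A caller would typically invoke notifySlack only when the reported status is not "OK", and could route email alerts the same way.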

Conclusion:

In summary, I have tried to explain how we monitor and evaluate Kafka at Trendyol using the Kafka monitoring tool Burrow, together with our alerting mechanism and self-healing system. Thanks for reading.
