Adding Time Lag to Monitor Kafka Consumer

Being intelligent about measuring our Kafka consumer lags

Jessie Tanaboriboon
Agoda Engineering & Design
7 min read · Sep 7, 2021


At the core of Agoda’s data pipeline is Apache Kafka. Each day, about 1 trillion messages are sent across our Kafka clusters.

Various teams at Agoda produce and consume messages from the pipeline, and in many cases it is used to directly influence business decisions.

What is Kafka Offset Monitor?

Monitoring the performance of Kafka consumers is vital to ensure that applications that require low-latency data streams are operating correctly.

A common way of doing this is to measure the consumer lag (i.e. the difference in message offset id between the last produced message for a topic partition in Kafka and the last consumed message for that topic partition) to detect and remedy consumers that are falling too far behind.

The tool we use is called “Kafka Offset Monitor”, a project first forked from Quantifind but substantially modified over the years. It provides a user interface for monitoring all the active consumer groups in each Kafka cluster.

Kafka Offset Monitor in its original form only measures absolute message lag: the number of messages by which the consumer trails the latest produced message.

Lag = last produced offset - last consumed offset

Finding the lag itself is pretty simple. We first poll the __consumer_offsets topic, the internal Kafka topic that records the latest offset committed by every consumer group in the cluster. From this, we find the last consumed message position.

Then we find the last produced message position by sending a ListOffsets request to Kafka (the same mechanism behind the consumer client’s endOffsets call). Once we have both positions, the lag is simply one subtracted from the other.

The lag calculated here is not necessarily exact, because we do not capture the last produced and last consumed positions at the same instant; however, the time difference between the two requests is short enough to be negligible.
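As a rough sketch of this calculation in Scala (broker address and group id below are placeholders, and for brevity it reads committed offsets through Kafka’s AdminClient rather than consuming __consumer_offsets directly, as the monitor does):

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object OffsetLagSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder broker
  props.put("group.id", "offset-monitor-probe")    // the probe's own group id
  props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
  props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

  val admin    = AdminClient.create(props)
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

  // Last consumed positions: the offsets the group committed to __consumer_offsets.
  val committed: Map[TopicPartition, Long] =
    admin.listConsumerGroupOffsets("group-A") // placeholder group id
      .partitionsToOffsetAndMetadata().get().asScala
      .map { case (tp, meta) => tp -> meta.offset() }.toMap

  // Last produced positions: the log-end offsets of the same partitions.
  val produced: Map[TopicPartition, Long] =
    consumer.endOffsets(committed.keys.toSeq.asJava).asScala
      .map { case (tp, off) => tp -> off.longValue() }.toMap

  // Lag = last produced offset - last consumed offset, per partition.
  committed.foreach { case (tp, c) =>
    println(s"$tp lag=${produced.getOrElse(tp, c) - c}")
  }

  consumer.close(); admin.close()
}
```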

Using absolute message lag helps us identify lagging consumer groups; however, each consumer group may consume messages differently.

For example, consumer group A might process messages quickly from a topic that produces many messages, while consumer group B might take a long time to process a single message from a topic with a much lower message rate.

Since consumer group A processes messages quickly, a lag of 100,000 messages might mean it is only a few seconds behind. A lag of 10 messages for group B, on the other hand, might mean it is an hour behind: either each message takes a very long time to process, or only 10 messages were produced in the last hour and none of them has been consumed.

Being able to monitor lag by actual time instead of message count gives much more insight into how consumers are performing and which ones need attention.

As a result, we recently added a new feature to our Kafka Offset Monitor called time lag. The time lag added to Kafka Offset Monitor is inspired by work from Lightbend in their Kafka Lag Exporter and the accompanying blog post.

What exactly is time lag?

Time lag is the estimated length of time by which a consumer group trails the last produced message for a particular topic partition.

We experimented with different ways to calculate time lag, and the most accurate way we have found so far uses the following formula.

Time lag = current time - Δ

where Δ = the time when the last consumed message was the last produced message

“The time when the last consumed message was the last produced message”, or Δ, might be a little confusing at first, so let’s look at an example. Say consumer group A is consuming from the topic “Oranges”, and we track the last produced offset, the last consumed offset, and Δ over time.
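For illustration, suppose (with made-up numbers) the offsets evolve like this:

Time        Last produced offset    Group A last consumed offset
12:00:00    100                     70
12:01:00    160                     100
12:02:00    230                     160

At 12:02, group A’s last consumed offset is 160, and 160 was the last produced offset at 12:01. So Δ = 12:01, and the time lag is 12:02 - 12:01 = 1 minute, even though the offset lag at that moment is 230 - 160 = 70 messages.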

Now that we understand what Δ is, we can look at lag and time lag together. In practice, the offset lag of two consumer groups can differ greatly while their time lag is similar, as the two charts below show.

Chart: a consumer with high offset lag but low time lag
Chart: a consumer with low offset lag but high time lag

How do we find the time when the last consumed message was the last produced message?

To do this, a new map stores every topic and partition consumed by an active consumer, together with its corresponding interpolation table.

There is one interpolation table per topic partition. The table contains points, each consisting of an offset and its corresponding timestamp.
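A minimal sketch of this structure in Scala (the names are illustrative, not the monitor’s actual code):

```scala
import scala.collection.concurrent.TrieMap
import org.apache.kafka.common.TopicPartition

// One point per polling interval: a produced offset and the epoch
// second at which it was observed as the log-end offset.
final case class Point(offset: Long, timestamp: Long)

// One interpolation table per topic partition, kept in offset order.
// A concurrent map, since a polling thread appends points while lag
// calculations read them.
val interpolationTables = TrieMap.empty[TopicPartition, Vector[Point]]
```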

Example of the interpolation table:

Offset    Timestamp
212       1597304241
240       1597304361
260       1597304421

We use this table to obtain the time at which the last committed message was the last produced message. Using the table above as an example, if the last committed offset is 212, the returned value will be 1597304241; if it is 240, the returned value will be 1597304361.

What happens if no point in the table matches the last committed offset exactly? The answer is simple: we interpolate. For example, if the last committed offset is 250, the returned value will be 1597304391.

(250 - 240) / (260 - 240) = (x - 1597304361) / (1597304421 - 1597304361)

Solving gives x = 1597304361 + 0.5 × 60 = 1597304391.

Now, what happens if the last committed offset is outside the table’s range? Before answering that question, let’s look at how we populate the table.

How do we implement it?

A separate thread is created specifically to populate the table. Every minute, we send a ListOffsets request to Kafka to obtain the last produced offset for every active topic partition.

The last produced offset is then added to the table, with a timestamp recording when the request was sent to Kafka.

This is therefore only an estimate, and the time lag calculated from it is an estimate too. Since the table is populated every minute, the time lag can only ever be accurate to within about a minute, so we report time lag at one-minute granularity.
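A sketch of that polling loop, reusing the illustrative `interpolationTables` map and `Point` class from above, plus a `consumer` dedicated to this thread:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.jdk.CollectionConverters._

val scheduler = Executors.newSingleThreadScheduledExecutor()

scheduler.scheduleAtFixedRate(() => {
  // Epoch seconds, captured when the request is sent to Kafka.
  val requestTime = System.currentTimeMillis() / 1000
  // Partitions of active consumers; here we reuse the map's keys
  // (how entries are first discovered and seeded is omitted).
  val partitions = interpolationTables.keys.toSeq
  if (partitions.nonEmpty) {
    val ends = consumer.endOffsets(partitions.asJava).asScala
    ends.foreach { case (tp, offset) =>
      // Append the (last produced offset, request time) point.
      val table = interpolationTables.getOrElse(tp, Vector.empty)
      interpolationTables.put(tp, table :+ Point(offset.longValue(), requestTime))
    }
  }
}, 0L, 1L, TimeUnit.MINUTES)
```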

Now to answer the question from earlier: what happens if the last committed offset is outside the table’s range? If it is larger than the newest value stored in the table, the time lag for that consumer group cannot exceed one minute, since we populate the table every minute. In that case, we simply report the time lag as less than one minute.
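A sketch of the full lookup over those illustrative `Point` tables (the `TimeLag` type here is just for illustration): an exact offset match returns its stored timestamp, an offset between two points is linearly interpolated, and an offset newer than the newest point is reported as less than one minute.

```scala
sealed trait TimeLag
case object LessThanOneMinute extends TimeLag
final case class Seconds(value: Long) extends TimeLag

// Δ: the epoch second at which `committed` was the last produced offset,
// by exact match or linear interpolation between the bracketing points.
def delta(table: Vector[Point], committed: Long): Option[Long] =
  table.find(_.offset == committed).map(_.timestamp).orElse {
    table.sliding(2).collectFirst {
      case Vector(lo, hi) if lo.offset < committed && committed < hi.offset =>
        // (committed - lo.offset) / (hi.offset - lo.offset)
        //   = (Δ - lo.timestamp) / (hi.timestamp - lo.timestamp)
        val ratio = (committed - lo.offset).toDouble / (hi.offset - lo.offset)
        lo.timestamp + (ratio * (hi.timestamp - lo.timestamp)).toLong
    }
  }

def timeLag(table: Vector[Point], committed: Long, now: Long): Option[TimeLag] =
  if (table.isEmpty) None
  else if (committed > table.last.offset)
    Some(LessThanOneMinute) // newer than the newest point: under one polling interval behind
  else
    delta(table, committed).map(d => Seconds(now - d))
    // offsets older than the first point fall outside the table and yield None

// With the example table above, timeLag(table, 250, now)
// interpolates Δ = 1597304391 (now in epoch seconds).
```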

What’s next?

Currently, we are using both lag and time lag to monitor Kafka consumers since each of them gives us different insights into consumer behavior.

Since time lag is still a new feature, most of our current monitoring is still offset-based. However, we are now shifting the monitoring of our internal Kafka ecosystem to be based on time rather than offsets.

We have benefited from time lag monitoring straight away: we can now find problematic Kafka clients, react faster, and spend less time investigating false positives.

Author’s Note

This project was part of my two-month summer internship at Agoda. Working on it, I learned a lot more about Kafka and the rest of Agoda’s Data Pipeline.

It has been a great and fun experience, so I would like to give a big thanks to everyone at Agoda, especially the “messaging” team, for having me. A special shout-out to Akshesh Doshi and Johan Lundahl for helping me and for reviewing this blog. 😄

Join the team

careersatagoda.com
