Journey of Apache Kafka & Zookeeper Administrator ( Part 14 )

Davinder Pal
Analytics Vidhya
Published in
4 min readSep 10, 2020

July 2020 ( Kafka Consumer Group Monitoring )

Consumer Group Monitoring is very important because it provides stats about consumer applications and how far aka lag the application is from the actual stream of data.

First: Yahoo Kafka Manager aka CMAK

There is a minor problem with CMAK, It will only show you the latest stats, you can never compare historical results. If you only want to know the latest stats about the consumers then this is one of the best tools for Apache Kafka Consumer Group Monitoring.

Second: Custom Monitoring Script ( Python 3 Based )

With this tool my architecture for monitoring becomes very consistent, Python Script for Metric Extraction + Splunk for Indexing & Dashboard.

We need a python package called kafka-python because when we a create consumer using the above library it allows a consumer to fetch its stats as well but the Same things are not supported in Confluent-Kafka-python Package for now.

Now let’s check the code.

from kafka import KafkaConsumer, TopicPartition, KafkaAdminClient
import json
import sys
from datetime import datetime
import threading
class KCMetric:
def __init__(self,topic,group_id,logDir,env):
self.TOPIC = topic
self.GROUP = group_id
self.BOOTSTRAP_SERVERS = ['localhost']
self.ENV = env
self.LOGDIR = logDir
self.CTIMENOW = datetime.now()
self.metricJsonDict = {
"topic": self.TOPIC,
"group_id": self.GROUP,
"env": self.ENV,
"@timestamp": str(self.CTIMENOW),
"partition": {}
}
def checkConsumerGroupName(self):
__kc = KafkaAdminClient(bootstrap_servers=self.BOOTSTRAP_SERVERS)
cgnTuple = (self.GROUP, 'consumer')
for i in __kc.list_consumer_groups():
if cgnTuple == i:
return True
return False
def getMetric(self):
consumer = KafkaConsumer(
bootstrap_servers=self.BOOTSTRAP_SERVERS,
group_id=self.GROUP,
enable_auto_commit=False
)
for p in consumer.partitions_for_topic(self.TOPIC):
tp = TopicPartition(self.TOPIC, p)
consumer.assign([tp])
committed_offset = consumer.committed(tp)
if committed_offset is None:
committed_offset = 0
for _, v in consumer.end_offsets([tp]).items():
latest_offset = v
self.metricJsonDict["partition"][p] = {}
self.metricJsonDict["partition"][p]["committed_offset"] = committed_offset
self.metricJsonDict["partition"][p]["latest_offset"] = latest_offset
self.metricJsonDict["partition"][p]["lag"] = latest_offset - committed_offset
with open(self.LOGDIR + "kafka-consumer-group-metrics.log", 'a+') as logFile:
logFile.write("\n")
logFile.write(json.dumps(self.metricJsonDict))
logFile.write("\n")
consumer.close(autocommit=False)
def main():
inputFile = sys.argv[1]
logDir = sys.argv[2]
env = sys.argv[3]
# clean up log file before writing new data
open(logDir + "/kafka-consumer-group-metrics.log", 'w').close()
for line in open(inputFile):
line = line.strip()
if not line.startswith("#") and len(line) > 0:
topic = line.split()[0] + "." + env.split("-kafka")[0]
group_id = line.split()[1]
try:
kc = KCMetric(topic.strip(), group_id.strip(), logDir, env)
if kc.checkConsumerGroupName():
_t = threading.Thread(
target=kc.getMetric
).start()
except:
print("something failed")
main()

The Most Important Function is

def getMetric(self):
consumer = KafkaConsumer(
bootstrap_servers=self.BOOTSTRAP_SERVERS,
group_id=self.GROUP,
enable_auto_commit=False
)
for p in consumer.partitions_for_topic(self.TOPIC):
tp = TopicPartition(self.TOPIC, p)
consumer.assign([tp])
committed_offset = consumer.committed(tp)
if committed_offset is None:
committed_offset = 0
for _, v in consumer.end_offsets([tp]).items():
latest_offset = v
self.metricJsonDict["partition"][p] = {}
self.metricJsonDict["partition"][p]["committed_offset"] = committed_offset
self.metricJsonDict["partition"][p]["latest_offset"] = latest_offset
self.metricJsonDict["partition"][p]["lag"] = latest_offset - committed_offset
consumer.close(autocommit=False)

If you check my code closely, what I am doing is

  1. creating a consumer.
  2. listing the total number of partitions for the given topic.
  3. create TopicPartition object.
  4. assign a consumer to the TopicPartition object.
  5. read two metrics ( consumer.committed & consumer.end_offsets ) for TopicPartition object.
  6. Append metric to Python Dict / JSON Object.
  7. Repeat Step 3–6 for all partitions.
  8. Close Consumer without committing anything & save as JSON.

There two important inputs required for this

  1. Topic Name
  2. Consumer Group Name

I can do this for all consumer groups but I require to monitor & alert In-Line Transaction Topics & Consumers Only.

The rest of the Topics can be monitored by CMAK which I already have in my Setup.

kafka-consumer-group-metric-input.txt

#######################################################################
# This file is input file for consumer group metric collection script.
# Usage: <topic name without environment>[space]<group_id>
#
# Note:
# 1. if you same consumer reading from mulitple topics then
# add multiple lines with each topicName and same group_id.
# 2. group_id is case sensitive.
# 3. if you have a typo in topicName or group_id, metric won't be collected.
# 4. group_id should be same across environments.
# 5. all group_id/topic stats can't be collected as those will be too much
# so only In Line Transaction Topic/group_id data should be collected.
#######################################################################
topic1 consumer_group_id1
.....

Sample Output

{
"topic": "topic1",
"group_id": "group_id1",
"env": "dev-env-kafka",
"@timestamp": "2020-05-18 15:36:01.191032",
"partition": {
"0": {
"committed_offset": 13626733,
"latest_offset": 13626733,
"lag": 0
},
"1": {
"committed_offset": 13623397,
"latest_offset": 13623397,
"lag": 0
},
"2": {
"committed_offset": 13615804,
"latest_offset": 13615804,
"lag": 0
}
}
}
....

I will run this as well in cron so I added a task in ansible to cover up this script as well.

- name: creating folder for jmx monitor
file:
path: "{{ kafkaInstallDir }}/jmxMonitor"
state: directory
- name: copying script and input files
copy:
src: "{{ item }}"
dest: "{{ kafkaInstallDir }}/jmxMonitor/{{ item }}"
loop:
- kafka-consumer-group-metric.py
- kafka-consumer-group-metric-input.txt
- name: creating kafka consumer group metric collector cron
cron:
name: "kafka consumer group metric collector cron task"
minute: "*"
hour: "*"
weekday: "*"
user: root
job: 'python3 {{ kafkaInstallDir }}/jmxMonitor/kafka-consumer-group-metric.py {{ kafkaInstallDir }}/jmxMonitor/kafka-consumer-group-metric-input.txt {{ kafkaLogDir }}/ {{ kafkaClusterName }}'

Ansible Playbook: 116davinder/kafka-cluster-ansible/clusterConsumerMetricSetup.yml

Finally, we can make the Splunk dashboard out of it.

Splunk Code: 116davinder/kafka-cluster-ansible/splunk-dashboards/apache-kafka-consumer-group-metrics.xml

Splunk Sample Dashboard: 116davinder/kafka-cluster-ansible/splunk-dashboards/Kafka-Consumer-Splunk-Metric-Dashboard.png

why custom monitoring script?

  1. Need to know the historical trend as a well-forgiven consumer over the last six months.
  2. It should be extendable and easy to understand.

There is another great tool called Burrow from LinkedIn but I never got a chance to explore it.

The journey will continue on Next Article(Initial Support for Containers)

--

--