Kafka Real-time streaming application monitoring and alerting
Nuances and considerations for monitoring and alerting on real-time Kafka applications: criteria, metrics, and thresholds
Table of Contents
· Introduction
· Alerting and monitoring Kafka Streaming Application(s)
· What are the golden Signals?
· Different Kafka Metrics
· Streaming Application
∘ Consumer Lag
∘ Throughput
∘ Process Latency
· Kafka Broker Key Metrics
∘ Do you know how loaded the broker is?
· Critical Metrics and Thresholds
∘ Active Controller count
∘ Offline partitions
· Replica
∘ Under Replicated Partitions
∘ Under Min In Sync Replicas
· Broker Network and I/O
∘ Broker IO activity
· ZooKeeper
∘ ZooKeeper Average Latency
∘ Zookeeper Connections
∘ Broker Zookeeper disconnections
· Hardware Metrics Thresholds
· Confluent Replicator
∘ Replicator metrics MBean:
· Stateful Operations
∘ Streams — State Store Metrics
∘ Streams — Record Cache Metrics
· RocksDB Metrics
∘ number-file-errors-total
∘ memtable-hit-ratio
∘ block-cache-filter-hit-ratio
· TOOLS
Introduction
Apache Kafka® is a distributed, fault-tolerant streaming platform that can process streams of data in real time. This article captures lessons learned from taking a Kafka Streams application from development through to production. Monitoring lets you observe the system and spot issues before they surface, while alerting raises alarms when warning or critical thresholds are breached. Used wisely, the two together can detect the deteriorating health of any data-intensive application.
The primary problem in such a system is the sheer number of available parameters and metrics: there are more than 250 JMX metrics across the Kafka brokers, consumers, ZooKeeper, Replicator, and so on. Using all of them for monitoring and alerting quickly becomes chaotic. You need a strategy that selects specific metrics for observability and monitoring, along with sensible thresholds for alerts. Alerts should also be aggregated and designed to mitigate false positives.
In this article, I list the metrics we used predominantly in one of our big-data Kafka Streams applications.
Alerting and monitoring Kafka Streaming Application(s)
What is an intelligent alerting and monitoring solution?
These are the questions that need to be asked while designing a Kafka alerting and monitoring strategy:
- What are the metrics for Monitoring and Alerting?
- What are the golden signals in monitoring?
- Which key metrics should be monitored for the Kafka Streams application, Kafka brokers, ZooKeeper, and Kafka Replicator?
- What does performance monitoring look like for a Kafka Streams application?
- How to identify current and potential problems?
- How should alerts be aggregated, and what is the rationale behind it?
What are the golden Signals?
Google’s site reliability engineers (SREs) defined four key metrics to monitor. They call them the “four golden signals”: latency, traffic, errors, and saturation.
- Latency: The time it takes to service a request; here, the time taken for a record to travel from source to sink.
- Traffic: The number of queries received per second; here, the number of records processed in the data pipeline.
- Errors: The rate of requests that fail. Failures can be explicit (e.g., exceptions) or implicit (e.g., a successful response with the wrong content).
- Saturation: How "full" the service is. Performance degrades as services approach high saturation; indicators include CPU, memory, disk space, etc.
Different Kafka Metrics
Let's look at the metrics from the different Kafka components. The diagram below illustrates the various metrics available in a simple streaming application and should help you picture the stages at which metrics need to be monitored. The metrics chosen in this article can be related back to this diagram.
Streaming Application
Please go through the Kafka Streams architecture documentation before you look into the metrics and thresholds: https://kafka.apache.org/31/documentation/streams/architecture
Consumer Lag
Consumer lag can be monitored using the kafka-consumer-groups command-line tool or the consumer's JMX metrics. Increasing lag means the application is processing more slowly than records arrive, and latency is growing.
MBean
kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id}
Attribute: records-lag-max.
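As a sketch of how such an alert might be evaluated (the threshold and sample values below are illustrative, not from any real deployment), firing only on a sustained breach of records-lag-max helps mitigate the false positives mentioned in the introduction:

```python
def lag_alert(samples, critical=10_000, consecutive=3):
    """samples: successive readings of records-lag-max.
    Returns True once the lag has exceeded `critical` for
    `consecutive` samples in a row, ignoring transient spikes."""
    run = 0
    for lag in samples:
        run = run + 1 if lag > critical else 0
        if run >= consecutive:
            return True
    return False

print(lag_alert([500, 12_000, 800, 900]))        # False: transient spike
print(lag_alert([500, 12_000, 13_000, 14_500]))  # True: sustained breach
```

The `consecutive` window trades detection speed for noise reduction; tune it to your sampling interval.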
Throughput
The average number of records sent per second for a topic. Lower throughput indicates higher latency. For a stable system, throughput capacity should be at least 1.5x the actual critical requirement at all times.
For example, if your system needs 75 msg/sec, size it to process about 115 msg/sec, warn at 90 msg/sec, and raise a critical alert below 75 msg/sec. This illustrates how smart alerts avoid false positives while still leaving time to assess health and take corrective action. A much higher threshold demands over-engineering; a lower one leaves less time to react.
MBean: kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)
Attribute: record-send-rate
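The 75/90/115 msg/sec example above can be expressed as a small helper. The 1.5x and 1.2x factors are the illustrative headroom ratios from this section, not Kafka defaults, and 1.5x of 75 gives 112.5 (the article rounds to 115):

```python
def throughput_thresholds(required_rate, capacity_factor=1.5, warn_factor=1.2):
    """Derive sizing and alert thresholds from the required rate."""
    return {
        "target_capacity": required_rate * capacity_factor,  # size the system for this
        "warning": required_rate * warn_factor,              # warn while there is time to react
        "critical": required_rate,                           # below this, the SLA is at risk
    }

print(throughput_thresholds(75))
# {'target_capacity': 112.5, 'warning': 90.0, 'critical': 75}
```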
Process Latency
The [average | maximum] execution time in ns for the respective operation of this task.
MBean: kafka.streams:type=stream-metrics,client-id=([-.\w]+)
Attribute: process-latency-[avg | max]
Consumer Group Join Rate
The number of group joins per second.
MBean: kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)
Attribute: join-rate
Kafka Broker Key Metrics
Do you know how loaded the broker is?
The following important operational broker metrics are aggregated across the cluster, and per broker or per topic where applicable.
- Aggregate incoming message rate.
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
- Byte-in rate from clients
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
- Request rate
kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower}
- Byte-out rate from clients.
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
- RequestQueueSize: Size of the request queue. A congested request queue will not be able to process incoming or outgoing requests.
kafka.network:type=RequestChannel,name=RequestQueueSize
- Have you investigated where request time is spent?
Total time in ms to serve the specified request.
kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower}
Critical Metrics and Thresholds
Active Controller count
Description:
The Controller is responsible for maintaining the list of partition leaders, and coordinating leadership transitions.
•If ActiveControllerCount < 1: Producers/Consumers can’t get the partition leaders anymore.
•If ActiveControllerCount > 1: A split-brain problem occurs, and that’s really bad!
Metric:
- Sum the JMX metric
kafka.controller:type=KafkaController,name=ActiveControllerCount across the cluster.
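A minimal sketch of the aggregation rule described above: sum ActiveControllerCount across all brokers and alert whenever the sum is not exactly 1 (the function name and message strings here are hypothetical):

```python
def controller_alert(active_controller_counts):
    """active_controller_counts: per-broker readings of the
    ActiveControllerCount JMX metric. A healthy cluster sums to 1."""
    total = sum(active_controller_counts)
    if total < 1:
        return "CRITICAL: no active controller"
    if total > 1:
        return "CRITICAL: split brain, multiple controllers"
    return "OK"

print(controller_alert([0, 1, 0]))  # OK
```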
Offline partitions
•Description:
An Offline Partition is a partition without an active leader and is hence not writable or readable. The presence of Offline partitions compromises the data availability of the cluster.
•Metric:
Sum the JMX
kafka.controller:type=KafkaController,name=OfflinePartitionsCount across the cluster
Replica
Under Replicated Partitions
Description:
- In a healthy cluster, the number of in-sync replicas (ISRs) should be exactly equal to the total number of replicas. In other words, this metric verifies that partitions are respecting the topic's replication-factor configuration. Under-replicated partitions occur when a broker is down or cannot replicate fast enough from the leader (replica fetcher lag).
Metric:
Sum of JMX
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions across the cluster
Under Min In Sync Replicas
Description:
- When Kafka is configured for durability, producers use acks=all and topics are configured with a sufficient min.insync.replicas. If a partition has fewer in-sync replicas than min.insync.replicas, clients cannot produce and receive a NotEnoughReplicas exception; the producer may retry depending on its configuration. In a nutshell, when a partition is under min ISR, data production is blocked.
Metric:
Sum of JMX
kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount across the cluster
Broker Network and I/O
Broker IO activity
Description:
- Internally, a broker node uses I/O threads to read a message from the request queue, write it to the OS page cache, and place it into purgatory, where your replication strategy is executed. The thread idle time gauges the broker's I/O utilization:
•When idle==1: The broker is inactive.
•When idle==0: The broker is always processing.
Note: the value ranges from 0 to 1; 1 means 100% idle, 0.4 means 40% idle.
Metric:
JMX MBean
kafka.network:type=SocketServer,name=RequestHandlerAvgIdlePercent (monitored per broker)
Broker Network activity
Description:
Similar to the above, a broker node uses network threads to read messages from the network and place them into the request queue.
•When idle == 1: The broker has no inbound traffic.
•When idle == 0: The broker is constantly receiving messages.
The value ranges from 0 to 1; 1 means 100% idle, 0.4 means 40% idle.
Metric:
JMX MBean
kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent (monitored per broker)
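Since both idle metrics report an idle fraction, a monitoring rule typically alerts on the complement, i.e. utilization. The 60%/80% alert levels below are assumed for illustration, not recommended values:

```python
def io_alert(avg_idle_percent, warn_util=0.6, crit_util=0.8):
    """avg_idle_percent: RequestHandlerAvgIdlePercent or
    NetworkProcessorAvgIdlePercent, an idle fraction in [0, 1].
    Utilization is the complement; alert levels are assumed."""
    utilization = 1.0 - avg_idle_percent
    if utilization >= crit_util:
        return "CRITICAL"
    if utilization >= warn_util:
        return "WARNING"
    return "OK"

print(io_alert(0.9))  # OK: broker is 90% idle
print(io_alert(0.4))  # WARNING: broker is 60% busy
```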
ZooKeeper
ZooKeeper Average Latency
Metric: JMX AvgRequestLatency
ZooKeeper Connections
Metric: JMX NumAliveConnections
Broker ZooKeeper Disconnections
Metric: JMX
kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec
Hardware Metrics Thresholds
•60% disk usage for disks storing the Kafka log (configured via log.dirs)
•60% disk IO usage
•60% network IO usage
•60% file handle usage
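These thresholds can be checked with a simple rule over sampled usage fractions; the resource names below are illustrative keys, not actual metric names:

```python
HARDWARE_THRESHOLD = 0.60  # 60% for disk, disk IO, network IO, file handles

def hardware_alerts(usage):
    """usage: resource name -> fraction currently in use.
    Returns the resources breaching the 60% threshold above."""
    return [name for name, frac in usage.items() if frac > HARDWARE_THRESHOLD]

print(hardware_alerts({"disk": 0.72, "disk_io": 0.41,
                       "network_io": 0.55, "file_handles": 0.63}))
# ['disk', 'file_handles']
```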
Confluent Replicator
Confluent Replicator allows you to easily and reliably replicate topics from one Kafka cluster to another. In addition to copying the messages, Replicator creates topics as needed, preserving the topic configuration from the source cluster.
•To expose Replicator metrics, set the JMX_PORT environment variable.
If the application spans multiple regions, the Replicator metrics listed below can be used.
Replicator metrics MBean:
confluent.replicator:type=confluent-replicator-task-metrics,confluent-replicator-task=([-.\w]+),confluent-replicator-task-topic-partition=([-.\w]+)
Stateful Operations
For stateful applications, I suggest monitoring with the metrics given below. Thresholds can be set based on application criticality and non-functional requirements.
Streams — State Store Metrics
1. put-latency-max
The maximum put execution time in ns.
kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
2. put-if-absent-latency-max
The maximum put-if-absent execution time in ns.
kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
3. get-latency-max
The maximum get execution time in ns.
kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
4. delete-latency-max
The maximum delete execution time in ns.
kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
5. restore-latency-max
The maximum restore execution time in ns.
kafka.streams:type=stream-[store-scope]-metrics,client-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
Streams — Record Cache Metrics
hitRatio-avg
The average cache hit ratio defined as the ratio of cache read hits over the total cache read requests.
kafka.streams:type=stream-record-cache-metrics,client-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)
RocksDB Metrics
number-file-errors-total
The total number of file errors that occurred.
memtable-hit-ratio
The ratio of memtable hits, relative to all lookups to the memtable.
block-cache-filter-hit-ratio
The ratio of block cache hits for filter blocks, relative to all lookups for filter blocks to the block cache.
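All three RocksDB ratios (and hitRatio-avg above) are hit ratios in [0, 1], so one generic low-ratio check can cover them; the 0.8 floor below is an assumed example, not a recommended value:

```python
def cache_alerts(ratios, min_ratio=0.8):
    """ratios: metric name -> observed hit ratio, e.g. hitRatio-avg,
    memtable-hit-ratio, block-cache-filter-hit-ratio.
    Flags caches below the assumed floor; a persistently low
    ratio suggests the cache is undersized for the workload."""
    return {name: r for name, r in ratios.items() if r < min_ratio}

print(cache_alerts({"hitRatio-avg": 0.95, "memtable-hit-ratio": 0.62}))
# {'memtable-hit-ratio': 0.62}
```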
TOOLS
When it comes to monitoring a Kafka Streaming application, there are many tools on the market. Prominent tools that can be used for alerting and monitoring Kafka include:
1. Kafdrop
2. Kafka Tool
3. AppDynamics
4. Datadog
Each tool's documentation provides installation and usage details; I would suggest going through them.