Kafka consumers health check in Spring Boot Actuator

Include the state and details of Kafka consumers and Kafka Streams in the Actuator health endpoint

Mateusz Jadczyk
DNA Technology
4 min read · Apr 1, 2021

As application monitoring is nowadays a crucial part of software maintenance, we decided we needed an easy way to monitor the state of our Kafka consumers in real time, get notified whenever any of them fails, and quickly identify the service in which the problem occurred.

This health check focuses on the state of each consumer, not on the connection to the Kafka broker (although any interruptions will most likely impact consumers).

In our setup, we use both standard Kafka consumers and Kafka Streams consumers, wired together with Spring Boot. Both types of consumers are configured to go into an error state as soon as an uncaught exception occurs, because we cannot afford to change the order in which messages are processed.

Spring Boot Actuator

We don’t like reinventing the wheel, so we extended the existing health endpoint in Spring Boot Actuator.

Spring Boot automatically detects beans implementing the HealthIndicator interface. We implemented two of them, one for standard consumers and one for Kafka Streams, because their APIs differ and the current status is retrieved in different ways.

You can start by extending AbstractHealthIndicator and implementing its doHealthCheck method, similar to how Spring’s own PingHealthIndicator is implemented.

Read more about Actuator and the health endpoint in the Spring Boot documentation.

Kafka consumers HealthIndicator

Spring’s KafkaListenerEndpointRegistry gives us access to a Collection of MessageListenerContainer instances, which hold all the details about the consumers running in the application. You can iterate over this Collection and transform it into a Map of the consumer properties you care about.

When implementing the doHealthCheck(Health.Builder builder) method, use builder.withDetails(Map) to include the status and details of each consumer separately, and builder.status(Status) to set the overall status of all your Kafka consumers.

Our implementation of the main method uses some helper classes to hold this information in a readable way.

Because our consumers are configured to enter an error state when they encounter a fatal exception, we simply check the isRunning flag to compute the status of a particular consumer:

messageListenerContainer.isRunning() ? Status.UP : Status.DOWN
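A minimal sketch of this logic, assuming Java 17+. So that it compiles with the JDK alone, the Spring types are swapped for stand-ins: the hypothetical ConsumerSnapshot record holds the values you would read from each MessageListenerContainer returned by registry.getListenerContainers(), and the plain strings "UP" and "DOWN" play the role of Spring’s Status. Treating the consumers as a whole as DOWN when any single one is stopped is an assumption; pick whatever rule fits your alerting.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the data read from one MessageListenerContainer:
//   listenerId <- container.getListenerId()
//   groupId    <- container.getGroupId()
//   running    <- container.isRunning()
record ConsumerSnapshot(String listenerId, String groupId, boolean running) {}

final class ConsumerHealth {
    private ConsumerHealth() {}

    // Per-consumer rule: a running container is UP, anything else is DOWN.
    static String statusOf(ConsumerSnapshot consumer) {
        return consumer.running() ? "UP" : "DOWN";
    }

    // Assumed aggregation rule: one stopped consumer makes the whole group DOWN.
    static String overallStatus(List<ConsumerSnapshot> consumers) {
        return consumers.stream().allMatch(ConsumerSnapshot::running) ? "UP" : "DOWN";
    }

    // One details entry per consumer, keyed by listener ID.
    static Map<String, Object> details(List<ConsumerSnapshot> consumers) {
        Map<String, Object> details = new LinkedHashMap<>();
        for (ConsumerSnapshot consumer : consumers) {
            Map<String, Object> entry = new LinkedHashMap<>();
            entry.put("status", statusOf(consumer));
            entry.put("groupId", consumer.groupId());
            details.put(consumer.listenerId(), entry);
        }
        return details;
    }
}
```

In a real doHealthCheck, the overall result would be passed to builder.status(...) as Status.UP or Status.DOWN, and the map to builder.withDetails(...).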

We also included extra information, such as the topic-partitions assigned to each consumer, which gives some insight into how the system operates.
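One way to keep the topic-partitions readable is to group them by topic. In this sketch, the hypothetical Partition record stands in for Kafka’s TopicPartition (it mirrors the topic() and partition() accessors), and the input corresponds to what container.getAssignedPartitions() returns. Sorting is there so the health output stays stable between calls.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record Partition(String topic, int partition) {}

final class PartitionDetails {
    private PartitionDetails() {}

    // Groups a consumer's assignment by topic, e.g. {orders=[0, 1], payments=[2]}.
    // TreeMap keeps topics sorted; partition numbers are sorted per topic.
    static Map<String, List<Integer>> byTopic(Collection<Partition> assigned) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        if (assigned == null) {
            return grouped; // guard: treat a missing assignment as empty
        }
        for (Partition p : assigned) {
            grouped.computeIfAbsent(p.topic(), topic -> new ArrayList<>()).add(p.partition());
        }
        grouped.values().forEach(partitions -> Collections.sort(partitions));
        return grouped;
    }
}
```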

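The resulting response for a consumer health check contains the overall status of all consumers and, under details, the status and topic-partitions of each individual consumer.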

Kafka Streams HealthIndicator

We define all Kafka Streams processors using StreamsBuilderFactoryBean, which allows us to autowire a List of all the StreamsBuilderFactoryBean instances into the health indicator and retrieve information from them. Again, we omit the complete logic for digging out the necessary data, as everyone has their own needs.

The application ID can be fetched, in a somewhat hacky way, directly from StreamsBuilderFactoryBean:

streamsBuilderFactoryBean.getStreamsConfiguration().get("application.id");

For other details, look into the KafkaStreams instance itself, available via streamsBuilderFactoryBean.getKafkaStreams(). Compute the status, for example, based on kafkaStreams.state().isRunningOrRebalancing() and by checking the state of each thread separately.
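A sketch of such a check, assuming Java 17+ and the JDK only. The hypothetical StreamsSnapshot record bundles the streams configuration (which also yields the application ID), the result of kafkaStreams.state().isRunningOrRebalancing(), and the state name of each stream thread, as reported for example by ThreadMetadata.threadState(). Counting a thread in the PENDING_SHUTDOWN or DEAD state as DOWN is our assumption about what checking each thread should mean.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Hypothetical snapshot of one Kafka Streams instance. In a real indicator:
//   config               <- streamsBuilderFactoryBean.getStreamsConfiguration()
//   runningOrRebalancing <- kafkaStreams.state().isRunningOrRebalancing()
//   threadStates         <- thread name -> thread state name, one entry per stream thread
record StreamsSnapshot(Properties config, boolean runningOrRebalancing, Map<String, String> threadStates) {}

final class StreamsHealth {
    private StreamsHealth() {}

    // Assumption: threads in these states are no longer processing records.
    private static final Set<String> STOPPED_THREAD_STATES = Set.of("PENDING_SHUTDOWN", "DEAD");

    // "application.id" is the key behind StreamsConfig.APPLICATION_ID_CONFIG.
    static String applicationId(StreamsSnapshot streams) {
        Properties config = streams.config();
        return config == null ? "unknown" : config.getProperty("application.id", "unknown");
    }

    // A missing thread state is treated as DOWN as well.
    static String threadStatus(String threadState) {
        return threadState == null || STOPPED_THREAD_STATES.contains(threadState) ? "DOWN" : "UP";
    }

    // UP only if the client is running or rebalancing and every thread is still alive.
    static String status(StreamsSnapshot streams) {
        boolean allThreadsUp = streams.threadStates().values().stream()
                .allMatch(state -> threadStatus(state).equals("UP"));
        return streams.runningOrRebalancing() && allThreadsUp ? "UP" : "DOWN";
    }

    // Details for one stream: application ID, overall status and per-thread status.
    static Map<String, Object> details(StreamsSnapshot streams) {
        Map<String, String> threads = new LinkedHashMap<>();
        streams.threadStates().forEach((name, state) -> threads.put(name, threadStatus(state)));
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("applicationId", applicationId(streams));
        details.put("status", status(streams));
        details.put("threads", threads);
        return details;
    }
}
```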

You can also inspect thread details using kafkaStreams.localThreadsMetadata() to find out which tasks each thread handles and, in turn, which topic-partitions.
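The thread, task and topic-partition hierarchy can be captured in small typed records rather than nested maps. In this sketch, the hypothetical TaskInfo and ThreadInfo records stand in for the TaskMetadata and ThreadMetadata objects returned by kafkaStreams.localThreadsMetadata() (deprecated since Kafka 3.0 in favour of metadataForLocalThreads()), with task IDs and topic-partitions already rendered as strings. Standby tasks are listed separately from active ones, since a standby task only keeps a replica of the state up to date instead of processing records.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-ins for Kafka Streams' TaskMetadata and ThreadMetadata.
record TaskInfo(String taskId, Set<String> topicPartitions) {}
record ThreadInfo(String threadName, List<TaskInfo> activeTasks, List<TaskInfo> standbyTasks) {}

// What we report per thread: task ID -> sorted topic-partitions, active and standby separately.
record ThreadAssignment(String threadName,
                        Map<String, Set<String>> activeTasks,
                        Map<String, Set<String>> standbyTasks) {}

final class StreamsAssignment {
    private StreamsAssignment() {}

    static List<ThreadAssignment> describe(List<ThreadInfo> threads) {
        return threads.stream()
                .map(t -> new ThreadAssignment(t.threadName(), byTask(t.activeTasks()), byTask(t.standbyTasks())))
                .toList();
    }

    // TreeMap/TreeSet keep task IDs and partitions sorted for a stable health output.
    private static Map<String, Set<String>> byTask(List<TaskInfo> tasks) {
        Map<String, Set<String>> result = new TreeMap<>();
        for (TaskInfo task : tasks) {
            result.put(task.taskId(), new TreeSet<>(task.topicPartitions()));
        }
        return result;
    }
}
```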

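We found it satisfying to report, for each stream, its application ID and overall status together with the state and tasks of each of its threads.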

Usage in monitoring

There are many basic and advanced ways to make use of such data depending on your monitoring solution. You can:

  • track the status code returned by the endpoint, which depends on the overall system health (which in turn depends on the health of each component)
  • parse the JSON and display a list of failing consumers on a TV screen
  • turn data from HealthIndicators into metrics (Spring Boot uses Micrometer for this) and make them available in Prometheus or CloudWatch
  • check consumers’ status before and after tests
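The first and last ideas can be combined in a small probe. This sketch (Java 11+ for java.net.http) relies on the fact that, with default settings, Spring Boot Actuator answers 200 when the overall status is UP and 503 when it is DOWN or OUT_OF_SERVICE, so it never needs to parse the response body. The HealthProbe class and its five-second timeout are illustrative choices, not part of our setup.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

final class HealthProbe {
    private HealthProbe() {}

    // Default Actuator mapping: 200 for UP, 503 for DOWN and OUT_OF_SERVICE.
    static boolean isHealthyStatusCode(int statusCode) {
        return statusCode == 200;
    }

    // Calls the health endpoint; an unreachable service is treated as unhealthy.
    static boolean isHealthy(HttpClient client, URI healthEndpoint) {
        HttpRequest request = HttpRequest.newBuilder(healthEndpoint)
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        try {
            HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
            return isHealthyStatusCode(response.statusCode());
        } catch (IOException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

For example, a test suite could call HealthProbe.isHealthy(HttpClient.newHttpClient(), URI.create("http://localhost:8080/actuator/health")) before and after running, where the host and port are assumptions about your deployment. If you parse the JSON instead, keep in mind that per-consumer details are hidden unless management.endpoint.health.show-details is set to always or when-authorized; the default is never.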

Kafka Streams learning bonus

When we began our journey with Kafka Streams, the health endpoint turned out to be a great tool for visualizing Streams’ internal details: the threads created by each stream, the partitions handled, the standby tasks ready on each application instance, and so on.

Avoid

We initially implemented a quick-and-dirty solution that held the state and details of consumers in Maps of Maps. Unsurprisingly, it quickly became unreadable, so we rewrote it using proper shared classes like HealthRow and StatusHealthRow. We created specialised versions of these classes for consumers and Kafka Streams, along with a couple of helper functions for merging and calculating overall statuses. From then on, any changes were much easier.

Takeaway: don’t waste your time with generic bags for holding data; go for properly designed classes.
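A sketch of what such classes might look like, assuming Java 17+. Only the names HealthRow and StatusHealthRow come from the description above; all fields, the RowStatus enum, and the ConsumerRow, ThreadRow, StreamRow and HealthRows types are hypothetical. Each row renders its own details, and a helper merges statuses (any DOWN makes the result DOWN, which is an assumption) and builds the map you would hand to builder.withDetails(...).

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical status type; a real indicator would map it to Spring's Status.UP / Status.DOWN.
enum RowStatus { UP, DOWN }

// Hypothetical shared shapes: every row has a name and can render its details.
interface HealthRow {
    String name();
    Map<String, Object> toDetails();
}

interface StatusHealthRow extends HealthRow {
    RowStatus status();
}

// Specialised row for a standard consumer.
record ConsumerRow(String name, RowStatus status, String groupId) implements StatusHealthRow {
    @Override
    public Map<String, Object> toDetails() {
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("status", status.name());
        details.put("groupId", groupId);
        return details;
    }
}

// Specialised row for one Kafka Streams thread.
record ThreadRow(String name, RowStatus status) implements StatusHealthRow {
    @Override
    public Map<String, Object> toDetails() {
        return Map.of("status", status.name());
    }
}

// Specialised row for a Kafka Streams instance; its status is merged from its threads.
record StreamRow(String name, List<ThreadRow> threads) implements StatusHealthRow {
    @Override
    public RowStatus status() {
        return HealthRows.merge(threads);
    }

    @Override
    public Map<String, Object> toDetails() {
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("status", status().name());
        details.put("threads", HealthRows.toDetails(threads));
        return details;
    }
}

final class HealthRows {
    private HealthRows() {}

    // Assumed merge rule: DOWN wins; an empty collection counts as UP.
    static RowStatus merge(Collection<? extends StatusHealthRow> rows) {
        return rows.stream().anyMatch(row -> row.status() == RowStatus.DOWN) ? RowStatus.DOWN : RowStatus.UP;
    }

    // Builds the details map keyed by row name.
    static Map<String, Object> toDetails(Collection<? extends HealthRow> rows) {
        Map<String, Object> details = new LinkedHashMap<>();
        for (HealthRow row : rows) {
            details.put(row.name(), row.toDetails());
        }
        return details;
    }
}
```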

I hope this article helped you gain some insight into how the status and details of Kafka consumers and Kafka Streams can be exposed for monitoring and alerting in a Spring Boot application using the Actuator health endpoint.

Stay tuned for more Kafka-related content!

An open-minded full-stack software engineer building products, not just writing code. 👨‍💻 Software Engineer at DNA Technology (dnatechnology.io)