Change Spring Boot Kafka Consumer State at Runtime

Faza Zulfika Permana Putra
Blibli.com Tech Blog
10 min read · Aug 19, 2021

After developing features with Apache Kafka for a while, I started to wonder: "Can I stop a Kafka consumer without restarting my application?" If we can stop a Kafka consumer at runtime, the resources it uses for processing messages can be used by other features that need them. When those features no longer need the extra resources, we can start the Kafka consumer again. That question led me to try it out and write this article.

Apache Kafka

When we want to develop an application that processes a constant stream of incoming data, Apache Kafka is a platform that can deliver those data streams to the applications that process them. In simple terms, Apache Kafka works by accepting messages, storing them in a topic (which behaves much like a queue), and letting consumers retrieve them for processing. To learn more about Apache Kafka, you can read the Apache Kafka documentation.

Spring Boot

Many programming languages and frameworks support Apache Kafka, and Spring Boot is one of them. Spring Boot is part of the Spring ecosystem and makes it easy to build applications that are ready to run in a production environment. To learn more about Spring Boot, you can read the Spring Boot documentation.

A Simple Kafka Consumer

As I mentioned before, this article discusses how to change the state of a Spring Boot Kafka consumer while the application is running. To create Kafka consumers we need the spring-kafka library. The examples also use the Lombok library to keep the code short.

pom.xml

....
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
....
<dependencies>
    ....
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    ....
</dependencies>
....

In simple terms, to create a Kafka consumer with Spring Boot, you can use the @KafkaListener annotation. The annotation needs to know what to listen to, and the simplest way to tell it is the topics parameter, which determines the topics that data will be retrieved from and processed. The following is an example implementation.

MessageListener.java

@Slf4j
@Component
public class MessageListener {

    @KafkaListener(topics = "com.faza.example.kafka.topic")
    public void kafkaMessageListener(ConsumerRecord<String, String> record) {
        log.info("Kafka message listener got a new record: " + record);
        // Simulate slow processing: block the listener thread until sleep() finishes.
        CompletableFuture.runAsync(this::sleep)
                .join();
        log.info("Kafka message listener done processing record: " + record);
    }

    @SneakyThrows
    private void sleep() {
        Thread.sleep(5000);
    }
}

After adding the @KafkaListener annotation, we put the @EnableKafka annotation on our configuration class and set a few properties. (Spring Boot's Kafka auto-configuration normally enables listener support on its own, so the annotation is usually harmless but optional.) The properties tell the application where Apache Kafka is located and which consumer group our consumer belongs to.

Application.java

@EnableKafka
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=runtime-kafka-registry

When we run the application, the Kafka consumer we created starts immediately by default. So how do we keep it from running? Do we have to stop the application too? One answer is the autoStartup parameter. Its default value is true, so Kafka consumers start automatically when the application runs. We can bind autoStartup to a property, so we don't have to change the application code to switch it. However, after changing the property we still have to restart the application before the consumer state matches the new value.

MessageListener.java

....
    @KafkaListener(topics = "com.faza.example.kafka.topic",
            autoStartup = "${spring.kafka.consumer.topic.activation-status}")
    public void kafkaMessageListener(ConsumerRecord<String, String> record) {
....

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=runtime-kafka-registry
spring.kafka.consumer.topic.activation-status=false

Getting Kafka Consumers Information on the Application

When we create a Kafka consumer with the @KafkaListener annotation, Spring reads the annotation when the application starts. KafkaListenerAnnotationBeanPostProcessor is the class responsible for reading those annotations. The consumers we created are then registered in the KafkaListenerEndpointRegistry by the KafkaListenerEndpointRegistrar class. When the application starts, KafkaListenerEndpointRegistry starts the registered consumers according to their autoStartup property.

From the explanation above, we can see that information about our Kafka consumers is kept in the KafkaListenerEndpointRegistry while the application is running, so we can query the registry at runtime to find out about them. We can create a REST API that returns this information. For that we need the spring-boot-starter-web library.

pom.xml

....
<dependencies>
    ....
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    ....
</dependencies>
....

KafkaConsumerAssignmentResponse.java

@Data
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public class KafkaConsumerAssignmentResponse {

    private String topic;
    private Integer partition;
}

KafkaConsumerResponse.java

@Data
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public class KafkaConsumerResponse {

    private String consumerId;
    private String groupId;
    private String listenerId;
    private Boolean active;
    private List<KafkaConsumerAssignmentResponse> assignments;
}

KafkaConsumerRegistryController.java

@Slf4j
@RestController
@RequestMapping(path = "/api/kafka/registry")
public class KafkaConsumerRegistryController {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @GetMapping
    public List<KafkaConsumerResponse> getConsumerIds() {
        return kafkaListenerEndpointRegistry.getListenerContainerIds()
                .stream()
                .map(this::createKafkaConsumerResponse)
                .collect(Collectors.toList());
    }

    private KafkaConsumerResponse createKafkaConsumerResponse(String consumerId) {
        MessageListenerContainer listenerContainer =
                kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        return KafkaConsumerResponse.builder()
                .consumerId(consumerId)
                .groupId(listenerContainer.getGroupId())
                .listenerId(listenerContainer.getListenerId())
                .active(listenerContainer.isRunning())
                .assignments(Optional.ofNullable(listenerContainer.getAssignedPartitions())
                        .map(topicPartitions -> topicPartitions.stream()
                                .map(this::createKafkaConsumerAssignmentResponse)
                                .collect(Collectors.toList()))
                        .orElse(null))
                .build();
    }

    private KafkaConsumerAssignmentResponse createKafkaConsumerAssignmentResponse(
            TopicPartition topicPartition) {
        return KafkaConsumerAssignmentResponse.builder()
                .topic(topicPartition.topic())
                .partition(topicPartition.partition())
                .build();
    }
}

Then run the application and send GET http://localhost:8080/api/kafka/registry. The response lists information about the Kafka consumers we have created.

[
    {
        "consumerId": "org.springframework.kafka.KafkaListenerEndpointContainer#0",
        "groupId": "runtime-kafka-registry",
        "listenerId": "org.springframework.kafka.KafkaListenerEndpointContainer#0",
        "active": true,
        "assignments": [
            {
                "topic": "com.faza.example.kafka.topic",
                "partition": 0
            }
        ]
    }
]

Each consumer we create has a unique id that identifies it. We will use this consumer id later to change the consumer's state. By default the consumer id looks like the one in the response above, but we can change it with the id parameter of the @KafkaListener annotation. When using the id parameter, we must set the idIsGroup parameter to false, so that the id we set doesn't also become the id of our consumer group.

MessageListener.java

....
    @KafkaListener(topics = "com.faza.example.kafka.topic", id = "my-message-consumer", idIsGroup = false,
            autoStartup = "${spring.kafka.consumer.topic.activation-status}")
    public void kafkaMessageListener(ConsumerRecord<String, String> record) {
....

[
    {
        "consumerId": "my-message-consumer",
        "groupId": "runtime-kafka-registry",
        "listenerId": "my-message-consumer",
        "active": true,
        "assignments": [
            {
                "topic": "com.faza.example.kafka.topic",
                "partition": 0
            }
        ]
    }
]

Stopping a Kafka Consumer

We can use a REST API to stop a running consumer. To know which consumer to stop, the API needs the consumer id, so the caller has to send it with the request.

KafkaConsumerRegistryController.java

....
    @PostMapping(path = "/deactivate")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void deactivateConsumer(@RequestParam String consumerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if (Objects.isNull(listenerContainer)) {
            throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
        } else if (!listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is already stopped", consumerId));
        } else {
            log.info("Stopping a consumer with id " + consumerId);
            listenerContainer.stop();
        }
    }
....

Then send POST http://localhost:8080/api/kafka/registry/deactivate with the consumerId request parameter set to the id of the consumer you want to stop.
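To make the call concrete, here is a minimal sketch of how a client could build that request with the JDK's built-in HTTP client. It assumes Java 11 or newer and the default local address used in this article. The RegistryRequests helper is a hypothetical name for illustration and is not part of the sample project.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the article's sample project.
final class RegistryRequests {

    private RegistryRequests() {
    }

    // Builds a body-less POST to <baseUrl>/api/kafka/registry/<action>?consumerId=<id>.
    static HttpRequest post(String baseUrl, String action, String consumerId) {
        // Encode the id: the default ids contain '#', which would otherwise start a URL fragment.
        String query = "consumerId=" + URLEncoder.encode(consumerId, StandardCharsets.UTF_8);
        URI uri = URI.create(baseUrl + "/api/kafka/registry/" + action + "?" + query);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = post("http://localhost:8080", "deactivate", "my-message-consumer");
        System.out.println(request.method() + " " + request.uri());
        // To really send it (the application must be running):
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.discarding());
    }
}
```

The same helper works for the other endpoints in this article by changing the action segment. Encoding matters mostly for the default ids such as org.springframework.kafka.KafkaListenerEndpointContainer#0, where an unencoded '#' would cut the parameter short.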

Note: when we stop a consumer, it leaves its consumer group, so Kafka will conduct a rebalance process.

Running a Kafka Consumer

We can also use a REST API to start a stopped consumer again. Just like the API for stopping a consumer, the API for starting one requires the consumer id.

KafkaConsumerRegistryController.java

....
    @PostMapping(path = "/activate")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void activateConsumer(@RequestParam String consumerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if (Objects.isNull(listenerContainer)) {
            throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
        } else if (listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is already running", consumerId));
        } else {
            log.info("Running a consumer with id " + consumerId);
            listenerContainer.start();
        }
    }
....

Then send POST http://localhost:8080/api/kafka/registry/activate with the consumerId request parameter set to the id of the consumer you want to run.

Note: when we start a consumer, it joins its consumer group again, so Kafka will also conduct a rebalance process.

Pausing a Kafka Consumer

In the sections above we created a REST API to stop a consumer. So what is the difference between stopping a consumer and pausing it? When we stop a consumer, Kafka considers it dead and conducts a rebalance process. When we pause a consumer, it doesn't die. It is still alive, but it stops retrieving data from Kafka, so Kafka does not conduct a rebalance process.

KafkaConsumerRegistryController.java

....
    @PostMapping(path = "/pause")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void pauseConsumer(@RequestParam String consumerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if (Objects.isNull(listenerContainer)) {
            throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
        } else if (!listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
        } else if (listenerContainer.isContainerPaused()) {
            throw new RuntimeException(String.format("Consumer with id %s is already paused", consumerId));
        } else if (listenerContainer.isPauseRequested()) {
            throw new RuntimeException(String.format("Consumer with id %s is already requested to be paused", consumerId));
        } else {
            log.info("Pausing a consumer with id " + consumerId);
            listenerContainer.pause();
        }
    }
....

Then send POST http://localhost:8080/api/kafka/registry/pause with the consumerId request parameter set to the id of the consumer you want to pause.

Resuming a Paused Kafka Consumer

To resume a paused consumer, we can also use a REST API. When a paused consumer is resumed, it continues retrieving data from Kafka, and Kafka does not conduct a rebalance process.

KafkaConsumerRegistryController.java

....
    @PostMapping(path = "/resume")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void resumeConsumer(@RequestParam String consumerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if (Objects.isNull(listenerContainer)) {
            throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
        } else if (!listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
        } else if (!listenerContainer.isContainerPaused()) {
            throw new RuntimeException(String.format("Consumer with id %s is not paused", consumerId));
        } else {
            log.info("Resuming a consumer with id " + consumerId);
            listenerContainer.resume();
        }
    }
....

Then send POST http://localhost:8080/api/kafka/registry/resume with the consumerId request parameter set to the id of the consumer you want to resume.
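The checks performed by the four endpoints can be summarized as a small state model. The sketch below is only a toy illustration in plain Java: ListenerState and ListenerAction are hypothetical names, this is not how Spring Kafka tracks state internally, and it deliberately ignores details such as whether a pause request survives a stop and start cycle. The rebalance flag simply records what this article says about each action.

```java
import java.util.EnumSet;
import java.util.Set;

// Toy model of the controller checks in this article; not Spring Kafka's own state machine.
enum ListenerState {
    STOPPED, RUNNING, PAUSED
}

enum ListenerAction {
    // (states the action is accepted from, resulting state, does Kafka rebalance?)
    ACTIVATE(EnumSet.of(ListenerState.STOPPED), ListenerState.RUNNING, true),
    // A paused container still reports isRunning() == true, so it can be stopped as well.
    DEACTIVATE(EnumSet.of(ListenerState.RUNNING, ListenerState.PAUSED), ListenerState.STOPPED, true),
    PAUSE(EnumSet.of(ListenerState.RUNNING), ListenerState.PAUSED, false),
    RESUME(EnumSet.of(ListenerState.PAUSED), ListenerState.RUNNING, false);

    private final Set<ListenerState> allowedFrom;
    private final ListenerState target;
    private final boolean triggersRebalance;

    ListenerAction(Set<ListenerState> allowedFrom, ListenerState target, boolean triggersRebalance) {
        this.allowedFrom = allowedFrom;
        this.target = target;
        this.triggersRebalance = triggersRebalance;
    }

    boolean isAllowedFrom(ListenerState current) {
        return allowedFrom.contains(current);
    }

    // Returns the new state, or throws where the controller would reject the request.
    ListenerState apply(ListenerState current) {
        if (!isAllowedFrom(current)) {
            throw new IllegalStateException(this + " is not allowed while " + current);
        }
        return target;
    }

    boolean triggersRebalance() {
        return triggersRebalance;
    }
}

final class ListenerStateDemo {

    public static void main(String[] args) {
        ListenerState state = ListenerState.RUNNING;
        for (ListenerAction action : new ListenerAction[] {
                ListenerAction.PAUSE, ListenerAction.RESUME, ListenerAction.DEACTIVATE, ListenerAction.ACTIVATE}) {
            state = action.apply(state);
            System.out.println(action + " -> " + state + ", rebalance: " + action.triggersRebalance());
        }
    }
}
```

Reading the table row by row shows the practical trade-off: pausing and resuming are the cheap operations because the consumer keeps its partitions, while stopping and starting change group membership.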

Multi-VM Applications

To handle a high load, we can give the application more resources (vertical scaling) or run more instances of the application (horizontal scaling). As the number of instances grows, changing the Kafka consumer state everywhere means calling the REST API on every running instance. Instead of calling the REST API on every instance, we can use Kafka itself to send the desired consumer state to all instances at once.

In simple terms, we only need to call the REST API on one instance. That instance sends a message to Kafka, and all of our instances retrieve it. Each instance then processes the message and adjusts the state of its Kafka consumers accordingly.

Publish Command to Kafka

We will slightly modify the REST API that we created. Instead of processing the command directly in the controller, it sends the command to Kafka as a message. Each of our instances can then retrieve the message and set the desired consumer state.

ConsumerAction.java

public enum ConsumerAction {

    ACTIVATE,
    DEACTIVATE,
    PAUSE,
    RESUME
}

ConsumerActionRequest.java

@Data
@Builder
public class ConsumerActionRequest {

    @Builder.Default
    private long timestamp = System.currentTimeMillis();
    private String consumerId;
    private ConsumerAction consumerAction;
}

Constant.java

public class Constant {

    public static final String CONSUMER_ACTION_TOPIC = "com.faza.example.kafka.topic.action";
}

KafkaConsumerRegistryController.java

@Slf4j
@RestController
@RequestMapping(path = "/api/kafka/registry")
public class KafkaConsumerRegistryController {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping
    public List<KafkaConsumerResponse> getConsumerIds() {
        return kafkaListenerEndpointRegistry.getListenerContainerIds()
                .stream()
                .map(this::createKafkaConsumerResponse)
                .collect(Collectors.toList());
    }

    @PostMapping(path = "/activate")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void activateConsumer(@RequestParam String consumerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if (Objects.isNull(listenerContainer)) {
            throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
        } else if (listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is already running", consumerId));
        } else {
            publishMessage(ConsumerActionRequest.builder()
                    .consumerId(consumerId)
                    .consumerAction(ConsumerAction.ACTIVATE)
                    .build());
        }
    }

    @PostMapping(path = "/pause")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void pauseConsumer(@RequestParam String consumerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if (Objects.isNull(listenerContainer)) {
            throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
        } else if (!listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
        } else if (listenerContainer.isContainerPaused()) {
            throw new RuntimeException(String.format("Consumer with id %s is already paused", consumerId));
        } else if (listenerContainer.isPauseRequested()) {
            throw new RuntimeException(String.format("Consumer with id %s is already requested to be paused", consumerId));
        } else {
            publishMessage(ConsumerActionRequest.builder()
                    .consumerId(consumerId)
                    .consumerAction(ConsumerAction.PAUSE)
                    .build());
        }
    }

    @PostMapping(path = "/resume")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void resumeConsumer(@RequestParam String consumerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if (Objects.isNull(listenerContainer)) {
            throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
        } else if (!listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is not running", consumerId));
        } else if (!listenerContainer.isContainerPaused()) {
            throw new RuntimeException(String.format("Consumer with id %s is not paused", consumerId));
        } else {
            publishMessage(ConsumerActionRequest.builder()
                    .consumerId(consumerId)
                    .consumerAction(ConsumerAction.RESUME)
                    .build());
        }
    }

    @PostMapping(path = "/deactivate")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void deactivateConsumer(@RequestParam String consumerId) {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        if (Objects.isNull(listenerContainer)) {
            throw new RuntimeException(String.format("Consumer with id %s is not found", consumerId));
        } else if (!listenerContainer.isRunning()) {
            throw new RuntimeException(String.format("Consumer with id %s is already stopped", consumerId));
        } else {
            publishMessage(ConsumerActionRequest.builder()
                    .consumerId(consumerId)
                    .consumerAction(ConsumerAction.DEACTIVATE)
                    .build());
        }
    }

    private KafkaConsumerResponse createKafkaConsumerResponse(String consumerId) {
        MessageListenerContainer listenerContainer =
                kafkaListenerEndpointRegistry.getListenerContainer(consumerId);
        return KafkaConsumerResponse.builder()
                .consumerId(consumerId)
                .groupId(listenerContainer.getGroupId())
                .listenerId(listenerContainer.getListenerId())
                .active(listenerContainer.isRunning())
                .assignments(Optional.ofNullable(listenerContainer.getAssignedPartitions())
                        .map(topicPartitions -> topicPartitions.stream()
                                .map(this::createKafkaConsumerAssignmentResponse)
                                .collect(Collectors.toList()))
                        .orElse(null))
                .build();
    }

    private KafkaConsumerAssignmentResponse createKafkaConsumerAssignmentResponse(
            TopicPartition topicPartition) {
        return KafkaConsumerAssignmentResponse.builder()
                .topic(topicPartition.topic())
                .partition(topicPartition.partition())
                .build();
    }

    // Serializes the command to JSON and publishes it so that every instance can consume it.
    @SneakyThrows
    private void publishMessage(Object message) {
        log.info(String.format("Publishing message %s to %s", message, Constant.CONSUMER_ACTION_TOPIC));
        kafkaTemplate.send(Constant.CONSUMER_ACTION_TOPIC, objectMapper.writeValueAsString(message));
    }
}

Consuming Our Command

After the REST API sends a message to Kafka, every instance needs to retrieve it. We still use the @KafkaListener annotation to create the consumer, but this consumer gets a unique group id in each instance. If the consumers were all in the same group, only one of them would receive each message. Giving each consumer a different group id ensures that all of them receive it. In the listener below, the id contains a random UUID, and because idIsGroup defaults to true, that id is also used as the group id.

ConsumerActionListener.java

@Slf4j
@Component
public class ConsumerActionListener {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    private ObjectMapper objectMapper;

    @SneakyThrows
    @KafkaListener(topics = Constant.CONSUMER_ACTION_TOPIC,
            id = "my-message-consumer-#{T(java.util.UUID).randomUUID().toString()}")
    public void consumerActionListener(ConsumerRecord<String, String> record) {
        log.info("Consumer action listener got a new record: " + record);
        ConsumerActionRequest consumerActionRequest = objectMapper.readValue(record.value(),
                ConsumerActionRequest.class);
        processAction(consumerActionRequest);
        log.info("Consumer action listener done processing record: " + record);
    }

    private void processAction(ConsumerActionRequest request) {
        String consumerId = request.getConsumerId();
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(
                consumerId);
        // The controller only validated the id on the instance that received the REST call,
        // so guard against an id that is unknown on this instance.
        if (Objects.isNull(listenerContainer)) {
            log.warn("Consumer with id " + consumerId + " is not found, ignoring action");
            return;
        }
        switch (request.getConsumerAction()) {
            case ACTIVATE:
                log.info("Running a consumer with id " + consumerId);
                listenerContainer.start();
                break;
            case PAUSE:
                log.info("Pausing a consumer with id " + consumerId);
                listenerContainer.pause();
                break;
            case RESUME:
                log.info("Resuming a consumer with id " + consumerId);
                listenerContainer.resume();
                break;
            case DEACTIVATE:
                log.info("Stopping a consumer with id " + consumerId);
                listenerContainer.stop();
                break;
            default:
                log.warn("Consumer action listener does not know action: " +
                        request.getConsumerAction());
        }
    }
}
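The reason for the unique group id can be illustrated without a broker. The sketch below is a toy in-memory model in plain Java and does not use Kafka at all. GroupDeliveryModel and the instance names are hypothetical, and the model assumes a single-partition topic so that one member per group receives everything.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of consumer-group delivery; it does not talk to Kafka.
final class GroupDeliveryModel {

    // group id -> application instances subscribed with that group id
    private final Map<String, List<String>> membersByGroup = new LinkedHashMap<>();
    // instance name -> messages that instance received
    private final Map<String, List<String>> received = new LinkedHashMap<>();

    void subscribe(String groupId, String instance) {
        membersByGroup.computeIfAbsent(groupId, key -> new ArrayList<>()).add(instance);
        received.computeIfAbsent(instance, key -> new ArrayList<>());
    }

    // Every group sees the message once; inside a group only one member gets it.
    void publish(String message) {
        for (List<String> members : membersByGroup.values()) {
            // Simplification: the first member owns the topic's single partition.
            received.get(members.get(0)).add(message);
        }
    }

    long instancesThatReceived() {
        return received.values().stream()
                .filter(messages -> !messages.isEmpty())
                .count();
    }

    public static void main(String[] args) {
        GroupDeliveryModel sharedGroup = new GroupDeliveryModel();
        GroupDeliveryModel uniqueGroups = new GroupDeliveryModel();
        for (String instance : new String[] {"app-1", "app-2", "app-3"}) {
            sharedGroup.subscribe("runtime-kafka-registry", instance);
            uniqueGroups.subscribe("my-message-consumer-" + instance, instance);
        }
        sharedGroup.publish("PAUSE");
        uniqueGroups.publish("PAUSE");
        System.out.println("shared group, instances reached: " + sharedGroup.instancesThatReceived());
        System.out.println("unique groups, instances reached: " + uniqueGroups.instancesThatReceived());
    }
}
```

With a shared group id the command reaches a single instance, while a group id per instance turns the topic into a broadcast channel, which is what the command listener needs.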

Conclusion

All information about our Kafka consumers is kept in the KafkaListenerEndpointRegistry class, so we can change the state of a Kafka consumer while the application is running as long as we know its id. All sample code used in this article can be found in the following GitHub repository.

The next article will cover how to create a dynamic Spring Boot Kafka consumer.
