Dynamic Spring Boot Kafka Consumer

Faza Zulfika Permana Putra
Blibli.com Tech Blog
7 min read · Aug 19, 2021

In the previous article, I wrote about how to change a Kafka consumer's state at runtime. Then I started to think: what if I have processing logic that I want to apply to several Kafka topics? Do I have to create multiple Kafka consumers using the @KafkaListener annotation?

If you have tried to create a Kafka consumer using Spring Boot, or have read my previous article, the answer to the question above is to create multiple consumer classes using the @KafkaListener annotation. However, every time we create a new consumer, we have to change the application code and restart the application so that the newly created consumer can run.

In this article, I will use the KafkaListenerEndpointRegistry class to register Kafka consumers at application startup or while the application is running. That way we don't need to create a separate class for each Kafka consumer, and there's no need to change the code and restart our application when we want to create a new Kafka consumer.

Create a Dynamic Kafka Consumer

If you take a deeper look at the KafkaListenerEndpointRegistry class, you will find a method for registering your consumer: registerListenerContainer. The method accepts three parameters: a KafkaListenerEndpoint, a KafkaListenerContainerFactory, and a boolean startImmediately. Knowing that, the first thing we need to create is a KafkaListenerEndpoint object, which we will register into the KafkaListenerEndpointRegistry.

The KafkaListenerEndpoint class stores the information that defines a Kafka consumer: the consumer id, the listened topics, the consumer group id, the consumer class, the method used to process messages, and so on. Because KafkaListenerEndpoint is an interface, we have to use one of its implementation classes, such as MethodKafkaListenerEndpoint. This MethodKafkaListenerEndpoint class is also used to define Kafka consumers when we use the @KafkaListener annotation.

Because there are several things we will want to define for all of our consumers, I will create a parent class that is responsible for creating a template MethodKafkaListenerEndpoint object. Our consumer classes will extend this parent class and add their consumer-specific information to the MethodKafkaListenerEndpoint object it creates.

CustomMessageListener.java

public abstract class CustomMessageListener {

    private static int NUMBER_OF_LISTENERS = 0;

    @Autowired
    private KafkaProperties kafkaProperties;

    public abstract KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic);

    protected MethodKafkaListenerEndpoint<String, String> createDefaultMethodKafkaListenerEndpoint(
            String name, String topic) {
        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
                new MethodKafkaListenerEndpoint<>();
        kafkaListenerEndpoint.setId(getConsumerId(name));
        kafkaListenerEndpoint.setGroupId(kafkaProperties.getConsumer().getGroupId());
        kafkaListenerEndpoint.setAutoStartup(true);
        kafkaListenerEndpoint.setTopics(topic);
        kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        return kafkaListenerEndpoint;
    }

    private String getConsumerId(String name) {
        if (isBlank(name)) {
            return CustomMessageListener.class.getCanonicalName() + "#" + NUMBER_OF_LISTENERS++;
        } else {
            return name;
        }
    }

    private boolean isBlank(String string) {
        return Optional.ofNullable(string)
                .map(String::isBlank)
                .orElse(true);
    }
}
--------------------------------------------------------------------

MyCustomMessageListener.java

@Component
public class MyCustomMessageListener extends CustomMessageListener {

    @Override
    @SneakyThrows
    public KafkaListenerEndpoint createKafkaListenerEndpoint(String name, String topic) {
        MethodKafkaListenerEndpoint<String, String> kafkaListenerEndpoint =
                createDefaultMethodKafkaListenerEndpoint(name, topic);
        kafkaListenerEndpoint.setBean(new MyMessageListener());
        kafkaListenerEndpoint.setMethod(MyMessageListener.class.getMethod(
                "onMessage", ConsumerRecord.class));
        return kafkaListenerEndpoint;
    }

    @Slf4j
    private static class MyMessageListener implements MessageListener<String, String> {

        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            log.info("My message listener got a new record: " + record);
            CompletableFuture.runAsync(this::sleep)
                    .join();
            log.info("My message listener done processing record: " + record);
        }

        @SneakyThrows
        private void sleep() {
            Thread.sleep(5000);
        }
    }
}

Register Kafka Consumer at Runtime

To create a Kafka consumer with our dynamic consumer code, we need the name of the consumer and the topic that the consumer will listen to. This information will be retrieved via Spring Boot properties. Later, however, you will be able to change the source of this information to databases, caches, files, REST APIs, and so on.

CustomKafkaListenerProperty.java

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CustomKafkaListenerProperty {

    private String topic;
    private String listenerClass;
}

--------------------------------------------------------------------

CustomKafkaListenerProperties.java

@Data
@ConfigurationProperties(prefix = "custom.kafka")
public class CustomKafkaListenerProperties {

    private Map<String, CustomKafkaListenerProperty> listeners;
}

In the two classes above, we define a class that automatically reads properties with the prefix “custom.kafka”. Since we define a listeners variable, properties with the prefix “custom.kafka.listeners” will be stored in that variable. In order to use these classes, the @ConfigurationPropertiesScan annotation is required, and don't forget the @EnableKafka annotation.

Application.java

@EnableKafka
@SpringBootApplication
@ConfigurationPropertiesScan
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

--------------------------------------------------------------------

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=runtime-kafka-registry
custom.kafka.listeners.listener-1.topic=com.faza.example.custom.listener.property
custom.kafka.listeners.listener-1.listener-class=MyCustomMessageListener

Our Kafka consumer properties can be set using “custom.kafka.listeners.<key>.topic” and “custom.kafka.listeners.<key>.listener-class”. The key defines the id of our consumer, topic defines the topic that will be listened to by our consumer, and listener-class is the class that will process messages from that topic. At application startup we will register all the consumers we defined this way, using our custom registrar. Note that the listener classes we can use are the ones inside the “com.faza.example.dynamickafkaconsumer.listener” package.
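For example, reusing the same listener class for a second topic only requires one more pair of properties (the topic name here is hypothetical):

```properties
custom.kafka.listeners.listener-2.topic=com.faza.example.another.topic
custom.kafka.listeners.listener-2.listener-class=MyCustomMessageListener
```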

CustomKafkaListenerRegistrar.java

@Component
public class CustomKafkaListenerRegistrar implements InitializingBean {

    @Autowired
    private CustomKafkaListenerProperties customKafkaListenerProperties;

    @Autowired
    private BeanFactory beanFactory;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    private KafkaListenerContainerFactory kafkaListenerContainerFactory;

    @Override
    public void afterPropertiesSet() {
        customKafkaListenerProperties.getListeners()
                .forEach(this::registerCustomKafkaListener);
    }

    public void registerCustomKafkaListener(String name, CustomKafkaListenerProperty customKafkaListenerProperty) {
        this.registerCustomKafkaListener(name, customKafkaListenerProperty, false);
    }

    @SneakyThrows
    public void registerCustomKafkaListener(String name, CustomKafkaListenerProperty customKafkaListenerProperty,
                                            boolean startImmediately) {
        String listenerClass = String.join(".", CustomKafkaListenerRegistrar.class.getPackageName(),
                customKafkaListenerProperty.getListenerClass());
        CustomMessageListener customMessageListener =
                (CustomMessageListener) beanFactory.getBean(Class.forName(listenerClass));
        kafkaListenerEndpointRegistry.registerListenerContainer(
                customMessageListener.createKafkaListenerEndpoint(name, customKafkaListenerProperty.getTopic()),
                kafkaListenerContainerFactory, startImmediately);
    }
}

Let's try running the application. You will be able to see in the logs that our consumer has been created with the id that we set as <key> in the properties. In addition to the logs, you can access our consumer information through the REST API from the article I wrote before.

2020-11-26 14:13:24.003  INFO 22200 --- [istener-1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-runtime-kafka-registry-1, groupId=runtime-kafka-registry] (Re-)joining group
2020-11-26 14:13:24.080 INFO 22200 --- [istener-1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-runtime-kafka-registry-1, groupId=runtime-kafka-registry] Finished assignment for group at generation 5: {consumer-runtime-kafka-registry-1-4e2cb25a-b72e-450a-ba1c-de9e4becbfc8=Assignment(partitions=[com.faza.example.custom.listener.property-0])}
2020-11-26 14:13:24.090 INFO 22200 --- [istener-1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-runtime-kafka-registry-1, groupId=runtime-kafka-registry] Successfully joined group with generation 5
2020-11-26 14:13:24.092 INFO 22200 --- [istener-1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-runtime-kafka-registry-1, groupId=runtime-kafka-registry] Notifying assignor about the new Assignment(partitions=[com.faza.example.custom.listener.property-0])
2020-11-26 14:13:24.097 INFO 22200 --- [istener-1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-runtime-kafka-registry-1, groupId=runtime-kafka-registry] Adding newly assigned partitions: com.faza.example.custom.listener.property-0
2020-11-26 14:13:24.114 INFO 22200 --- [istener-1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-runtime-kafka-registry-1, groupId=runtime-kafka-registry] Found no committed offset for partition com.faza.example.custom.listener.property-0
2020-11-26 14:13:24.128 INFO 22200 --- [istener-1-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-runtime-kafka-registry-1, groupId=runtime-kafka-registry] Resetting offset for partition com.faza.example.custom.listener.property-0 to offset 0.
2020-11-26 14:13:24.130 INFO 22200 --- [istener-1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : runtime-kafka-registry: partitions assigned: [com.faza.example.custom.listener.property-0]
--------------------------------------------------------------------

[
    {
        "consumerId": "listener-1",
        "groupId": "runtime-kafka-registry",
        "listenerId": "listener-1",
        "active": true,
        "assignments": [
            {
                "topic": "com.faza.example.custom.listener.property",
                "partition": 0
            }
        ]
    }
]

Using a REST API to Register a New Consumer

In the previous section we created dynamic consumers based on data in the properties; this time we will register a new consumer using a REST API. We'll add a few lines of code to the KafkaConsumerRegistryController class to create a REST API that can register new consumers.

KafkaConsumerRegistryController.java

....
    @Autowired
    private CustomKafkaListenerRegistrar customKafkaListenerRegistrar;
....
....
    @PostMapping(path = "/create")
    @ResponseStatus(HttpStatus.CREATED)
    public void createConsumer(@RequestParam String topic, @RequestParam String listenerClass,
                               @RequestParam(required = false) boolean startImmediately) {
        customKafkaListenerRegistrar.registerCustomKafkaListener(null,
                CustomKafkaListenerProperty.builder()
                        .topic(topic)
                        .listenerClass(listenerClass)
                        .build(),
                startImmediately);
    }
....

Let's try restarting the application, then hit POST http://localhost:8080/api/kafka/registry/create, passing the topic and listenerClass parameters. The parameters we send have the same definitions as the custom Kafka properties we set earlier. After that, hit GET http://localhost:8080/api/kafka/registry, and you will see the consumer you just registered.

[
    {
        "consumerId": "com.faza.example.dynamickafkaconsumer.listener.CustomMessageListener#0",
        "groupId": "runtime-kafka-registry",
        "listenerId": "com.faza.example.dynamickafkaconsumer.listener.CustomMessageListener#0",
        "active": false,
        "assignments": []
    },
    {
        "consumerId": "listener-1",
        "groupId": "runtime-kafka-registry",
        "listenerId": "listener-1",
        "active": true,
        "assignments": [
            {
                "topic": "com.faza.example.custom.listener.property",
                "partition": 0
            }
        ]
    }
]
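Notice the first entry's id: it comes from the fallback naming in CustomMessageListener, which, when no name is passed, generates "<canonical class name>#<counter>". In isolation, that logic behaves like this standalone sketch (the class name here is hypothetical, so the generated prefix differs from the real one):

```java
import java.util.Optional;

public class ConsumerIdDemo {

    private static int NUMBER_OF_LISTENERS = 0;

    // Same fallback rule as CustomMessageListener#getConsumerId:
    // keep the given name, or generate "<canonical class name>#<counter>" when it is blank.
    static String getConsumerId(String name) {
        if (isBlank(name)) {
            return ConsumerIdDemo.class.getCanonicalName() + "#" + NUMBER_OF_LISTENERS++;
        }
        return name;
    }

    static boolean isBlank(String string) {
        return Optional.ofNullable(string)
                .map(String::isBlank)
                .orElse(true);
    }

    public static void main(String[] args) {
        System.out.println(getConsumerId("listener-1")); // an explicit name is kept as-is
        System.out.println(getConsumerId(null));         // null is blank -> generated id "#0"
        System.out.println(getConsumerId("   "));        // whitespace-only is also blank -> "#1"
    }
}
```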

However, the consumer that we just registered is not yet running. Why not? Because we didn't pass the startImmediately parameter, the consumer does not run immediately after it is registered. You can set the startImmediately parameter to true when registering consumers, or use http://localhost:8080/api/kafka/registry/activate to start your consumers.

[
    {
        "consumerId": "com.faza.example.dynamickafkaconsumer.listener.CustomMessageListener#0",
        "groupId": "runtime-kafka-registry",
        "listenerId": "com.faza.example.dynamickafkaconsumer.listener.CustomMessageListener#0",
        "active": true,
        "assignments": [
            {
                "topic": "com.faza.example.custom.listener.property.new",
                "partition": 0
            }
        ]
    },
    {
        "consumerId": "listener-1",
        "groupId": "runtime-kafka-registry",
        "listenerId": "listener-1",
        "active": true,
        "assignments": [
            {
                "topic": "com.faza.example.custom.listener.property",
                "partition": 0
            }
        ]
    }
]
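As a sketch, the register-then-inspect flow above can also be driven from the command line with curl (assuming the application runs on localhost:8080; the topic name is illustrative):

```shell
# Base URL of the registry controller from this article
BASE_URL="http://localhost:8080/api/kafka/registry"

# Register a new consumer and start it immediately
# ("|| true" keeps the script going when the app is not running)
curl -s -X POST "$BASE_URL/create?topic=com.faza.example.custom.listener.property.new&listenerClass=MyCustomMessageListener&startImmediately=true" || true

# Inspect the registered consumers
curl -s "$BASE_URL" || true
```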

You can also change the state of your Kafka consumers using the REST API we created earlier in the KafkaConsumerRegistryController.

Multi-VM Applications

Sometimes our application runs on multiple VMs, so we have many instances that process data. Therefore we need a consumer that retrieves action messages; we just modify the action consumer from my previous article. We will make our action consumer support creating new Kafka consumers, and modify our create REST API to publish an action message instead.

ConsumerAction.java

public enum ConsumerAction {

    CREATE,
    ACTIVATE,
    DEACTIVATE,
    PAUSE,
    RESUME
}

--------------------------------------------------------------------

ConsumerActionRequest.java

@Data
@Builder
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ConsumerActionRequest {

    @Builder.Default
    private long timestamp = System.currentTimeMillis();

    private String consumerId;
    private CustomKafkaListenerProperty consumerProperty;
    private Boolean startImmediately;
    private ConsumerAction consumerAction;
}

--------------------------------------------------------------------

ConsumerActionListener.java

....
    @Autowired
    private CustomKafkaListenerRegistrar customKafkaListenerRegistrar;
....
....
    private void processAction(ConsumerActionRequest request) {
        String consumerId = request.getConsumerId();
        MessageListenerContainer listenerContainer = Optional.ofNullable(consumerId)
                .map(kafkaListenerEndpointRegistry::getListenerContainer)
                .orElse(null);
        switch (request.getConsumerAction()) {
            case CREATE:
                CustomKafkaListenerProperty consumerProperty = request.getConsumerProperty();
                log.info(String.format("Creating a %s consumer for topic %s",
                        consumerProperty.getListenerClass(), consumerProperty.getTopic()));
                customKafkaListenerRegistrar.registerCustomKafkaListener(null,
                        consumerProperty, request.getStartImmediately());
                break;
            ....
        }
    }
....

--------------------------------------------------------------------

KafkaConsumerRegistryController.java

....
    @PostMapping(path = "/create")
    @ResponseStatus(HttpStatus.CREATED)
    public void createConsumer(@RequestParam String topic, @RequestParam String listenerClass,
                               @RequestParam(required = false) boolean startImmediately) {
        publishMessage(ConsumerActionRequest.builder()
                .consumerProperty(CustomKafkaListenerProperty.builder()
                        .topic(topic)
                        .listenerClass(listenerClass)
                        .build())
                .startImmediately(startImmediately)
                .consumerAction(ConsumerAction.CREATE)
                .build());
    }
....
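With this setup, the CREATE action message published to the action topic might look like the following (field values are illustrative; consumerId is omitted here because of the @JsonInclude NON_NULL setting):

```json
{
  "timestamp": 1606371204003,
  "consumerProperty": {
    "topic": "com.faza.example.custom.listener.property.new",
    "listenerClass": "MyCustomMessageListener"
  },
  "startImmediately": true,
  "consumerAction": "CREATE"
}
```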

Conclusion

From now on, if we want to use the same consumer logic for multiple topics, we no longer need to define consumer classes for each topic. We simply define a consumer class, and register it for each topic we want.

In this article the source of our topic and consumer information is the application properties, but you can change that source. You can store this information in a database table or collection and register new consumers when new data is added to the database. Apart from databases, you can use other sources such as a cache, files, or even a REST API from another application.

The limitation in this article is that we cannot delete the consumers we have registered, because it is not possible to do this using the KafkaListenerEndpointRegistry class. In order to make this possible, we need to create our own registry class. I will discuss this in the next article.

All the code from this article can be found in the following GitHub repository.
