A Java Lagom service which only consumes from Kafka topic (Subscriber only service)
Subscriber only service means an application which only consumes, does not produce.
We have generally seen the applications which both produces and consumes data from a Kafka topic but sometimes we need to write an application which only consumes data i.e. consumes data from a 3rd party service. So in this blog I am going to explain how to write a Lagom service which only consumes data (does not produce any data).
Add below dependency to your impl pom.xml:
[code language=”scala”]
<dependency>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-javadsl-kafka-client_2.11</artifactId>
<version>${lagom.version}</version>
</dependency>
[/code]
We need to create a separate module kafka-connect in Lagom project. That module will contain a Service Descriptor declaring the topic service will consume from.
[code language=”scala”]
package com.knoldus.kafka;
import com.lightbend.lagom.javadsl.api.Descriptor;
import com.lightbend.lagom.javadsl.api.Service;
import com.lightbend.lagom.javadsl.api.broker.Topic;
import static com.lightbend.lagom.javadsl.api.Service.named;
import static com.lightbend.lagom.javadsl.api.Service.topic;
public interface KafkaService extends Service {
Topic<String> greetingsTopic();
String GREETINGS_TOPIC = “greetings”;
@Override
default Descriptor descriptor() {
return named(“kafkaservice”).withTopics(
topic(GREETINGS_TOPIC, this::greetingsTopic)
).withAutoAcl(true);
}
}
[/code]
Then, add kafka-connect dependency in impl module:
[code language=”scala”]
<dependency>
<artifactId>kafka-connect</artifactId>
<groupId>com.knoldus.lagom-kafka-consumer-only</groupId>
<version>1.0-SNAPSHOT</version>
</dependency>
[/code]
Then, to consume data implement consumer as below:
[code language=”scala”]
@Singleton
public class KafkaConsumer {
private final KafkaService kafkaService;
private ObjectMapper jsonMapper = new ObjectMapper();
@Inject
public KafkaConsumer(KafkaService kafkaService) {
this.kafkaService = kafkaService;
kafkaService.greetingsTopic().subscribe()
.atLeastOnce(Flow.fromFunction(this::displayMessage));
}
private Done displayMessage(String message) {
System.out.println(“Message ::::::::::: “ + message);
try {
GreetingMessage greetingMessage = jsonMapper.readValue(message, GreetingMessage.class);
if (StringUtils.isNotEmpty(greetingMessage.message)) {
System.out.println(“Action performed ::::::::::: “ + message);
// Do your action here
}
} catch (Exception ex) {
System.out.println(“Error in consuming kafka message”);
}
return Done.getInstance();
}
}
[/code]
Finally, you need to bind KafkaConsumer as Eager singleton and KafkaService in HelloModule:
[code language=”scala”]
bind(KafkaConsumer.class).asEagerSingleton();
bindClient(KafkaService.class);
[/code]
To connect with external kafka, made below changes in root pom:
[code language=”scala”]
<plugin>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-maven-plugin</artifactId>
<version>${lagom.version}</version>
<configuration>
<kafkaEnabled>false</kafkaEnabled>
</configuration>
</plugin>
[/code]
and, add below configuration in application.conf file:
[code language=”scala”]
lagom.broker.kafka {
service-name = “”
brokers = “127.0.0.1:9092”
brokers = ${?KAFKA_BROKERS}
}
[/code]
That’s it. Now you need to start zookeper and Kafka at your local machine, create topic greetings and then produce any message in Kafka. As a message will be produced in Kafka, consumer will consume data and will print the message.
I hope you enjoyed the blog.
You can get full code here.