A Java Lagom service which only consumes from Kafka topic (Subscriber only service)

Knoldus Inc.
Knoldus - Technical Insights
2 min readSep 4, 2017

Subscriber only service means an application which only consumes, does not produce.

We have generally seen the applications which both produces and consumes data from a Kafka topic but sometimes we need to write an application which only consumes data i.e. consumes data from a 3rd party service. So in this blog I am going to explain how to write a Lagom service which only consumes data (does not produce any data).

Add below dependency to your impl pom.xml:

[code language=”scala”]
<dependency>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-javadsl-kafka-client_2.11</artifactId>
<version>${lagom.version}</version>
</dependency>
[/code]

We need to create a separate module kafka-connect in Lagom project. That module will contain a Service Descriptor declaring the topic service will consume from.

[code language=”scala”]
package com.knoldus.kafka;

import com.lightbend.lagom.javadsl.api.Descriptor;
import com.lightbend.lagom.javadsl.api.Service;
import com.lightbend.lagom.javadsl.api.broker.Topic;

import static com.lightbend.lagom.javadsl.api.Service.named;
import static com.lightbend.lagom.javadsl.api.Service.topic;

public interface KafkaService extends Service {

Topic<String> greetingsTopic();

String GREETINGS_TOPIC = “greetings”;

@Override
default Descriptor descriptor() {
return named(“kafkaservice”).withTopics(
topic(GREETINGS_TOPIC, this::greetingsTopic)
).withAutoAcl(true);
}
}
[/code]

Then, add kafka-connect dependency in impl module:

[code language=”scala”]
<dependency>
<artifactId>kafka-connect</artifactId>
<groupId>com.knoldus.lagom-kafka-consumer-only</groupId>
<version>1.0-SNAPSHOT</version>
</dependency>
[/code]

Then, to consume data implement consumer as below:

[code language=”scala”]
@Singleton
public class KafkaConsumer {

private final KafkaService kafkaService;
private ObjectMapper jsonMapper = new ObjectMapper();

@Inject
public KafkaConsumer(KafkaService kafkaService) {
this.kafkaService = kafkaService;
kafkaService.greetingsTopic().subscribe()
.atLeastOnce(Flow.fromFunction(this::displayMessage));
}

private Done displayMessage(String message) {
System.out.println(“Message ::::::::::: “ + message);
try {
GreetingMessage greetingMessage = jsonMapper.readValue(message, GreetingMessage.class);
if (StringUtils.isNotEmpty(greetingMessage.message)) {
System.out.println(“Action performed ::::::::::: “ + message);

// Do your action here
}
} catch (Exception ex) {
System.out.println(“Error in consuming kafka message”);
}
return Done.getInstance();
}
}
[/code]

Finally, you need to bind KafkaConsumer as Eager singleton and KafkaService in HelloModule:

[code language=”scala”]
bind(KafkaConsumer.class).asEagerSingleton();
bindClient(KafkaService.class);
[/code]

To connect with external kafka, made below changes in root pom:

[code language=”scala”]
<plugin>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-maven-plugin</artifactId>
<version>${lagom.version}</version>
<configuration>
<kafkaEnabled>false</kafkaEnabled>
</configuration>
</plugin>
[/code]

and, add below configuration in application.conf file:

[code language=”scala”]
lagom.broker.kafka {
service-name = “”

brokers = “127.0.0.1:9092”
brokers = ${?KAFKA_BROKERS}
}
[/code]

That’s it. Now you need to start zookeper and Kafka at your local machine, create topic greetings and then produce any message in Kafka. As a message will be produced in Kafka, consumer will consume data and will print the message.

I hope you enjoyed the blog.

You can get full code here.

--

--

Knoldus Inc.
Knoldus - Technical Insights

Group of smart Engineers with a Product mindset who partner with your business to drive competitive advantage | www.knoldus.com