How Client Applications Interact with a Kafka Cluster — Made Easy with the Kafka Java APIs

Shashir
Published in Nerd For Tech · 5 min read · Feb 28, 2021

In this article, we’ll see how to use the Kafka client library (Java APIs) to interact with a Kafka cluster. Let’s start with Part-1 of the 3-part series.

Kafka Client API — Kafka logo taken from official Kafka website

Part-1: We’ll cover 3 Java APIs
- AdminClient API
- Producer API
- Consumer API

Part-2: Kafka Streams (In-progress — coming soon)
- Streams API

Part-3: Kafka Connect (In-progress — coming soon)
- Connect API

Set up a Kafka Cluster locally/in Docker:

Set up the Kafka cluster and create a topic named “numbers”.

Pre-requisites:

- Kafka Cluster (configured and running in local/docker)
- IntelliJ Community (or any IDE of your choice)
- Maven/Gradle (build tool)
- Java 8+

Maven/Gradle dependency:

MAVEN:

<!-- For Producer and Consumer API -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>

GRADLE:

implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0'

AdminClient API:

The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations, and ACLs. Here we’ll see code samples for:

- Creating a new topic
- Deleting a topic
- Describing a topic
- Listing topics
- Fetching controller and cluster node details

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@Slf4j
@SpringBootApplication
public class KafkaAdminApplication implements CommandLineRunner {

    private static final String TOPIC_NAME = "shasr-new-topic";

    public static void main(String[] args) {
        SpringApplication.run(KafkaAdminApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {

        // Create Admin Client
        AdminClient adminClient = getAdminClient();

        // Topic operations
        deleteTopic(adminClient);
        createNewTopic(adminClient);
        listTopics(adminClient);
        describeTopics(adminClient);

        // Cluster information
        fetchClusterId(adminClient);
        fetchClusterController(adminClient);
        fetchClusterNodes(adminClient);
    }

    // The helper methods in the snippets below are members of this class
}
  • Create an AdminClient instance with the Kafka configuration (bootstrap servers)

private AdminClient getAdminClient() {
    Properties kafkaProperties = new Properties();
    kafkaProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:29092");
    return AdminClient.create(kafkaProperties);
}
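The example above never closes the client. Since AdminClient implements AutoCloseable, a minimal sketch of a try-with-resources variant that releases its sockets and threads automatically:

try (AdminClient adminClient = getAdminClient()) {
    // ... run the topic and cluster operations shown below ...
    listTopics(adminClient);
} // close() is called implicitly here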
  • Create a new topic and list topics

private void createNewTopic(AdminClient adminClient) throws Exception {

    log.info("------------- Create new topic ---------------");
    // 3 partitions, replication factor 2
    NewTopic createTopic = new NewTopic(TOPIC_NAME, 3, (short) 2);
    // createTopics() is asynchronous; block on the future so the topic
    // exists before we list and describe it below
    adminClient.createTopics(Collections.singleton(createTopic)).all().get();
}

private void listTopics(AdminClient adminClient) throws Exception {

    log.info("------------- Topics List ---------------");
    Collection<TopicListing> topicList = adminClient.listTopics().listings().get();
    topicList.forEach(topic -> {
        log.info("Topic Name : {} ", topic.name());
        log.info("Is-internal Topic : {} ", topic.isInternal());
    });
}

Output: (console log screenshot)
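If the topic already exists, the future returned by createTopics() fails with a TopicExistsException, wrapped in an ExecutionException. A minimal sketch of tolerating that case (the method name createTopicIfAbsent is my own; it needs java.util.concurrent.ExecutionException and org.apache.kafka.common.errors.TopicExistsException):

private void createTopicIfAbsent(AdminClient adminClient) throws Exception {
    NewTopic newTopic = new NewTopic(TOPIC_NAME, 3, (short) 2);
    try {
        // Wait for the broker to confirm the creation
        adminClient.createTopics(Collections.singleton(newTopic)).all().get();
    } catch (ExecutionException e) {
        // The future wraps the real cause in an ExecutionException
        if (e.getCause() instanceof TopicExistsException) {
            log.info("Topic {} already exists, skipping creation", TOPIC_NAME);
        } else {
            throw e;
        }
    }
}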

  • Describe topic details: Leader, Partitions, ISR, Replicas

private void describeTopics(AdminClient adminClient) throws Exception {

    log.info("------------- Describe topics ---------------");
    DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(TOPIC_NAME));
    Map<String, TopicDescription> topicDescriptionMap = describeTopicsResult.all().get();

    List<TopicPartitionInfo> topicPartitionInfoList = topicDescriptionMap.get(TOPIC_NAME).partitions();
    topicPartitionInfoList.forEach(topicInfo -> {
        log.info("Leader : {}", topicInfo.leader());
        log.info("Partition : {}", topicInfo.partition());
        log.info("Replicas : {}", topicInfo.replicas());
        log.info("ISR : {}", topicInfo.isr());
    });
}

Output: (console log screenshot)

  • Delete a topic: deleteTopics()

private void deleteTopic(AdminClient adminClient) throws Exception {

    Collection<TopicListing> topicList = adminClient.listTopics().listings().get();
    log.info("------------- Delete topic ---------------");
    // topicList holds TopicListing objects, so compare by name
    // (contains(TOPIC_NAME) would never match a String against a TopicListing)
    boolean topicExists = topicList.stream().anyMatch(topic -> topic.name().equals(TOPIC_NAME));
    if (topicExists) {
        // Block on the future so the delete completes before we recreate the topic
        adminClient.deleteTopics(Collections.singleton(TOPIC_NAME)).all().get();
    }
}
  • Access Cluster information: Cluster-Id

private void fetchClusterId(AdminClient adminClient) throws Exception {

    log.info("------------- Cluster-Id ---------------");
    String clusterId = adminClient.describeCluster().clusterId().get();
    log.info("Cluster-Id : {}", clusterId);
}
  • Access Cluster information: Controller details

private void fetchClusterController(AdminClient adminClient) throws Exception {

    Node controllerNode = adminClient.describeCluster().controller().get();

    log.info("--------------- Controller Node Details ---------------");
    log.info("Node-Id: {}", controllerNode.id());
    log.info("Node-IdString: {}", controllerNode.idString());
    log.info("Node-Host: {}", controllerNode.host());
    log.info("Node-Port: {}", controllerNode.port());
    log.info("Node-hasRack: {}", controllerNode.hasRack());
}
  • Access Cluster information: All cluster nodes

private void fetchClusterNodes(AdminClient adminClient) throws Exception {

    Collection<Node> clusterNodes = adminClient.describeCluster().nodes().get();

    clusterNodes.forEach(node -> {
        log.info("------------- Node Details ---------------");
        log.info("Node-Id: {}", node.id());
        log.info("Node-IdString: {}", node.idString());
        log.info("Node-Host: {}", node.host());
        log.info("Node-Port: {}", node.port());
        log.info("Node-hasRack: {}", node.hasRack());
    });
}

Output: Cluster details (console log screenshot)

Producer API:

The Producer API helps us build producers that publish records to a Kafka cluster. Key configurations:

  • bootstrap.servers: Kafka broker/server list, as host:port pairs, used to establish the initial connection.
  • key.serializer: serializer class for the record key (e.g. StringSerializer).
  • value.serializer: serializer class for the record value (e.g. StringSerializer).

The Kafka client library has several classes that help us interact with the Kafka cluster. Here we create a KafkaProducer instance with the producer configuration (broker details, key and value serializers).

The KafkaProducer ‘send’ method accepts a ProducerRecord with the topic name, an optional key, and the value.

Finally, we close the producer gracefully with producer.close().

public void generateNumber() {

    Properties kafkaProperties = new Properties();
    // Set the brokers (bootstrap servers)
    kafkaProperties.setProperty("bootstrap.servers", "localhost:9091");
    // Set how to serialize key/value pairs
    kafkaProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Create Kafka Producer
    Producer<String, String> producer = new KafkaProducer<>(kafkaProperties);
    for (int i = 100; i < 200; i++) {
        // Create Producer Record with topic-name, key (optional), value
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("numbers", Integer.toString(i));
        // Send producer record to Kafka (asynchronous)
        producer.send(producerRecord);
    }
    // Flush pending records and close the producer gracefully
    producer.close();
}
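Since send() is asynchronous and returns immediately, delivery errors surface only later. A minimal sketch of passing a Callback (a functional interface in kafka-clients) to log the delivery result; the callback body here is illustrative:

producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        // The broker rejected the record or delivery timed out
        log.error("Failed to publish record", exception);
    } else {
        // RecordMetadata tells us where the record landed
        log.info("Published to partition {} at offset {}", metadata.partition(), metadata.offset());
    }
});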

Consumer API:

The Consumer API helps us build consumers that read data from a Kafka cluster. Let’s configure the consumer properties as below:

  • bootstrap.servers: Kafka broker/server list, as host:port pairs, used to establish the initial connection.
  • group.id: consumer group id of the consumer.
  • key.deserializer: deserializer class for the record key (e.g. StringDeserializer).
  • value.deserializer: deserializer class for the record value (e.g. StringDeserializer).
  • auto.offset.reset: defines the behavior of the consumer when there is no committed position or when an offset is out of range (can be “earliest”, “latest” — the default, or “none” — handle manually).
private Properties getKafkaProperties() {

    // Configure the consumer
    Properties kafkaProperties = new Properties();
    // Point it to the brokers
    kafkaProperties.setProperty("bootstrap.servers", "localhost:9091");
    // Set the consumer group (all consumers must belong to a group)
    kafkaProperties.setProperty("group.id", "SHASR-NUMBERS");
    // Set how to deserialize key/value pairs
    kafkaProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Read messages from the start if no committed offset exists
    kafkaProperties.setProperty("auto.offset.reset", "earliest");
    return kafkaProperties;
}

subscribe(..): subscribes the consumer to the given list of topics.

poll(Duration.ofMillis(100)): polls the subscribed topics for records, waiting up to the given Duration (100 ms here) when no data is available.

close(): closes the consumer gracefully.

public void readData() {

    Properties kafkaProperties = getKafkaProperties();

    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties);
    // kafkaTopic is a field holding the topic name, e.g. "numbers"
    consumer.subscribe(Collections.singletonList(kafkaTopic));
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                // Print the topic, partition, offset, key, value and timestamp for each record
                log.info("RECEIVED_TOPIC : {} ", record.topic());
                log.info("RECEIVED_PARTITION_ID : {} ", record.partition());
                log.info("OFFSET : {} ", record.offset());
                log.info("KEY : {} ", record.key());
                log.info("VALUE: {} ", record.value());
                log.info("TIME: {}", record.timestamp());
            });
        }
    } finally {
        consumer.close();
    }
}

Output: (console log screenshot)
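One caveat: the while (true) loop above never exits on its own. A common pattern (a sketch, not part of the original code) is to call consumer.wakeup() from a shutdown hook; the blocked poll() then throws a WakeupException, the loop unwinds, and the finally block closes the consumer. Here mainThread is assumed to be the thread running readData():

// Register before entering the poll loop
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup(); // makes the blocked poll() throw WakeupException
    try {
        mainThread.join(); // wait until the consumer has closed cleanly
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));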

Full codebase: GITHUB

Summary:

I’m glad you made it to the end. I hope this Part-1 of the series helped. We went through 3 Kafka client APIs:

  • AdminClient API:

The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations, and ACLs. We created a new topic, described a topic for partition/leader/replica/ISR details, and deleted a topic.

  • Producer API:

Helps us build producers that publish records to a Kafka cluster, given broker and topic details.

  • Consumer API:

Helps us build consumers that read data from a Kafka cluster, given broker, topic, and offset-reset details.

