Hello World with a basic Kafka Producer and Consumer

This is a Hello World example with a basic Kafka producer and consumer. It assumes you already know Kafka terminology. There is a lot to learn about Kafka, but this starter is as simple as it gets: ZooKeeper, Kafka, and a Java-based producer and consumer.

Install ZooKeeper and run it in standalone mode; no ZooKeeper ensemble is needed for this exercise. Installing, in this case, just means unzipping the download.

Go to your Zookeeper home directory and start zookeeper
bin/zkServer.sh start

Verify that zookeeper is running
telnet localhost 2181

Download and install Kafka 1.0. Installing, in this case, just means unzipping the download.

Go to your Kafka home directory and start Kafka
./bin/kafka-server-start.sh -daemon config/server.properties

Create a topic named test
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Put messages into the test topic (type in message and hit enter to publish)
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Consume messages from the test topic (ctrl+c to quit)
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

And now the Java code to publish and subscribe. I used Eclipse STS, and you can run both the producer and the consumer from the IDE. In the producer console, type a string and hit enter; the text will be published to the topic named by TOPIC_NAME in the code (set it to "test" if you want to reuse the topic created above). The consumer runs forever, picking up any published messages and printing them to the console. Type “exit” to stop the publisher (or you can kill it). The consumer can be killed when you are done testing.

package kafka;

import java.util.Properties;
import java.util.Scanner;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

* Kafka hello world message producer
public class KafkaProducerExample {
private final static String TOPIC_NAME = "topic_name";

public static void main(String[] args) {
// produce a test message
// if u run this multiple times ... u will have multiple messages in the
// test_topic topic (as would be expected)
Producer<String, String> producer = KafkaProducerExample.createProducer();

Scanner sc = new Scanner(System.in);
try {
while (true) {
System.out.print("> ");
String text = sc.nextLine();

ProducerRecord<String, String> recordToSend = new ProducerRecord<String, String>(TOPIC_NAME, "message",
text + " , timeInMillis=" + System.currentTimeMillis());
try {
// synchronous send.... get() waits for the computation to
// finish
RecordMetadata rmd = producer.send(recordToSend).get();
System.out.printf("Message Sent ==>> topic = %s, partition = %s, offset = %d\n", rmd.topic(),
rmd.partition(), rmd.offset());
} catch (Exception ex) {
// this is test code...so don't judge me !!

if (text.equalsIgnoreCase("exit")) {
} finally {

private static Producer<String, String> createProducer() {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer<String, String>(kafkaProps);

package kafka;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

* Kafka hello world consumer
public class KafkaConsumerExample {
private final static String TOPIC_NAME = "topic_name";

public static void main(String[] args) {
// consume messages
Consumer<String, String> consumer = KafkaConsumerExample.createConsumer();

// subscribe to the test topic
try {
// loop forever (hmmmmm)
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
// print the messages received
for (ConsumerRecord<String, String> record : records) {
"Message received ==> topic = %s, partition = %s, offset = %d, key = %s, value = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());

} finally {

private static Consumer<String, String> createConsumer() {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("group.id", "test_consumer_group");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(kafkaProps);
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">