Kafka — Getting Started

Shinemon Divakaran
5 min read · May 24, 2020



In this post, we will set up Apache Kafka, create a Producer that publishes to a Topic, and a Consumer that subscribes to that Topic.

Apache Kafka is a distributed streaming platform that lets you publish and subscribe to streams of records, much like a message queue. Kafka retains those records based on its retention policy and available space.

Kafka stores these streams of records in topics, and each record consists of a key, a value, and a timestamp.

The main Kafka APIs are Producer, Consumer, Streams, Connector, and Admin. We will primarily work with the Producer and Consumer APIs.

The Producer API allows applications to publish records to one or more topics.

The Consumer API allows applications to subscribe to one or more topics.

The Streams API allows applications to consume records from one or more topics and publish records to one or more topics, i.e. it transforms input streams into output streams.

In Kafka, the topic is where records are published. Topics in Kafka can have zero or more consumers that subscribe to their data. Each topic is split into partitioned logs, where records are ordered and immutable. Each record in a partition has a sequential id number called the offset.

As mentioned earlier, records in a Kafka topic are not lost until the retention policy or space limit is reached. Hence, based on the offset, a consumer can re-read any message at any time. Normally a consumer advances its offset linearly as it reads records, but it can also start from an arbitrary offset, as the example below shows.
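For example, the console consumer that ships with Kafka can re-read a partition starting from a specific offset (the topic name and offset here are just placeholders):

kafka-console-consumer --bootstrap-server localhost:9092 \
--topic sample-topic \
--partition 0 \
--offset 5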

Install Kafka:

Here are the steps I followed to install Kafka using brew:

brew install kafka

Edit the Kafka server.properties file at /usr/local/etc/kafka/server.properties and uncomment:

#listeners=PLAINTEXT://:9092

update it as below:

listeners=PLAINTEXT://localhost:9092

Now let's start ZooKeeper:

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

Start Kafka:

kafka-server-start /usr/local/etc/kafka/server.properties

Now we have local Apache Kafka up and running. We can now create a Topic and set up a Producer and Consumer.
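To sanity-check that the broker is reachable, you can list the topics it knows about (the list will be empty on a fresh install):

kafka-topics --list --bootstrap-server localhost:9092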

Implement Kafka Producer:

Here we will use the Java Spring Boot framework and the spring-kafka dependency, which provides everything we need to send and receive messages with Kafka.
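Assuming a Maven build with the Spring Boot parent POM, the dependency looks like this (the version is managed by Spring Boot's dependency management, so it can be omitted):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>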

Create Topic:

Using the command line, a sample looks like this:

kafka-topics --create --zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic sample-topic

But we will create the Topic programmatically using KafkaAdmin, as shown below:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrap.address}")
    private String bootstrapAddress;

    @Value(value = "${kafka.topic.name}")
    private String topicName;

    // KafkaAdmin automatically creates any NewTopic beans on startup
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    // 1 partition, replication factor 1 -- fine for a local single-broker setup
    @Bean
    public NewTopic topic1() {
        return new NewTopic(topicName, 1, (short) 1);
    }
}

We are creating a topic with 1 partition and a replication factor of 1. The topic name is passed in via application properties.

application.properties:

kafka.bootstrap.address=localhost:9092
kafka.topic.name=shine-test-local-topic
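Note: the consumer configuration further down also reads a ${kafka.consumer} property for the consumer group id, so add an entry for it as well (the value here simply mirrors the group used by the listener):

kafka.consumer=shine-local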

Producer config: here we configure the Kafka ProducerFactory and the KafkaTemplate used to send messages.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrap.address}")
    private String bootstrapAddress;

    // Both keys and values are plain strings, so we use StringSerializer
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Next we will add a message producer service.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class ProducerService {

    @Value("${kafka.topic.name}")
    private String topicName;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        // send() is asynchronous; the callback fires once the broker acks (or rejects) the record
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(topicName, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send message=[{}] due to : {}", message, ex.getMessage());
            }
        });
    }
}

Next we will add an API to send/produce messages. Here I am using a simple Student model and a sample controller to post the message.

Model:

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter @Setter
@ToString
public class Student {

    private String name;
}

Controller:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;

@RestController
@RequestMapping("/api")
@Slf4j
public class KafkaController {

    @Autowired
    ProducerService producerService;

    @PostMapping(value = "/send/student/info")
    public String kafkaMessage(@RequestBody Student message) {
        producerService.sendMessage(message.toString());
        return "Success";
    }
}

Now let's start our Spring Boot App.

Posting a few messages via our API:

curl -X POST "http://localhost:8080/api/send/student/info" -H "accept: */*" -H "Content-Type: application/json" -d "{ \"name\": \"send test 3\"}"

To view the posted messages in the Kafka Topic, we will invoke kafka-console-consumer.sh:

/usr/local/Cellar/kafka/2.5.0/libexec/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic shine-test-local-topic --from-beginning
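Since the controller sends message.toString(), it is Lombok's @ToString output that lands on the topic, so the console consumer should print lines like:

Student(name=send test 3)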

And the producer has successfully sent our messages to the Kafka topic.

Subscribe to Kafka Topic:

Adding the consumer config.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrap.address}")
    private String bootstrapAddress;

    @Value(value = "${kafka.consumer}")
    private String groupId;

    // Mirror of the producer config: string keys and string values
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // This factory backs the @KafkaListener annotation used below
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
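If one consumer thread is not enough, the same factory also lets you set a concurrency level; a minimal sketch (3 is an arbitrary number, and extra threads only help if the topic has at least that many partitions):

factory.setConcurrency(3);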

And adding the Listener service:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class KafkaMessageConsumer {

    @KafkaListener(topics = "shine-test-local-topic", groupId = "shine-local")
    public void listen(String message) {
        log.info("Received Message in group shine-local: {}", message);
    }
}
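If you also need the record's metadata (partition, offset, key), @KafkaListener can accept the full ConsumerRecord instead of just the payload. A minimal sketch for the same service (listenWithMetadata is just an illustrative name, and it needs an import of org.apache.kafka.clients.consumer.ConsumerRecord):

@KafkaListener(topics = "shine-test-local-topic", groupId = "shine-local")
public void listenWithMetadata(ConsumerRecord<String, String> record) {
    log.info("Received message=[{}] from partition=[{}] at offset=[{}]",
            record.value(), record.partition(), record.offset());
}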

Now start the Spring Boot App:

2020-05-25 00:35:54.103  INFO 16170 --- [ntainer#0-0-C-1] c.m.s.k.listener.KafkaMessageConsumer  : Received Message in group shine-local: Student(name=send test 1)

Messages are consumed successfully.

A working example is available on GitHub.

Hope this helps you set up Kafka and establish a Producer / Consumer connection to it.

🙏
