Intro to Apache Kafka with Spring Boot

Srikanth Dannarapu
Javarevisited
Published in
8 min readFeb 28, 2023

--

Apache Kafka is a distributed event streaming platform that is designed to handle large volumes of data in real-time. The Kafka architecture consists of four main components: producers, brokers, consumers, and ZooKeeper.

  1. Producers: Producers are clients that publish messages to Kafka topics. They can be any type of application or system that generates data, such as sensors, web servers, or databases. When a producer publishes a message to a topic, it sends the message to a Kafka broker.
  2. Brokers: Brokers are the servers that form the Kafka cluster. They are responsible for receiving messages from producers, storing the messages in a distributed commit log, and serving the messages to consumers. Each broker in the Kafka cluster is identified by a unique integer called a broker ID.
  3. Consumers: Consumers are clients that read messages from Kafka topics. They can be any type of application or system that processes data, such as analytics engines, search engines, or machine learning algorithms. When a consumer subscribes to a topic, it receives messages from one or more partitions of the topic.
  4. ZooKeeper: ZooKeeper is a distributed coordination service that is used by Kafka to manage the cluster state, handle leader election, and perform other administrative tasks. It maintains a list of all brokers in the Kafka cluster, along with their current state and metadata.

The Kafka messaging model is based on topics and partitions. A topic is a logical category or feed name to which messages are published. Each topic is divided into one or more partitions, which are ordered, immutable sequences of messages that can be stored on different brokers in the Kafka cluster.

When a producer publishes a message to a topic, it specifies the topic name and a key that is used to determine the partition to which the message is written. The partition is chosen based on the hash of the key, so messages with the same key are always written to the same partition. Each partition is replicated across multiple brokers to ensure high availability and durability.

Consumers can subscribe to one or more topics and read messages from one or more partitions. When a consumer reads messages from a partition, it maintains its own offset, which is the position of the last consumed message in the partition. This allows consumers to read messages at their own pace and resume reading from where they left off in case of failures.

The Kafka architecture provides a scalable, fault-tolerant, and high-performance platform for building real-time data pipelines and streaming applications. Its distributed nature allows it to handle large volumes of data and provide low-latency access to data across multiple applications and systems.

Partitions, Replication factor, and Retention period:

Partitions, replication factor, and retention period are important concepts in Apache Kafka that determine how data is stored, replicated, and retained in a Kafka cluster.

  • Partitions: A Kafka topic is divided into one or more partitions, which are essentially ordered and immutable sequences of records. Each partition can be stored on a different Kafka broker, allowing Kafka to distribute the load of reading and writing data across multiple nodes. This allows Kafka to scale horizontally and handle large amounts of data.
  • Replication factor: To ensure high availability and fault tolerance, Kafka replicates each partition across multiple brokers. The number of replicas is determined by the replication factor, which specifies the number of copies of each partition that should be maintained. For example, if the replication factor is 3, then each partition will be replicated three times across three different brokers. This means that even if one or two brokers fail, the data will still be available.
  • Retention period: Kafka topics can be configured with a retention period, which specifies how long Kafka should retain messages in the topic before deleting them. This can be useful for keeping track of recent data or for implementing data retention policies. Kafka supports two types of retention period: time-based and size-based. Time-based retention policies delete messages that have been in the topic for a certain amount of time, while size-based policies delete messages based on the total size of data stored in the topic.

By configuring partitions, replication factor, and retention period, Kafka users can design their Kafka topics to meet specific requirements for scalability, fault tolerance, and data retention.

Get started with Apache Kafka

Here are some steps to help you get started with Apache Kafka:

  1. Download and install Apache Kafka: You can download the latest version of Apache Kafka from the official website. Once you have downloaded it, you can follow the instructions provided to install it on your local machine.
  2. Start the Kafka server: After you have installed Kafka, you can start the Kafka server by running the following command:
bin/kafka-server-start.sh config/server.properties

This command will start the Kafka server on your local machine.

3. Create a topic: A topic is a category or feed name to which messages are published. You can create a topic by running the following command:

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092

This command will create a topic called my-topic on the Kafka server running on localhost:9092.

4. Send messages to the topic: You can send messages to the topic using the Kafka producer API. Here’s an example:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;

public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

String message = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", message);

producer.send(record);
producer.close();
}
}

In this example, we create a Kafka producer that sends a message “Hello, Kafka!” to the topic “my-topic”.

5. Consume messages from the topic: You can consume messages from the topic using the Kafka consumer API. Here’s an example:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.id", "my-group");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my-topic"));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
records.forEach(record -> {
System.out.println("Received message: " + record.value());
});
}
}
}

In this example, we create a Kafka consumer that listens for messages on the topic “my-topic” and prints the received messages to the console.

Example :

Here’s an example Kafka Spring Boot endpoint that emits an event onto a Kafka topic when a new employee record is added into the database

  1. First, you need to add the Kafka and Spring Data JPA dependencies to your Spring Boot project. You can add them to your pom.xml file if you're using Maven, or to your build.gradle file if you're using Gradle.
<!-- Kafka dependency -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.4</version>
</dependency>

<!-- Spring Data JPA dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

2. Create a Kafka configuration class that sets up the Kafka producer.

@Configuration
@EnableKafka
public class KafkaConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

3. Create a Kafka producer service that sends messages to the Kafka topic.

@Service
public class KafkaProducerService {

private final KafkaTemplate<String, String> kafkaTemplate;

@Autowired
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

public void sendMessage(String message) {
kafkaTemplate.send("employee-topic", message);
}
}

4. Create a Spring Data JPA entity for the employee record.

@Entity
public class Employee {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

private String name;

private String department;

// getters and setters
}

5. Create a Spring Data JPA repository for the employee entity.

@Repository
public interface EmployeeRepository extends JpaRepository<Employee, Long> {
}

6. Create a Spring MVC controller that handles the HTTP POST request for adding a new employee record to the database.

@RestController
public class EmployeeController {

private final EmployeeRepository employeeRepository;
private final KafkaProducerService kafkaProducerService;

@Autowired
public EmployeeController(EmployeeRepository employeeRepository, KafkaProducerService kafkaProducerService) {
this.employeeRepository = employeeRepository;
this.kafkaProducerService = kafkaProducerService;
}

@PostMapping("/employees")
public ResponseEntity<Employee> createEmployee(@RequestBody Employee employee) {
Employee savedEmployee = employeeRepository.save(employee);
kafkaProducerService.sendMessage("New employee added: " + savedEmployee.getName());
return ResponseEntity.ok(savedEmployee);
}
}

In the above example, when a new employee record is added to the database through the HTTP POST request, the createEmployee method sends a message to the Kafka topic using the kafkaProducerService. The message contains the name of the newly added employee. You can customize the message format and topic name based on your requirements.

To send newly added employee record on to the employee-topic modify KafkaTemplate

@Service
public class KafkaProducerService {

private static final String TOPIC_NAME = "employee-topic";
private static final int NUM_PARTITIONS = 3;
private static final short REPLICATION_FACTOR = 1;
private static final long RETENTION_PERIOD_MS = 86400000L; // 1 day

private final KafkaTemplate<Long, Employee> kafkaTemplate;

@Autowired
public KafkaProducerService(KafkaTemplate<Long, Employee> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

public void sendEmployee(Employee employee) {
kafkaTemplate.executeInTransaction(operations -> {
operations.send(TOPIC_NAME, employee.getId(), employee);
return null;
});

AdminClient adminClient = AdminClient.create(kafkaTemplate.getProducerFactory().getConfigurationProperties());

NewTopic newTopic = new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REPLICATION_FACTOR);
Map<String, String> config = new HashMap<>();
config.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(RETENTION_PERIOD_MS));
newTopic.configs(config);

try {
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
adminClient.close();
}
}
}

In this modified version of the KafkaProducerService, we're using the executeInTransaction method to send the Employee record to the "employee-topic" Kafka topic. This method ensures that the Kafka producer sends the message atomically, meaning that the message is either sent successfully or not at all.

We’re also using the AdminClient class from the Kafka client library to configure the Kafka topic properties. In this example, we're setting the number of partitions to 3, the replication factor to 1, and the retention period to 1 day. You can customize these properties based on your requirements. Note that the AdminClient class should be closed after use to release the resources used by the Kafka client.

Kafka consumer:

An example of a Kafka consumer in Spring Boot that reads messages from the “employee-topic” topic:

@Service
public class KafkaConsumerService {

private static final String TOPIC_NAME = "employee-topic";

@KafkaListener(topics = TOPIC_NAME, groupId = "employee-consumer-group")
public void receiveEmployee(Employee employee) {
// do something with the received employee record
System.out.println("Received employee: " + employee);
}
}

In this example, we’re using the @KafkaListener annotation from the Spring Kafka library to configure a method that will be called each time a new message is received from the "employee-topic" topic. The groupId attribute specifies the unique identifier of the consumer group to which this consumer belongs. Note that multiple consumers can belong to the same group, and Kafka will automatically distribute the partitions of the topic to the members of the group.

When a new message is received, the receiveEmployee method will be called with the Employee object that was deserialized from the Kafka message. You can modify this method to perform any necessary business logic with the received data.

To enable the Kafka consumer in your Spring Boot application, you will need to configure the Kafka properties in your application.properties or application.yml file. Here's an example configuration:

spring.kafka.consumer.bootstrap-servers=<kafka-broker-url>
spring.kafka.consumer.group-id=employee-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest

In this configuration, we’re setting the bootstrap servers property to the URL of the Kafka broker(s) that the consumer should connect to, as well as the group ID property to the same value as we used in the @KafkaListener annotation. The auto-offset-reset property specifies the behavior of the consumer when it starts up and doesn't have a valid offset for a partition. In this example, we're setting it to "earliest" to ensure that the consumer reads all messages in the topic from the beginning.

Thanks, before you go:

  • 👏 Please clap for the story and follow the author 👉
  • Please share your questions or insights in the comments section below. Let’s help each other and become better Java developers.
  • Let’s connect on LinkedIn

--

--