Event-Driven Architecture with Spring Boot and Kafka

Bubu Tripathy
Jul 27, 2023

What is Event-Driven Architecture?

Event-Driven Architecture (EDA) is a design pattern that emphasizes the use of events to trigger and communicate changes between different components of a system. In an EDA, services interact through the exchange of events, which are messages representing an occurrence or a state change.

Advantages of Event-Driven Architecture

  • Loose coupling: Services are decoupled from each other, promoting independence and modularity.
  • Scalability: Services can scale independently based on event demand, improving overall system performance.
  • Asynchronous processing: Events enable asynchronous communication, reducing latency and improving responsiveness.
  • Event sourcing: EDA naturally supports event sourcing, where the state of a system is derived from the sequence of events that led to it.
  • Flexibility: New services can be added or existing ones modified without affecting the entire system.

Kafka and its role in EDA

Apache Kafka is a distributed streaming platform that serves as a backbone for building real-time data pipelines and streaming applications. It is designed for high-throughput, fault-tolerant, and scalable event streaming.

Kafka is built around the publish-subscribe messaging model, where producers publish messages (events) to topics, and consumers subscribe to those topics to receive and process the messages. The events are stored in Kafka in an immutable and append-only manner, allowing both real-time and historical data processing.

Key Kafka concepts

  • Topics: Channels for publishing and subscribing to events.
  • Producers: Services that produce and send events to Kafka topics.
  • Consumers: Services that subscribe to topics and process incoming events.
  • Partitions: Each topic is divided into partitions, allowing for parallel processing and load distribution.

Role of Kafka in EDA

Apache Kafka plays a pivotal role in Event-Driven Architecture (EDA) by acting as a scalable and durable event bus for communication between microservices. In EDA, microservices are designed to produce and consume events, allowing them to communicate asynchronously without direct dependencies on each other.

Key Roles

  1. Event Broker: Kafka acts as a central event broker, serving as a highly scalable and reliable middleware to handle event streams. It efficiently manages the routing, storage, and distribution of events between various services.
  2. Event Log: Kafka’s append-only log-like storage allows events to be durably stored and retained for a configurable period. This makes it an ideal source of truth for event sourcing, auditing, and data replay scenarios.
  3. Decoupling Services: Kafka decouples services by enabling producers to publish events without needing to know which specific services will consume them. Similarly, consumers do not need to be aware of the producers generating the events.
  4. Reliability and Fault Tolerance: Kafka provides strong guarantees of data durability and fault tolerance. Events are replicated across multiple brokers, ensuring high availability even in the face of node failures.
  5. Scalability: Kafka’s distributed architecture allows it to scale horizontally by adding more brokers to the cluster. This enables it to handle high-throughput event streams from numerous producers and consumers.
  6. Real-Time Data Processing: Kafka enables real-time data processing by delivering events with low latency. This makes it suitable for use cases where timely and immediate processing of events is crucial.
  7. Event Time Ordering: Kafka preserves the order of events within each partition, ensuring event time ordering. This is critical for maintaining consistency when processing events across multiple services.
  8. Schema Evolution and Compatibility: Kafka works with serialization frameworks like Avro or Protobuf (typically alongside a schema registry), allowing schemas to evolve while preserving backward and forward compatibility as services change over time.

Setting up the Environment

Spring Boot is a popular Java framework for building microservices. It simplifies the development process by providing various utilities and conventions, allowing developers to focus on business logic rather than boilerplate code. Spring Boot’s extensive ecosystem also makes it easy to integrate with other technologies.

Let’s begin by creating a simple Spring Boot project with Kafka integration. We’ll set up the necessary dependencies for Kafka in our project.

Create a new Spring Boot project using your preferred IDE.

Add the following Maven or Gradle dependencies to the pom.xml or build.gradle file:

<!-- For Maven -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

// For Gradle
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'

Before using Kafka for event-driven communication, we need to create topics and specify the number of partitions. Topics are like channels that allow producers to publish events and consumers to subscribe to events.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfiguration {

    @Bean
    public NewTopic exampleTopic() {
        return new NewTopic("example_topic", 1, (short) 1);
    }
}

In this example, we use a @Configuration class to define the Kafka topic "example_topic" with one partition and a replication factor of one. The replication factor controls how many brokers hold a copy of each partition, providing fault tolerance if a broker fails.

Now, let’s create a simple project to demonstrate the integration of Spring Boot with Kafka. We’ll create a basic Kafka producer and consumer.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic = "example_topic";

    @Autowired
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        kafkaTemplate.send(topic, message);
    }
}

In this example, we’ve created a KafkaProducerService class, responsible for sending messages (events) to the Kafka topic named "example_topic." We use the KafkaTemplate provided by Spring Kafka to publish messages.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "example_topic", groupId = "example_group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

The KafkaConsumerService class listens to the "example_topic" and belongs to the consumer group "example_group." Whenever a message is published to the topic, the receiveMessage method will be invoked to process the incoming event.
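To try the round trip locally (a quick sketch, not part of the original example; it assumes a broker reachable at the default localhost:9092), a CommandLineRunner bean can publish a test message at startup, which should then show up in the consumer's log:

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProducerDemo {

    // Publishes one test message as soon as the application starts
    @Bean
    public CommandLineRunner sendTestMessage(KafkaProducerService producerService) {
        return args -> producerService.sendMessage("Hello from Spring Boot!");
    }
}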

Real-world Examples

Let’s look at a few real-world examples.

Order Processing

In a real-world order processing system, events can flow as follows:

  • Order placed.
  • Payment processed.
  • Inventory updated.

In this scenario, different microservices handle each event, and they communicate through Kafka to maintain consistency and decouple services.

Step 1: Set Up the Kafka Environment

Start the ZooKeeper server (required for Kafka)

bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka server

bin/kafka-server-start.sh config/server.properties

Create two Kafka topics for order events and payment events:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic order_events

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic payment_events

Create a new Spring Boot project using your preferred IDE. Add the following dependencies to the pom.xml file:

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Boot Starter for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

Create two classes to represent the event data: OrderEvent and PaymentEvent. These classes will be used for communication between services.

public class OrderEvent {
    private Long orderId;
    // Other fields, constructors, getters, setters
}

import java.math.BigDecimal;

public class PaymentEvent {
    private Long orderId;
    private BigDecimal amount;
    // Other fields, constructors, getters, setters
}

Create two Kafka producer services, one for order events and one for payment events.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderEventProducer {

    private static final String TOPIC = "order_events";

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void sendOrderEvent(OrderEvent orderEvent) {
        kafkaTemplate.send(TOPIC, orderEvent);
    }
}

@Service
public class PaymentEventProducer {

    private static final String TOPIC = "payment_events";

    @Autowired
    private KafkaTemplate<String, PaymentEvent> kafkaTemplate;

    public void sendPaymentEvent(PaymentEvent paymentEvent) {
        kafkaTemplate.send(TOPIC, paymentEvent);
    }
}
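Note that Spring Boot's auto-configured KafkaTemplate uses String serialization by default, so publishing OrderEvent and PaymentEvent objects needs a JSON (or similar) value serializer. One way to wire this up is a per-type producer factory, shown here as a sketch (the bean names and the localhost:9092 address are assumptions; setting the spring.kafka.producer.value-serializer property is an alternative):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    // Common producer properties; localhost:9092 is an assumed local broker address
    private Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, OrderEvent> orderEventProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProps());
    }

    @Bean
    public KafkaTemplate<String, OrderEvent> orderEventKafkaTemplate() {
        return new KafkaTemplate<>(orderEventProducerFactory());
    }

    // A PaymentEvent factory/template pair would be declared the same way
}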

Create two Kafka consumer services, one for order events and one for payment events.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class OrderEventConsumer {

    @KafkaListener(topics = "order_events", groupId = "order_group")
    public void handleOrderEvent(OrderEvent orderEvent) {
        // Process the order event, e.g., store it in the database
        System.out.println("Received Order Event: " + orderEvent);
        // Implement the logic for order processing and updating inventory
    }
}

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class PaymentEventConsumer {

    @KafkaListener(topics = "payment_events", groupId = "payment_group")
    public void handlePaymentEvent(PaymentEvent paymentEvent) {
        // Process the payment event, e.g., update payment status
        System.out.println("Received Payment Event: " + paymentEvent);
        // Implement the logic for payment processing and order status update
    }
}
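On the consuming side, the listener containers likewise need a JSON-aware deserializer so the event payloads can be materialized into OrderEvent and PaymentEvent objects. A sketch of one possible configuration (the group id and broker address are assumptions, and the factory bean name is hypothetical):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, OrderEvent> orderEventConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order_group");
        JsonDeserializer<OrderEvent> valueDeserializer = new JsonDeserializer<>(OrderEvent.class);
        valueDeserializer.addTrustedPackages("*"); // relaxed for the sketch; narrow this in production
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> orderEventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderEventConsumerFactory());
        return factory;
    }

    // A PaymentEvent consumer factory and container factory would mirror the beans above
}

The listener would then reference the factory explicitly, e.g. @KafkaListener(topics = "order_events", groupId = "order_group", containerFactory = "orderEventKafkaListenerContainerFactory").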

Create REST endpoints to receive order and payment data from clients. Upon receiving orders and payment data, send events to Kafka.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {

    @Autowired
    private OrderEventProducer orderEventProducer;

    @PostMapping("/orders")
    public ResponseEntity<String> createOrder(@RequestBody OrderEvent orderEvent) {
        // Process the received order event and send it to Kafka
        orderEventProducer.sendOrderEvent(orderEvent);
        return ResponseEntity.ok("Order created successfully");
    }
}

@RestController
public class PaymentController {

    @Autowired
    private PaymentEventProducer paymentEventProducer;

    @PostMapping("/payments")
    public ResponseEntity<String> processPayment(@RequestBody PaymentEvent paymentEvent) {
        // Process the received payment event and send it to Kafka
        paymentEventProducer.sendPaymentEvent(paymentEvent);
        return ResponseEntity.ok("Payment processed successfully");
    }
}

Run the Spring Boot application and use a REST client (e.g., Postman) to create orders and process payments. Observe the logs to see the events being received by the consumer services and the order processing and inventory updating logic being executed.

Keep in mind that this is a simplified example, and in a real-world application, you would add more validations, error handling, and additional services for order processing, payment gateways, and inventory management.

IoT and Sensor Data Processing

Let’s create a sample application for IoT and sensor data processing using Spring Boot and Kafka. In this example, we’ll focus on collecting sensor data, aggregating it, and generating alerts based on certain conditions. Note that this is a simplified example, and in a real-world scenario, additional complexity and data processing would be involved.

Create a class to represent the sensor data event: SensorDataEvent. This class will be used for communication between services.

public class SensorDataEvent {
    private String sensorId;
    private double value;
    // Other fields, constructors, getters, setters
}
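This walkthrough assumes a "sensor_data_events" topic exists. Depending on the broker's auto-create setting it may be created on first use, but declaring it explicitly, as with the earlier topics, keeps the setup predictable. A sketch:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SensorTopicConfiguration {

    @Bean
    public NewTopic sensorDataTopic() {
        // Single partition, replication factor of 1, matching the local single-broker setup
        return new NewTopic("sensor_data_events", 1, (short) 1);
    }
}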

Create a Kafka producer service to send sensor data events.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class SensorDataEventProducer {

    private static final String TOPIC = "sensor_data_events";

    @Autowired
    private KafkaTemplate<String, SensorDataEvent> kafkaTemplate;

    public void sendSensorDataEvent(SensorDataEvent sensorDataEvent) {
        kafkaTemplate.send(TOPIC, sensorDataEvent);
    }
}

Create a Kafka consumer service to receive sensor data events and an aggregator to aggregate the data.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class SensorDataEventConsumer {

    @Autowired
    private DataAggregator dataAggregator;

    @KafkaListener(topics = "sensor_data_events", groupId = "sensor_data_group")
    public void handleSensorDataEvent(SensorDataEvent sensorDataEvent) {
        // Process the received sensor data event and pass it to the aggregator
        dataAggregator.aggregate(sensorDataEvent);
    }
}
import java.util.HashMap;
import java.util.Map;
import org.springframework.stereotype.Component;

@Component
public class DataAggregator {

    private final Map<String, Double> sensorDataMap = new HashMap<>();

    public void aggregate(SensorDataEvent sensorDataEvent) {
        // Aggregate sensor data by sensor ID
        double currentValue = sensorDataMap.getOrDefault(sensorDataEvent.getSensorId(), 0.0);
        double aggregatedValue = currentValue + sensorDataEvent.getValue();
        sensorDataMap.put(sensorDataEvent.getSensorId(), aggregatedValue);

        // Check if the aggregated value exceeds a certain threshold and generate an alert
        if (aggregatedValue > 1000) {
            generateAlert(sensorDataEvent.getSensorId(), aggregatedValue);
        }
    }

    private void generateAlert(String sensorId, double aggregatedValue) {
        System.out.println("ALERT! Sensor ID: " + sensorId + ", Aggregated Value: " + aggregatedValue);
        // Implement the logic for generating alerts, e.g., sending notifications or triggering actions
    }
}
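The HashMap above is fine as long as a single listener thread calls aggregate. If the listener were made concurrent (for example via the concurrency setting discussed later), a ConcurrentHashMap with an atomic merge avoids lost updates. A sketch of that variant, slotted into the same DataAggregator class:

import java.util.concurrent.ConcurrentHashMap;

// Thread-safe variant (assumption: multiple listener threads may call aggregate concurrently)
private final ConcurrentHashMap<String, Double> sensorDataMap = new ConcurrentHashMap<>();

public void aggregate(SensorDataEvent sensorDataEvent) {
    // merge() updates the running total atomically per sensor ID
    double aggregatedValue = sensorDataMap.merge(
            sensorDataEvent.getSensorId(), sensorDataEvent.getValue(), Double::sum);
    if (aggregatedValue > 1000) {
        generateAlert(sensorDataEvent.getSensorId(), aggregatedValue);
    }
}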

Create a REST endpoint to receive sensor data from IoT devices. Upon receiving sensor data, send events to Kafka for further processing and aggregation.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SensorDataController {

    @Autowired
    private SensorDataEventProducer sensorDataEventProducer;

    @PostMapping("/sensors/{sensorId}/data")
    public ResponseEntity<String> sendSensorData(@PathVariable String sensorId, @RequestBody double value) {
        // Create a SensorDataEvent (assumes a (sensorId, value) constructor) and send it to Kafka
        SensorDataEvent sensorDataEvent = new SensorDataEvent(sensorId, value);
        sensorDataEventProducer.sendSensorDataEvent(sensorDataEvent);
        return ResponseEntity.ok("Sensor data received successfully");
    }
}

Run the Spring Boot application and use a REST client (e.g., Postman) to post sensor readings, then watch the logs for the aggregation output and alerts.

Error handling and Retries

Error handling and retries are essential components of a robust and reliable software system. They play a crucial role in ensuring that applications can recover gracefully from unexpected failures, transient issues, or external service outages. Error handling is the process of dealing with unexpected or exceptional situations that can arise during the execution of a program.

Retries are a mechanism used to automatically attempt the execution of a failed operation or request multiple times. The purpose of retries is to handle transient errors, which are temporary and can be resolved by retrying the operation after a short delay. Retries can be particularly useful when dealing with unreliable network connections or external services that experience occasional failures.

There are various strategies for implementing retries.

  • Fixed Retry: Retry the operation a fixed number of times with a constant delay between retries.
  • Exponential Backoff: Increase the delay between each retry exponentially to avoid overwhelming the system if the issue persists.
  • Randomized Backoff: Introduce randomization in the delay between retries to spread out retry attempts and avoid request collisions.
  • Circuit Breaker: After a certain number of consecutive failures, stop retrying and “trip” the circuit to prevent further retries. After a cooldown period, attempts can be resumed.
  • Retry with Jitter: Combine fixed or exponential backoff with jitter (random delay) to reduce the likelihood of multiple clients retrying simultaneously.

We can use Spring-Retry to implement retries for event processing. First, add the Spring-Retry dependency to the pom.xml or build.gradle file:

<!-- For Maven -->
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>

// For Gradle
implementation 'org.springframework.retry:spring-retry'
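Spring Retry's annotations are only processed once retry support is enabled, so a configuration class with @EnableRetry (plus AOP support on the classpath, e.g. spring-boot-starter-aop) is typically also needed. A minimal sketch:

import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;

@Configuration
@EnableRetry // activates processing of @Retryable and @Recover annotations
public class RetryConfiguration {
}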

Next, update the consumer to retry processing in case of failure:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @Retryable(
            value = { Exception.class },
            maxAttempts = 3,
            backoff = @Backoff(delay = 1000, maxDelay = 3000)
    )
    @KafkaListener(topics = "example_topic", groupId = "example_group")
    public void receiveMessage(String message) {
        try {
            // Process the message here
            System.out.println("Received message: " + message);
        } catch (Exception e) {
            throw new RuntimeException("Error processing message: " + message, e);
        }
    }
}

In this example, we added the @Retryable annotation to the receiveMessage method, specifying a maximum of three attempts with a 1-second delay between retries (capped at 3 seconds).
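If one of the other strategies listed earlier is preferred, the same annotation can express it. For example, exponential backoff with jitter might look like the following sketch (the attempt count and delays are illustrative):

@Retryable(
        value = { Exception.class },
        maxAttempts = 5,
        // Start at 500 ms, double on each attempt up to 8 s, and randomize the delay to add jitter
        backoff = @Backoff(delay = 500, multiplier = 2.0, maxDelay = 8000, random = true)
)
@KafkaListener(topics = "example_topic", groupId = "example_group")
public void receiveMessage(String message) {
    // ...
}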

Dead-letter queue

Dead-letter queues are queues where events that fail processing are sent for further analysis and debugging. This way, problematic events can be reviewed without affecting the main event processing flow.

To move failed events to a dead-letter queue, we can configure Kafka to have a separate topic for such events.

@Configuration
public class KafkaTopicConfiguration {

    @Bean
    public NewTopic exampleTopic() {
        return new NewTopic("example_topic", 1, (short) 1);
    }

    @Bean
    public NewTopic deadLetterTopic() {
        return new NewTopic("example_topic_dead_letter", 1, (short) 1);
    }
}

Next, update the consumer to send failed events to the dead-letter queue:

@Service
public class KafkaConsumerService {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @Retryable(
            value = { Exception.class },
            maxAttempts = 3,
            backoff = @Backoff(delay = 1000, maxDelay = 3000)
    )
    @KafkaListener(topics = "example_topic", groupId = "example_group")
    public void receiveMessage(String message) {
        // Process the message here; an exception triggers the retry policy above
        System.out.println("Received message: " + message);
    }

    @Recover
    public void recover(Exception e, String message) {
        // Retries exhausted: move the failed event to the dead-letter queue
        kafkaProducerService.sendMessageToDeadLetterTopic(message);
    }
}

In this example, once the retries are exhausted, the @Recover method hands the failed message to a sendMessageToDeadLetterTopic method added to the KafkaProducerService, which publishes it to the "example_topic_dead_letter" topic.
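The body of that method is not shown in the original; a minimal sketch of what it could look like inside KafkaProducerService:

// Added to KafkaProducerService (sketch): publish the failed message to the dead-letter topic
public void sendMessageToDeadLetterTopic(String message) {
    kafkaTemplate.send("example_topic_dead_letter", message);
}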

With these enhancements, our event processing pipeline becomes more resilient and capable of handling transient failures gracefully.
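As an aside (not part of the original walkthrough), Spring Kafka also ships a built-in dead-letter mechanism: a DefaultErrorHandler combined with a DeadLetterPublishingRecoverer republishes failed records to a "<topic>.DLT" topic after in-container retries are exhausted. A sketch, assuming Spring Kafka 2.8+ where Spring Boot applies a DefaultErrorHandler bean to the auto-configured listener container factory:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        // Retry twice with a 1-second pause, then publish the record to "<original-topic>.DLT"
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }
}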

Scaling Event-Driven Microservices

Scaling Kafka consumers and producers can be achieved by increasing the number of instances of the respective services. By running multiple instances of consumers in parallel, we can process events concurrently, enhancing overall throughput.

When scaling consumers, it’s crucial to consider partitioning. Kafka partitions are the unit of parallelism, and each partition is consumed by only one consumer in a consumer group. Therefore, we should have enough partitions to allow for efficient parallel processing.

Consumers with the same group ID form a consumer group and distribute the load of consuming events from the same topic. To increase parallelism, we can add more consumers to a consumer group.

Example

To scale consumers, we need to set a unique instanceId for each consumer instance and add multiple instances of the same consumer to the consumer group.

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @Value("${kafka.instanceId}")
    private String instanceId;

    @KafkaListener(
            topicPartitions = @TopicPartition(
                    topic = "example_topic",
                    partitionOffsets = {
                            @PartitionOffset(partition = "0", initialOffset = "0"),
                            @PartitionOffset(partition = "1", initialOffset = "0"),
                            // Add more partitions and offsets as needed
                    }
            ),
            groupId = "example_group"
    )
    public void receiveMessage(String message) {
        System.out.println("[" + instanceId + "] Received message: " + message);
    }
}

In this example, we used the @KafkaListener annotation with topicPartitions to specify the partitions and their initial offsets for this consumer instance. Each consumer instance will process events from different partitions, increasing parallelism.

groupId: This attribute specifies the consumer group to which this consumer belongs. Consumers in the same group share the load of consuming messages from the topic partitions.

To create multiple instances of the consumer service, we need to run the application with different instanceId values. We can use Spring profiles to achieve this.

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, "--spring.profiles.active=instance1");
        SpringApplication.run(KafkaApplication.class, "--spring.profiles.active=instance2");
        // Add more instances as needed
    }
}

In this example, we started two instances of the application with profiles “instance1” and “instance2.” Each instance will have a unique instanceId, and they will consume events from different partitions, providing better parallelism.

To increase the number of partitions in the “example_topic,” we need to modify the KafkaTopicConfiguration.

@Configuration
public class KafkaTopicConfiguration {

    @Bean
    public NewTopic exampleTopic() {
        return new NewTopic("example_topic", 4, (short) 1);
    }

    @Bean
    public NewTopic deadLetterTopic() {
        return new NewTopic("example_topic_dead_letter", 1, (short) 1);
    }
}

In this example, we increased the number of partitions to 4 for the “example_topic.” This will allow for better load distribution and improved parallel processing.

By running multiple consumer instances and increasing the number of partitions, we effectively scale our event-driven microservices to handle higher throughput.
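Another lever worth knowing about (not covered above) is the concurrency setting on the listener itself: within a single application instance, Spring Kafka can run several listener threads, each consuming a subset of the topic's partitions. A sketch:

@Service
public class KafkaConsumerService {

    // Up to 4 listener threads in this instance; effective parallelism is still capped by the partition count
    @KafkaListener(topics = "example_topic", groupId = "example_group", concurrency = "4")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}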

Conclusion

Event-Driven Microservices Architecture with Spring Boot and Kafka offers a powerful and scalable approach to building modern distributed systems. By leveraging the benefits of event-driven communication, loose coupling, and asynchronous processing, developers can create flexible, resilient, and responsive applications that meet the demands of today’s dynamic business landscape.
