Strategy Pattern in the Kafka Codebase

Manish Kumar
Jun 5, 2023


What is Strategy Pattern?

The Strategy pattern is a behavioral design pattern that enables selecting an algorithm or strategy at runtime. It allows encapsulating a family of algorithms and making them interchangeable within a context.

Main Components of Strategy Pattern

In the context of the Strategy pattern, we have three main components:

  1. Context: It represents the context or the object that requires a specific behavior. It maintains a reference to a strategy object and delegates the execution of the behavior to that strategy.
  2. Strategy: It defines the interface or abstract class for the strategy algorithm. Different concrete strategies implement this interface, providing alternative implementations of the algorithm.
  3. Concrete Strategies: These are the specific implementations of the strategy algorithm. Each concrete strategy encapsulates a different variant of the algorithm.

The Strategy pattern promotes loose coupling between the context and the strategies.

It allows the context to select and switch between different strategies dynamically, without directly depending on their concrete implementations.

This flexibility enables the application to adapt and change behaviors at runtime, promoting code extensibility and maintainability.
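The three components above can be sketched in a few lines of plain Java. The names here (`TextFormatter`, `Printer`, and the two formatter classes) are made up purely for illustration and have nothing to do with Kafka:

```java
// Strategy: the interface the context depends on
interface TextFormatter {
    String format(String text);
}

// Concrete Strategy #1
class UpperCaseFormatter implements TextFormatter {
    public String format(String text) { return text.toUpperCase(); }
}

// Concrete Strategy #2
class LowerCaseFormatter implements TextFormatter {
    public String format(String text) { return text.toLowerCase(); }
}

// Context: holds a strategy reference and delegates the behavior to it
class Printer {
    private TextFormatter formatter;

    void setFormatter(TextFormatter formatter) { this.formatter = formatter; }

    String print(String text) { return formatter.format(text); }
}

class StrategyDemo {
    public static void main(String[] args) {
        Printer printer = new Printer();

        // Select one strategy at runtime...
        printer.setFormatter(new UpperCaseFormatter());
        System.out.println(printer.print("kafka"));  // KAFKA

        // ...and swap it for another without touching Printer
        printer.setFormatter(new LowerCaseFormatter());
        System.out.println(printer.print("KAFKA"));  // kafka
    }
}
```

Note that `Printer` never references a concrete formatter class, which is exactly the loose coupling the pattern is after.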

Kafka Producer Code

What is a Kafka Producer?

In Apache Kafka, a Kafka Producer is a client application or component responsible for producing (writing) data to Kafka topics. It allows applications to send data in the form of records to Kafka brokers, which are the distributed message brokers in a Kafka cluster.

A Kafka Producer publishes records to Kafka topics, where each record consists of a key, a value, and an optional timestamp. The key and value can be of any arbitrary type, such as strings, JSON objects, or byte arrays, and the timestamp represents the time when the record was produced.

Responsibilities of the Kafka Producer

The Kafka Producer is responsible for producing data to Kafka topics, including tasks such as record partitioning, serialization, compression, acknowledgment handling, error management, and performance optimization. It ensures reliable and efficient delivery of data while supporting customization and extensibility through hooks and callbacks.

We will pick the record-partitioning responsibility as our example:

What is Record Partitioning:

The Producer determines the target partition to which each record should be written. It may use a partitioning strategy, such as round-robin, key-based, or custom partitioning, to ensure that records are evenly distributed across partitions or grouped based on specific criteria.
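The two built-in styles mentioned above can be sketched with simplified arithmetic. This is an illustration only: `PartitionMath` is a made-up class name, and the array hash below is a stand-in for the murmur2 hash that Kafka's default partitioner actually uses.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class PartitionMath {
    // Key-based: hash the key bytes so that records with the same key
    // always land on the same partition
    static int keyBased(byte[] keyBytes, int numPartitions) {
        int hash = Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions;  // force non-negative
    }

    // Round-robin: cycle through partitions to spread keyless records evenly
    private static final AtomicInteger counter = new AtomicInteger(0);

    static int roundRobin(int numPartitions) {
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes();
        System.out.println(PartitionMath.keyBased(key, 6));   // same value on every call
        System.out.println(PartitionMath.roundRobin(6));      // first partition in the cycle
        System.out.println(PartitionMath.roundRobin(6));      // next partition in the cycle
    }
}
```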

For more details you can refer to my previous article: Kafka Partition Strategies

How does the Kafka Producer implement the Strategy pattern?

In the code below, the partitioning strategy, represented by the Partitioner interface and its implementations, follows the Strategy pattern.

The Context is the CustomProducer, which needs the partitioning behavior; the Strategy is the Partitioner; and the Concrete Strategies are the different implementations of the Partitioner interface, such as MyPartitioner or AnotherPartitioner.

The producer can dynamically select and switch between these partitioning strategies based on the user’s configuration or requirements.

We have defined the Partitioner interface, which represents the partitioning strategy. The MyPartitioner and AnotherPartitioner classes implement this interface and provide their own custom partitioning logic.

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.Properties;

public interface Partitioner {
    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    void close();
    void configure(Map<String, ?> configs);
}

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                         Cluster cluster) {
        // Example logic: hash the key bytes so that records with the same
        // key always land on the same partition
        int numPartitions = cluster.partitionsForTopic(topic).size();
        int hash = java.util.Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {
        // Clean up resources
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration initialization
    }
}

public class AnotherPartitioner implements Partitioner {
    private final java.util.concurrent.atomic.AtomicInteger counter =
            new java.util.concurrent.atomic.AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                         Cluster cluster) {
        // Example logic: round-robin across partitions, ignoring the key
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {
        // Clean up resources
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Configuration initialization
    }
}

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.PartitionInfo;

public class CustomProducer<K, V> {
    private final Producer<K, V> producer;
    private Partitioner partitioner;

    public CustomProducer(Properties props) {
        producer = new KafkaProducer<>(props);
    }

    public void setPartitioner(Partitioner partitioner) {
        this.partitioner = partitioner;
    }

    public void send(String topic, K key, V value) {
        byte[] keyBytes = serializeKey(key);
        byte[] valueBytes = serializeValue(value);

        // KafkaProducer does not expose its cluster metadata publicly, so we
        // build a minimal Cluster view from the topic's partition list
        List<PartitionInfo> partitions = producer.partitionsFor(topic);
        Cluster cluster = new Cluster("cluster-id", Collections.emptyList(), partitions,
                Collections.emptySet(), Collections.emptySet());

        // Delegate the partitioning behavior to the configured strategy
        int partition = partitioner.partition(topic, key, keyBytes, value, valueBytes, cluster);
        producer.send(new ProducerRecord<>(topic, partition, key, value));
    }

    // Simplistic serializers for illustration; real code would reuse the
    // producer's configured serializers
    private byte[] serializeKey(K key) {
        return String.valueOf(key).getBytes(StandardCharsets.UTF_8);
    }

    private byte[] serializeValue(V value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }
}

Driver Code:

// Usage example
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

CustomProducer<String, String> producer = new CustomProducer<>(props);

// Select MyPartitioner as the strategy
producer.setPartitioner(new MyPartitioner());

// Send records using the custom partitioner
producer.send("my-topic", "key", "value");

// Switch to AnotherPartitioner at runtime
producer.setPartitioner(new AnotherPartitioner());

// Send records using the other custom partitioner
producer.send("my-topic", "key", "value");

By separating the partitioning logic into separate classes behind a common interface, we achieve the core principle of the Strategy pattern: encapsulating interchangeable algorithms so that the desired strategy can be selected at runtime.
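For comparison, the real Kafka producer wires in this same pattern through configuration rather than a setter: a class implementing org.apache.kafka.clients.producer.Partitioner is registered via the partitioner.class property, and KafkaProducer delegates every partition decision to it. A sketch (com.example.MyPartitioner is a hypothetical implementation; this requires the kafka-clients dependency):

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Register the Concrete Strategy by class name; KafkaProducer instantiates
// it and delegates partition selection to it -- the same pattern, via config
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.MyPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```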

Happy Learning!

References:

https://github.com/apache/kafka
