Kafka programming in Java with Protobuf serialization

Ramiz Raza
3 min read · Feb 4, 2023


Google Protocol Buffers (protobuf) is a well-known data serialization format that is efficient (smaller and faster than JSON) and both platform neutral and language neutral. This post shows how to use the protobuf serialization format to send messages to a Kafka topic and read them back. The GitHub link to the full running code is at the end.

Here is my earlier post on Kafka programming with Avro serialization: https://medium.com/@rramiz.rraza/kafka-programming-in-java-with-avro-serialization-f0121db5a5a1

My local setup :-

  1. Kafka (version 3.3.2) installed on a Linux (Fedora) VM
  2. Protobuf compiler installed on Windows
  3. Java program in the IntelliJ IDE on Windows
  4. Java connects to Kafka (installed on the VM)

Model :-

Below is the model written in protobuf format. Upon successful compilation, it generates a Java class ProtMessage, nested inside ExchangeProtoMessage, with fields id and name (a compile command is sketched just after the definition).

syntax = "proto3";

package exchange_message_def;

option java_package = "com.kafka.message";
option java_outer_classname = "ExchangeProtoMessage";
option optimize_for = SPEED;

message ProtMessage {
  optional int32 id = 1;
  optional string name = 2;
}
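
To generate the Java class, run the protobuf compiler against this definition. A minimal sketch of the command, assuming the definition is saved as ExchangeProtoMessage.proto and generated sources should land under src/main/java (adjust both paths to your project layout):

protoc --java_out=src/main/java ExchangeProtoMessage.proto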

Model Serializer :-

In order to send ProtMessage objects as values to a Kafka topic, we need a corresponding serializer that converts a ProtMessage object into a byte array. This serializer is passed to the Kafka producer when the producer is initialized.

package com.kafka.model;

import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import org.apache.kafka.common.serialization.Serializer;

public class ProtMessageSerializer implements Serializer<ProtMessage> {

    // Convert the ProtMessage object into its protobuf wire-format byte array
    @Override
    public byte[] serialize(String topic, ProtMessage data) {
        return data.toByteArray();
    }
}

Model Deserializer :-

In order to read ProtMessage objects as values from a Kafka topic, we need a corresponding deserializer that converts a byte array back into a ProtMessage object. This deserializer is passed to the Kafka consumer when the consumer is initialized.

package com.kafka.model;

import com.google.protobuf.InvalidProtocolBufferException;
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import org.apache.kafka.common.serialization.Deserializer;

public class ProtMessageDeserializer implements Deserializer<ProtMessage> {

    // Parse the protobuf wire-format byte array back into a ProtMessage object
    @Override
    public ProtMessage deserialize(String topic, byte[] data) {
        try {
            return ProtMessage.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("exception while parsing", e);
        }
    }
}
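
Before wiring these into Kafka, a quick round-trip check confirms that the serializer and deserializer are inverses of each other. A minimal sketch (RoundTripCheck is a hypothetical class name; the topic argument is unused by both implementations, so any string works):

package com.kafka.model;

import com.kafka.message.ExchangeProtoMessage.ProtMessage;

public class RoundTripCheck {

    public static void main(String[] args) {
        ProtMessage original = ProtMessage.newBuilder().setId(1).setName("test").build();

        // serialize to bytes, then parse the bytes back into an object
        byte[] bytes = new ProtMessageSerializer().serialize("anyTopic", original);
        ProtMessage restored = new ProtMessageDeserializer().deserialize("anyTopic", bytes);

        // generated protobuf classes implement equals(), so this prints true
        System.out.println("round trip ok: " + original.equals(restored));
    }
}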

Kafka Producer :-

Here we initialize a Kafka producer with a key of integer type and a value of ProtMessage type, then send a few messages to the Kafka topic.

package com.kafka.producer;

import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import com.kafka.model.ProtMessageSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;

import java.util.Properties;

public class MyKafkaProducerWithProtobufModel {

    public static void main(String[] args) {

        System.out.println("going to publish messages");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("linger.ms", 1);

        // key and value serializers are passed directly to the producer constructor
        Producer<Integer, ProtMessage> producer =
                new KafkaProducer<>(props, new IntegerSerializer(), new ProtMessageSerializer());

        // publish 10 messages to partition 0 of the topic, keyed by the loop counter
        for (int i = 1; i <= 10; i++) {
            producer.send(new ProducerRecord<>("myFirstTopic", 0, i,
                    ProtMessage.newBuilder().setId(i).setName(i + "proto value").build()));
        }

        producer.close();
    }
}
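
Passing serializer instances to the constructor is one option; Kafka can also instantiate them from configuration. A sketch of the equivalent property-based setup, assuming the same classes as above (the serializer class then needs its default no-arg constructor):

props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "com.kafka.model.ProtMessageSerializer");

// with serializers supplied via config, the single-argument constructor is enough
Producer<Integer, ProtMessage> producer = new KafkaProducer<>(props);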

Kafka Consumer :-

Here we initialize a Kafka consumer with a key of integer type and a value of ProtMessage type, then read the messages published by the producer from the Kafka topic.

package com.kafka.consumer;

import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import com.kafka.model.ProtMessageDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyKafkaConsumerWithProtobufModel {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");

        // key and value deserializers are passed directly to the consumer constructor
        KafkaConsumer<Integer, ProtMessage> consumer =
                new KafkaConsumer<>(props, new IntegerDeserializer(), new ProtMessageDeserializer());
        consumer.subscribe(Arrays.asList("myFirstTopic"));

        // poll in a loop and print every (key, value) pair along with its offset
        while (true) {
            ConsumerRecords<Integer, ProtMessage> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<Integer, ProtMessage> record : records) {
                System.out.println("Received message: (" + record.key() + ", "
                        + record.value() + ") at offset " + record.offset());
            }
        }
    }
}
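
The while (true) loop above never releases the consumer's resources. A common shutdown pattern, sketched here against the same consumer (not part of the original code), is to call wakeup() from a JVM shutdown hook so the blocked poll() throws a WakeupException and the consumer can be closed cleanly:

final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();        // makes a blocked poll() throw WakeupException
    try {
        mainThread.join();    // wait for the poll loop to finish cleanup
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<Integer, ProtMessage> records = consumer.poll(Duration.ofMillis(100));
        // ... process records as above ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown, nothing to do
} finally {
    consumer.close();         // commits offsets and leaves the group cleanly
}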

GitHub link: https://github.com/ramizgit/kafka-programming

Check out: https://medium.com/@rramiz.rraza

I appreciate you and the time you took out of your day to read this! Please watch for (and follow) more blogs on big data and other emerging technologies. Cheers!
