Kafka programming in Java with Avro serialization

Ramiz Raza
3 min read · Feb 10, 2023


Apache Avro is a well-known data serialization format that is efficient (smaller and faster than JSON) and language neutral. It uses JSON to define schemas and serializes data in a compact binary format. This post shows how to use the Avro serialization format to send messages to a Kafka topic and read them back. A GitHub link to the full running code is included at the end.

Here is another post of mine on Kafka programming with Protobuf serialization: https://medium.com/@rramiz.rraza/kafka-programming-in-java-with-protobuf-serialization-a9c42590442d

My local setup :-

  1. Kafka (version 3.3.2) installed on a Linux (Fedora) VM
  2. Java program in the IntelliJ IDE on Windows
  3. The Java program connects to Kafka (installed on the Fedora VM)

Model :-

Below is the model written in Avro format (an .avsc file). Upon successful compilation, this will generate the corresponding Java class AvroMessage with the fields id and name inside it.

AvroModel.avsc

{
  "namespace": "com.kafka.message",
  "type": "record",
  "name": "AvroMessage",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}
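If you build with Maven, the avro-maven-plugin (or the standalone avro-tools jar) is the usual way to run this code generation. Either way, the generated class exposes a builder; below is a minimal sketch of exercising it. The class name GeneratedClassCheck is mine, not part of the original repo.

import com.kafka.message.AvroMessage;

public class GeneratedClassCheck {
    public static void main(String[] args) {
        // Build an instance of the generated class and inspect its embedded schema.
        AvroMessage msg = AvroMessage.newBuilder()
                .setId(1)
                .setName("hello avro")
                .build();
        System.out.println(msg.getSchema()); // prints the record schema from AvroModel.avsc
    }
}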

Model Serializer :-

In order to send AvroMessage objects as values to a Kafka topic, we need a corresponding serializer that can convert AvroMessage objects to a byte array. This serializer is passed to the Kafka producer when the producer is initialized.

package com.kafka.model;

import com.kafka.message.AvroMessage;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AvroMessageSerializer implements Serializer<AvroMessage> {
    @Override
    public byte[] serialize(String topic, AvroMessage data) {
        if (data == null) {
            return null;
        }
        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
            // Encode the record into Avro's compact binary format.
            BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);
            // SpecificDatumWriter works with generated classes like AvroMessage.
            SpecificDatumWriter<AvroMessage> writer = new SpecificDatumWriter<>(data.getSchema());
            writer.write(data, binaryEncoder);
            binaryEncoder.flush();
            return outputStream.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Failed to serialize AvroMessage", e);
        }
    }
}
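As a side note, Kafka also lets you wire the serializers through producer properties instead of constructor arguments. This sketch, equivalent in behavior to the constructor-based setup used below, relies on the standard key.serializer and value.serializer config keys:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// Serializers referenced by class name; Kafka instantiates them reflectively.
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "com.kafka.model.AvroMessageSerializer");
Producer<Integer, AvroMessage> producer = new KafkaProducer<>(props);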

Model Deserializer :-

In order to read AvroMessage objects as values from a Kafka topic, we need a corresponding deserializer that can convert a byte array back into AvroMessage objects. This deserializer is passed to the Kafka consumer when the consumer is initialized.

package com.kafka.model;

import com.kafka.message.AvroMessage;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.serialization.Deserializer;

public class AvroMessageDeserializer implements Deserializer<AvroMessage> {
    @Override
    public AvroMessage deserialize(String topic, byte[] data) {
        try {
            if (data != null) {
                // Decode the binary payload back into the generated AvroMessage class.
                DatumReader<AvroMessage> reader = new SpecificDatumReader<>(AvroMessage.getClassSchema());
                Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                return reader.read(null, decoder);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
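With both sides in place, a quick local round trip (no broker needed) verifies that the serializer and deserializer agree. A minimal sketch; the class name RoundTripCheck is mine, not part of the original repo:

package com.kafka.model;

import com.kafka.message.AvroMessage;

public class RoundTripCheck {
    public static void main(String[] args) {
        AvroMessage original = AvroMessage.newBuilder().setId(42).setName("42avro value").build();
        // Serialize to Avro binary and decode straight back.
        byte[] bytes = new AvroMessageSerializer().serialize("myFirstTopic", original);
        AvroMessage restored = new AvroMessageDeserializer().deserialize("myFirstTopic", bytes);
        System.out.println(original.equals(restored)); // expected: true
    }
}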

Kafka Producer :-

Here we initialize a Kafka producer with a key of integer type and a value of AvroMessage type. Then we send a few messages to the Kafka topic.

package com.kafka.producer;

import com.kafka.message.AvroMessage;
import com.kafka.model.AvroMessageSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;

import java.util.Properties;

public class MyKafkaProducerWithAvroSerializer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        // Key and value serializers are passed directly to the producer constructor.
        Producer<Integer, AvroMessage> producer = new KafkaProducer<>(props, new IntegerSerializer(), new AvroMessageSerializer());
        for (int i = 1; i <= 100; i++) {
            // Send to partition 0 of "myFirstTopic" with the loop index as the key.
            producer.send(new ProducerRecord<>("myFirstTopic", 0, i, AvroMessage.newBuilder().setId(i).setName(i + "avro value").build()));
        }

        producer.close();
    }
}
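Note that send() is asynchronous, and the loop above never checks the outcome. A variant using Kafka's standard Callback interface surfaces delivery failures; in this sketch, record stands for one of the ProducerRecord instances built above:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Delivery failed; log (or retry) instead of silently dropping the message.
        exception.printStackTrace();
    } else {
        System.out.println("Delivered to partition " + metadata.partition() + " at offset " + metadata.offset());
    }
});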

Kafka Consumer :-

Here we initialize a Kafka consumer with a key of integer type and a value of AvroMessage type. Then we read the messages published by the producer from the Kafka topic.

package com.kafka.consumer;

import com.kafka.message.AvroMessage;
import com.kafka.model.AvroMessageDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyKafkaConsumerWithAvroSerializer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        // Offsets are committed automatically every second.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");

        // Key and value deserializers are passed directly to the consumer constructor.
        KafkaConsumer<Integer, AvroMessage> consumer = new KafkaConsumer<>(props, new IntegerDeserializer(), new AvroMessageDeserializer());
        consumer.subscribe(Arrays.asList("myFirstTopic"));

        // Poll in a loop; each record's value is deserialized back into an AvroMessage.
        while (true) {
            ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<Integer, AvroMessage> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value().toString() + ") at offset " + record.offset());
            }
        }
    }
}
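The poll loop above runs until the process is killed and never closes the consumer. The standard shutdown pattern from the Kafka docs uses wakeup() to break out of a blocking poll(); here is a sketch of the same loop with that pattern applied:

Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
try {
    while (true) {
        ConsumerRecords<Integer, AvroMessage> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<Integer, AvroMessage> record : records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected path: wakeup() interrupts poll() when the shutdown hook fires.
} finally {
    consumer.close();
}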

GitHub link: https://github.com/ramizgit/kafka-programming

Check out: https://medium.com/@rramiz.rraza

I appreciate you and the time you took out of your day to read this! Please watch out for (and follow) more blogs on big data and other emerging technologies. Cheers!
