Implementing custom SerDes for Java objects using JSON Serializer and Deserializer in Kafka Streams applications

Arturo González V.
Aug 9, 2020

This article will show you how to implement custom SerDes that provide serialization and deserialization in JSON format for the data types of record keys and record values.

Sunset in Cartagena

Scenario

An online company sells books, and every time a book is sold, an event is sent to Kafka. The key of the message is a String representing the ID of the order. The value of the message is a JSON object with the genre of the book and the value of the sale.

1 {"genre":"Fantasy","value":50}
2 {"genre":"Fantasy","value":45}
3 {"genre":"Thriller","value":20}

We need to build a Kafka Streams application that produces the latest sales count per genre.
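To make the target result concrete before bringing Kafka in, here is a minimal plain-Java sketch of the counting logic applied to the three sample messages. The class and method names (GenreCountSketch, countUpdates) are made up for this illustration, and the sketch only mimics the result: a real Kafka Streams application may emit fewer intermediate updates because of record caching.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Kafka Streams application itself.
final class GenreCountSketch {

    // Returns one "genre=count" update per incoming sale, in arrival order.
    static List<String> countUpdates(List<String> genres) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> updates = new ArrayList<>();
        for (String genre : genres) {
            // merge() stores 1 for a new genre, otherwise adds 1 to the current count
            long updated = counts.merge(genre, 1L, Long::sum);
            updates.add(genre + "=" + updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        // Genres of the three sample messages from the scenario
        System.out.println(countUpdates(List.of("Fantasy", "Fantasy", "Thriller")));
        // prints [Fantasy=1, Fantasy=2, Thriller=1]
    }
}
```

The last update for each genre (Fantasy=2, Thriller=1) is the "latest sales count" the application should end up with.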

Creating a Java object for the messages

First, we need to create a Java object for the message in the source topic:

public class BookSold {

    private String genre;
    private long value;
    ...
}

and another one for the message we want to produce:

public class GenreCount {

    private String genre;
    private long count;
    ...
}
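The bodies of both classes are elided above. As a rough sketch of what they might contain, the version below assumes a no-argument constructor, an all-arguments constructor, and getters. These members are assumptions for illustration, not the article's actual code. The classes are package-private here only so the sketch fits in a single file; in a real project each would be public in its own file.

```java
import java.util.function.BiFunction;

// Assumed shape of the source-topic message (members are illustrative).
class BookSold {
    private String genre;
    private long value;

    public BookSold() {
    }

    public BookSold(String genre, long value) {
        this.genre = genre;
        this.value = value;
    }

    public String getGenre() {
        return genre;
    }

    public long getValue() {
        return value;
    }
}

// Assumed shape of the output message (members are illustrative).
class GenreCount {
    private String genre;
    private long count;

    public GenreCount() {
    }

    public GenreCount(String genre, long count) {
        this.genre = genre;
        this.count = count;
    }

    public String getGenre() {
        return genre;
    }

    public long getCount() {
        return count;
    }
}

final class PojoSketch {
    public static void main(String[] args) {
        // A two-argument constructor can be used as a (key, value) -> object function
        BiFunction<String, Long, GenreCount> toGenreCount = GenreCount::new;
        GenreCount result = toGenreCount.apply("Fantasy", 2L);
        System.out.println(result.getGenre() + " " + result.getCount()); // prints Fantasy 2
    }
}
```

The two-argument GenreCount constructor is what lets a constructor reference build the output object from a genre and its count.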

Writing a serializer and deserializer

To implement custom SerDes, we first need to write a JSON serializer and deserializer by implementing org.apache.kafka.common.serialization.Serializer and org.apache.kafka.common.serialization.Deserializer. This lets us send Java objects to Kafka as JSON and turn the JSON we receive from Kafka back into Java objects.

In this case, we’ll use the Gson library to serialize and deserialize JSON (you could use another library instead, such as Jackson with its ObjectMapper class).

JsonSerializer:

...
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {

    private final Gson gson = new GsonBuilder().create();

    // default constructor needed by Kafka
    public JsonSerializer() {
    }

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        // nothing to do
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null)
            return null;

        try {
            return gson.toJson(data).getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }

}

JsonDeserializer:

...
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final Gson gson = new GsonBuilder().create();

    private Class<T> destinationClass;
    private Type reflectionTypeToken;

    public JsonDeserializer(Class<T> destinationClass) {
        this.destinationClass = destinationClass;
    }

    public JsonDeserializer(Type reflectionTypeToken) {
        this.reflectionTypeToken = reflectionTypeToken;
    }

    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        // nothing to do
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null)
            return null;

        try {
            Type type = destinationClass != null ? destinationClass : reflectionTypeToken;
            return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), type);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing message", e);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}
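The second constructor, which takes a java.lang.reflect.Type, is there for generic targets such as List<BookSold>, where a plain Class object cannot carry the element type. With Gson you would normally obtain such a Type from com.google.gson.reflect.TypeToken, for example new TypeToken<List<BookSold>>() {}.getType(). The dependency-free sketch below shows the idea behind that pattern. TypeRef is a hypothetical helper written for this illustration, not a Gson or Kafka class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical helper: captures a full generic type through an anonymous subclass.
abstract class TypeRef<T> {
    private final Type type;

    protected TypeRef() {
        // The anonymous subclass records its generic superclass, e.g. TypeRef<List<String>>
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException("TypeRef must be created with a type argument");
        }
        this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

final class TypeRefDemo {
    public static void main(String[] args) {
        // The trailing {} creates the anonymous subclass that preserves the type argument
        Type listOfStrings = new TypeRef<List<String>>() {}.getType();
        System.out.println(listOfStrings.getTypeName()); // prints java.util.List<java.lang.String>
    }
}
```

A Type obtained this way could be passed to the JsonDeserializer(Type) constructor, while simple classes such as BookSold only need the Class-based constructor.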

Writing a custom SerDes

Now we need to write a SerDes for our BookSold and GenreCount Java objects using the Serdes.serdeFrom method, which constructs a Serde object from a separate serializer and deserializer.

I will use a CustomSerdes factory class to create the serializers and deserializers, with one static method per type that returns a new Serde instance on each call.

...
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public final class CustomSerdes {

    private CustomSerdes() {}

    public static Serde<BookSold> BookSold() {
        JsonSerializer<BookSold> serializer = new JsonSerializer<>();
        JsonDeserializer<BookSold> deserializer = new JsonDeserializer<>(BookSold.class);
        return Serdes.serdeFrom(serializer, deserializer);
    }

    public static Serde<GenreCount> GenreCount() {
        JsonSerializer<GenreCount> serializer = new JsonSerializer<>();
        JsonDeserializer<GenreCount> deserializer = new JsonDeserializer<>(GenreCount.class);
        return Serdes.serdeFrom(serializer, deserializer);
    }
}
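Conceptually, a serde is just a serializer and a deserializer bundled together, so that whatever one writes the other can read back. The sketch below illustrates that idea without any Kafka or Gson dependency. SimpleSerde is a simplified stand-in invented for this example and is not Kafka's Serde interface. It round-trips a JSON string rather than a Java object to stay self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Simplified stand-in for the serde idea; NOT the real org.apache.kafka Serde.
final class SimpleSerde<T> {
    private final Function<T, byte[]> serializer;
    private final Function<byte[], T> deserializer;

    private SimpleSerde(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    // Builds a serde from a separate serializer and deserializer
    static <T> SimpleSerde<T> from(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
        return new SimpleSerde<>(serializer, deserializer);
    }

    // Null passes through unchanged, like in the JSON serializer above
    byte[] serialize(T data) {
        return data == null ? null : serializer.apply(data);
    }

    T deserialize(byte[] bytes) {
        return bytes == null ? null : deserializer.apply(bytes);
    }

    public static void main(String[] args) {
        SimpleSerde<String> serde = SimpleSerde.from(
                text -> text.getBytes(StandardCharsets.UTF_8),
                bytes -> new String(bytes, StandardCharsets.UTF_8));

        String json = "{\"genre\":\"Fantasy\",\"value\":50}";
        String roundTripped = serde.deserialize(serde.serialize(json));
        System.out.println(json.equals(roundTripped)); // prints true
    }
}
```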

Using the custom SerDes

Finally, we can use our custom SerDes to consume the BookSold events from the Kafka topic, transform them using the Kafka Streams API, and send the new events back to Kafka:

...
// need to use BookSold type value serde
final KStream<String, BookSold> source = builder
        .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), CustomSerdes.BookSold()));

final KTable<String, GenreCount> counts = source
        // grouping by genre repartitions the stream, so the serdes are passed
        // explicitly here instead of relying on the configured defaults
        .groupBy((key, value) -> value.getGenre(),
                Grouped.with(Serdes.String(), CustomSerdes.BookSold()))
        .count()
        .mapValues(GenreCount::new);

// need to override value serde to GenreCount type
counts
        .toStream()
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), CustomSerdes.GenreCount()));
...

Final remarks

As you can see, custom SerDes let a Kafka Streams application receive JSON from Kafka as Java objects, apply some business logic, and send Java objects back to Kafka as JSON.

It’s important to note that when using a framework like Spring Cloud Stream with the Kafka Streams binder, you don’t have to write a custom SerDes; you can use the built-in JsonSerde<T> class instead. For a complete example, see this post:
