Debezium Custom Converters

Oryan Moshe
May 26, 2020

Creating custom converters using Debezium’s new SPI to override value conversions

Introduction

This is cross-posted from DEV

Background about the TimestampConverter

Hey, my name is Oryan Moshe, and I started my own community for senior developers in Israel, named in.dev.
I'm also a Lead Architect at Rivery, where we build a solution for Data Pipelines as a Service. As part of my job here I needed to develop a feature that lets our clients stream changes straight from their databases to us, using CDC.

It's important to note that the last time I coded in Java was 7 years ago, so if you have any suggestions to improve the code shown here, please feel free to comment down below!

You can find the converter right here: https://github.com/oryanmoshe/debezium-timestamp-converter/

CDC — Change Data Capture

Before we talk about Debezium, we have to talk about CDC, or Change Data Capture.

CDC is a way for us to get the changes happening in the database (as opposed to the actual data).
This means we can get every state that every record has been through in the database.

CDC can be useful for a number of cases:

  • Keeping a log of record changes
  • Undoing (or reverting) a change
  • Knowing about record deletion (which is not easy to detect with just a SELECT)

What is Debezium anyway?

Debezium is an open source platform, maintained by Red Hat, that allows us to implement change data capture on top of our Kafka infrastructure.
It does so through Kafka Connect connectors that are easy to configure (currently there's support for MySQL, PostgreSQL, Microsoft SQL Server, MongoDB, and even some limited support for Oracle).

What are converters, and why would we need a custom one?

All messages produced by Debezium are processed before entering the designated topic, to make sure all fields of a given type (defined by the schema) behave the same.
This means all DATE fields, on all of the databases, will be transformed into the same format, which is by default "Days since epoch".
This behavior isn't always wanted, and temporal fields are a good example of that.
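To make "Days since epoch" concrete, here is a minimal sketch using only the JDK's java.time classes. The EpochDays class and its method names are made up for illustration; they are not part of Debezium or of the converter described in this post.

```java
import java.time.LocalDate;

// Hypothetical helper for illustration only (not part of Debezium).
final class EpochDays {

    // "Days since epoch" counts whole days starting at 1970-01-01.
    static LocalDate toDate(long daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    // The reverse direction: a calendar date back to its day count.
    static long toEpochDays(LocalDate date) {
        return date.toEpochDay();
    }
}
```

For example, EpochDays.toDate(18408) gives 2020-05-26, which is a lot less readable as a bare number in a Kafka message.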

For our use case we needed all of the temporal fields, whether their type is DATE, DATETIME, DATETIME2, TIME or TIMESTAMP, to be in the same format.
The format we chose was yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.
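As a quick sanity check of what such a pattern produces, here is a small sketch with SimpleDateFormat. The IsoFormatDemo class is a made-up name for this example. One thing to watch out for: in SimpleDateFormat patterns, lowercase yyyy is the calendar year, while uppercase YYYY is the week-based year, which can print a different year for dates around New Year. The sketch assumes the calendar year is what we want.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper for illustration only.
final class IsoFormatDemo {

    // Formats epoch milliseconds with the given pattern, always in UTC,
    // so the output doesn't depend on the JVM's default time zone.
    static String format(long epochMillis, String pattern) {
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.ROOT);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(new Date(epochMillis));
    }
}
```

Calling IsoFormatDemo.format(1590496496789L, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") returns 2020-05-26T12:34:56.789Z. Note that the quoted 'Z' is a literal character, so it is only truthful when the formatter is set to UTC.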

Creating a custom converter

In this section I'll explain each step I took to create our TimestampConverter.

The basics of custom converters

To allow such behavior, in version 1.1 of Debezium the developers introduced the Debezium SPI (Service Provider Interface).
This allows users to create their own converters (using Java) by creating a class that implements the io.debezium.spi.converter.CustomConverter interface.

The first gotcha

What I didn't know when I started developing this converter is that once I register a custom converter for a temporal column, Debezium's behavior becomes inconsistent. Sometimes it'll pass a DATE column as "Days since epoch", as expected, but sometimes it'll pass it as a string, matching the date format of the database it came from.

This meant I had to have all of my bases covered, both for when I get a numeric value (let's say, “Days since epoch”) and for every date format a database can throw at me (yyyy-MM-dd, dd/MM/yyyy, yyyy-MMM-dd, etc.).

Things got a bit complicated on the logic front, but I won’t get into this now.
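Still, to give a rough idea of the shape of that logic, here is a minimal sketch for DATE values only. This is not the converter's actual code: the RawTemporalValue class, the dateToEpochMillis method and the short list of accepted formats are all assumptions made for this example.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;

// Hypothetical helper for illustration only (not the converter's real logic).
final class RawTemporalValue {

    private static final long MILLIS_PER_DAY = 86_400_000L;

    // Assumed list of date-only formats; a real converter would need more.
    private static final List<DateTimeFormatter> DATE_FORMATS = List.of(
            DateTimeFormatter.ofPattern("yyyy-MM-dd"),
            DateTimeFormatter.ofPattern("dd/MM/yyyy"));

    // Accepts either a "Days since epoch" number or a date string,
    // and returns milliseconds since epoch at midnight UTC.
    static long dateToEpochMillis(String raw) {
        String value = raw.trim();
        if (value.matches("-?\\d+")) {
            // Numeric input: interpret it as days since epoch.
            return Long.parseLong(value) * MILLIS_PER_DAY;
        }
        for (DateTimeFormatter format : DATE_FORMATS) {
            try {
                return LocalDate.parse(value, format)
                        .atStartOfDay(ZoneOffset.UTC)
                        .toInstant()
                        .toEpochMilli();
            } catch (DateTimeParseException ignored) {
                // Not this format, try the next one.
            }
        }
        throw new IllegalArgumentException("Unrecognized date value: " + raw);
    }
}
```

With this sketch, "18408", "2020-05-26" and "26/05/2020" all end up as the same epoch value, which is the whole point: whatever Debezium hands over, the rest of the converter only deals with one representation.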

What do we need for our custom converter to work

To be used by Debezium, each converter has to implement at least two methods:

configure

This method runs when the connector is initialized. It accepts one argument:

props An object of type java.util.Properties, containing all of the properties we passed our converter instance.

converterFor

This method runs once for each column defined in our schema, and its job is to define (a.k.a. “register”) the converter for each of them. It accepts two arguments:

column An object of type io.debezium.spi.converter.RelationalColumn, containing the definition of the column we're currently handling, including its name, type, size, table, etc.

registration An object of type io.debezium.spi.converter.CustomConverter.ConverterRegistration (an internal definition) that has one method, register.

Using the configure method

As stated above, we use the configure method to pass properties into our converter. This is important because we can use the same converter for multiple connectors and change its behavior according to these properties.

For our TimestampConverter we wanted to pass 4 properties:

  • debug – Indicates whether we should print debug messages. Defaults to false.
  • format.date – The format to which we'd like to convert all columns of type DATE. Defaults to yyyy-MM-dd.
  • format.time – The format to which we'd like to convert all columns of type TIME. Defaults to HH:mm:ss.SSS.
  • format.datetime – The format to which we'd like to convert all other temporal columns. Defaults to yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.

All of these properties are optional and have default values associated with them.
To support them we defined each one as a class field with a default value, and inside the configure method we assign the passed value, falling back to the default when it's missing:

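package oryanmoshe.kafka.connect.util;

import java.text.SimpleDateFormat;
import java.util.Properties;
import java.util.TimeZone;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

public class TimestampConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    // Defaults used when a property is not passed to the converter.
    // Lowercase yyyy is the calendar year; uppercase YYYY would be the week-based year.
    public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
    public static final String DEFAULT_TIME_FORMAT = "HH:mm:ss.SSS";

    public String strDatetimeFormat, strDateFormat, strTimeFormat;
    public Boolean debug;

    private SimpleDateFormat simpleDatetimeFormatter, simpleDateFormatter, simpleTimeFormatter;

    @Override
    public void configure(Properties props) {
        // Each property falls back to its default when it isn't set.
        this.strDatetimeFormat = props.getProperty("format.datetime", DEFAULT_DATETIME_FORMAT);
        this.simpleDatetimeFormatter = new SimpleDateFormat(this.strDatetimeFormat);

        this.strDateFormat = props.getProperty("format.date", DEFAULT_DATE_FORMAT);
        this.simpleDateFormatter = new SimpleDateFormat(this.strDateFormat);

        this.strTimeFormat = props.getProperty("format.time", DEFAULT_TIME_FORMAT);
        this.simpleTimeFormatter = new SimpleDateFormat(this.strTimeFormat);

        this.debug = props.getProperty("debug", "false").equals("true");

        // Format datetime and time values in UTC instead of the JVM's default time zone.
        this.simpleDatetimeFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        this.simpleTimeFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    // converterFor(...) is covered in the next section.
}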
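To see the fallback behavior in isolation, here is a small standalone sketch that reads the same four keys from a java.util.Properties object. The ConverterSettings class is a made-up name for this example, and using Boolean.parseBoolean for the debug flag (which also accepts "TRUE") is my own variation, not what the converter above does.

```java
import java.util.Properties;

// Hypothetical holder for illustration only; it mirrors the keys used above.
final class ConverterSettings {

    final String dateFormat;
    final String timeFormat;
    final String datetimeFormat;
    final boolean debug;

    ConverterSettings(Properties props) {
        // getProperty(key, default) returns the default when the key is absent.
        this.dateFormat = props.getProperty("format.date", "yyyy-MM-dd");
        this.timeFormat = props.getProperty("format.time", "HH:mm:ss.SSS");
        this.datetimeFormat = props.getProperty("format.datetime", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        // parseBoolean is case-insensitive and treats anything but "true" as false.
        this.debug = Boolean.parseBoolean(props.getProperty("debug", "false"));
    }
}
```

Passing an empty Properties object yields all of the defaults, while setting only format.date to dd/MM/yyyy changes that one field and leaves the others untouched.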

Using the converterFor method

Now for the money time: we need to convert each column to its respective format.

First of all, we have to find out the type of the column we're currently handling. This can be determined using column.typeName().
If the type is any of the temporal types known to us (defined as a class constant) we handle it accordingly. If it's not, we do nothing and let Debezium take control.

To tell Debezium to convert a specific column to something else, we call register on the registration object passed to us, providing a schema (I just created one of type string and made it optional) and a converter.

The converter is just a function, or in our case a lambda, that receives an Object, which is the source value, and returns a value matching the schema we provided. In our case, we needed to return a String (or null, because we made it optional).

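// Requires java.time.Instant and java.util.Date in addition to the earlier imports.
// SUPPORTED_DATA_TYPES, datetimeSchema and getMillis are defined elsewhere in the class.
@Override
public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
    // Only take over temporal columns; everything else is left to Debezium.
    if (SUPPORTED_DATA_TYPES.stream().anyMatch(s -> s.equalsIgnoreCase(column.typeName()))) {
        boolean isTime = "time".equalsIgnoreCase(column.typeName());
        registration.register(datetimeSchema, rawValue -> {
            // The schema is optional, so null values pass through unchanged.
            if (rawValue == null) {
                return rawValue;
            }

            // Normalize whatever Debezium handed us into milliseconds since epoch.
            Long millis = getMillis(rawValue.toString(), isTime);

            Instant instant = Instant.ofEpochMilli(millis);
            Date dateObject = Date.from(instant);

            // Pick the formatter that matches the column type.
            switch (column.typeName().toLowerCase()) {
                case "time":
                    return simpleTimeFormatter.format(dateObject);
                case "date":
                    return simpleDateFormatter.format(dateObject);
                default:
                    return simpleDatetimeFormatter.format(dateObject);
            }
        });
    }
}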

In this code snippet you can see the two crucial parts I have mentioned before — the call to registration.register, and the return statements.
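If you want to play with the type-matching and formatting part without a running Debezium instance, here is a standalone sketch of just that piece. It is an approximation and not the converter's code: the TemporalFormatting class, the contents of its SUPPORTED_DATA_TYPES list (taken from the types named earlier in this post) and the hardcoded patterns are assumptions for the example.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical standalone sketch; not tied to the Debezium SPI.
final class TemporalFormatting {

    // Assumed list, based on the column types mentioned in this post.
    static final List<String> SUPPORTED_DATA_TYPES =
            List.of("date", "time", "datetime", "datetime2", "timestamp");

    // Case-insensitive check, since databases report type names differently.
    static boolean isSupported(String typeName) {
        return typeName != null
                && SUPPORTED_DATA_TYPES.contains(typeName.toLowerCase(Locale.ROOT));
    }

    // Chooses a pattern by column type and formats the value in UTC.
    static String format(String typeName, long epochMillis) {
        String pattern;
        switch (typeName.toLowerCase(Locale.ROOT)) {
            case "time":
                pattern = "HH:mm:ss.SSS";
                break;
            case "date":
                pattern = "yyyy-MM-dd";
                break;
            default:
                pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
        }
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.ROOT);
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(new Date(epochMillis));
    }
}
```

The sketch creates a formatter per call to keep it short and thread-safe, whereas the converter above builds its formatters once in configure. For the same instant, format("DATE", ...) prints only the date part, format("time", ...) only the time part, and any other supported type gets the full datetime format.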

Using a Custom Converter with Debezium

Installation

Installation in your Debezium cluster is pretty straightforward: you just need to add the .jar file of the converter to the connector you want to use it in.

The second gotcha

Notice I said “… to the connector you want …”. This is something the Debezium documentation didn't make clear: you need to add this converter to every connector you want to use it in.
Let's say your base folder for connectors is /kafka/connect. Inside it you'll find folders like debezium-connector-mysql or debezium-connector-postgres.

You need to add the converter .jar file to each of those folders if you intend to use it.

Configuration

After adding the .jar file to your connector, you can configure your connectors to use it!

To do so all you need to do is add the following keys to your existing configuration:

"converters": "timestampConverter",
"timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter"

If you need to customize the formats to which specific data types are converted, you can use these additional configuration keys:

"timestampConverter.format.time": "HH:mm:ss.SSS",
"timestampConverter.format.date": "yyyy-MM-dd",
"timestampConverter.format.datetime": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"timestampConverter.debug": "false"

Conclusions

The addition of an SPI to Debezium brought a lot to the table in terms of custom converters.
It allows us to get a tailor-made CDC connector, with the data streaming into our Kafka cluster in exactly the format we want.

I didn't include the actual logic here, which converts the values from their raw format into epoch time (this part is contained in the getMillis method).
But I have published the TimestampConverter as open source, so you can read the code there, use the converter in your own application (be it as a .jar file which you can find in the releases section, or as a dependency you can find in the packages section), or contribute to its development!

Feel free to suggest contributions to this converter, and share with me what kind of converters you created using the new Debezium SPI, and which ones you wish were made!

Links

To read more about Debezium custom converters, visit the official documentation:
https://debezium.io/documentation/reference/1.1/development/converters.html

Link to the repository of my TimestampConverter: https://github.com/oryanmoshe/debezium-timestamp-converter
