Debezium with Single Message Transformation (SMT)

Okan YILDIRIM
Dec 7, 2020 · 7 min read

Hello everyone! In this article, my teammate Betül Çetinkaya and I, from the Trendyol Seller Finance Team, explain why we needed Debezium, what problems we faced, and how we solved them. After a short introduction, we show how we benefited from some simple but effective Debezium features by tuning the connector configuration, and then we focus on the SMT we use for message filtering.

Introduction

If you are unfamiliar with terms like Transaction Log Tailing or the Change Data Capture (CDC) pattern, or you do not know how to install Debezium and create a connector on it, we highly recommend reading the Transaction Log Tailing With Debezium series of posts by our teammate Abdullah YILDIRIM. We will assume that you know these topics and can create a simple connector on Debezium.

As the Seller Finance Team, we provide solutions for the financial needs of sellers, such as settlements, invoices, receipts, and payment transfers. We also facilitate the daily work of the finance department.

Our core business is based on the settlement domain object. In the simplest terms, a settlement is the unit financial record of a seller’s sale. Our settlement records are stored in a PostgreSQL database, and the settlements table has relationships with other tables.

Our business generally runs asynchronously, so we build our microservices on an event-driven architecture. We prefer Kafka as our message broker and use it frequently in our projects.

One of our business requirements is to send a message to other applications when a specific change occurs on a settlement record. We had been handling this in the application itself, but we came across some transaction problems, although rarely. Since we already had experience with Kafka, Debezium was a perfect tool for us. We already use Debezium in production to implement the CDC pattern. However, the case described below led us to explore different features of Debezium.

As the number of sellers and orders increased, we started to encounter performance problems while generating reporting documents, because these documents contain columns from other tables that are related to the settlement table. We decided to migrate the settlement table and the related tables (only the needed columns) to another database as a single row. Thanks to Debezium, we not only completed the migration but also continue to feed the second database. In this way, we get rid of join queries and generate document files faster.

System Design

We assume that you have a running Debezium instance. To create a connector, you send a POST request to it. Our base configuration for creating connectors is given below. If you are not familiar with this configuration, you can read the Debezium documentation.
With this base configuration, you can create a connector called settlement-connector that captures every change on the settlements table and sends each change as a message to the Kafka topic named settlement-debezium.public.settlements.

Base Connector Configuration
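As a rough illustration of such a base configuration, here is a minimal Python sketch that builds the payload and the POST request. Only the connector name, the table, and the topic prefix come from this article. The connection settings, the `pgoutput` plugin choice, and the `localhost:8083` address are placeholder assumptions you should replace with your own values.

```python
import json
import urllib.request


def base_connector_config():
    """Build a minimal PostgreSQL connector payload (placeholder credentials)."""
    return {
        "name": "settlement-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            # Hypothetical connection settings; replace with your own.
            "database.hostname": "postgresql",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "postgres",
            "database.dbname": "postgres",
            # Logical server name; it is used as the topic prefix.
            "database.server.name": "settlement-debezium",
            # Capture only the settlements table.
            "table.include.list": "public.settlements",
            # Assumed logical decoding plugin.
            "plugin.name": "pgoutput",
        },
    }


def topic_for(payload, table):
    """Derive the topic name as <server name>.<schema>.<table>."""
    return f'{payload["config"]["database.server.name"]}.{table}'


def build_request(payload, connect_url="http://localhost:8083/connectors"):
    """Prepare (but do not send) the POST request that creates the connector."""
    return urllib.request.Request(
        connect_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the result of `build_request(base_connector_config())` to `urllib.request.urlopen` would send the request to a running Kafka Connect instance.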

Now, we will tune the configurations according to our needs.

By default, when there is a change on a row, Debezium sends all columns in the message. However, we don’t need every column in the message. For example, created_date and status columns are not important for our case and we want to send messages without them. In this case, we add column.blacklist to our configuration:

------
"column.blacklist":
"public.settlements.created_date, public.settlements.status"
------

In addition, for a delete operation Debezium generates a Kafka record called a tombstone message, with a value of null. We may not want tombstone records either.

Using the connector option tombstones.on.delete you can control whether upon record deletions a tombstone event should be emitted or not.

-------
"tombstones.on.delete": "false"
-------

In our case, we need the initial snapshot for the existing values in the database, but we were interested in migrating only a subset of the records. Debezium lets us specify a SELECT statement that restricts the snapshot to a specific subset of rows. When the connector performs a snapshot, it executes this SELECT statement to retrieve the data from that table.

According to the Debezium documentation you can specify which table rows are included in snapshots by using snapshot.select.statement.overrides configuration as databaseName.tableName. But be careful at this point, for PostgreSQL, Debezium expects you to define this parameter as schemaName.tableName.

---------
"snapshot.select.statement.overrides":
"public.settlements",
"snapshot.select.statement.overrides.public.settlements":
"SELECT * FROM public.settlements WHERE order_date > '2020-11-06'"
---------
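The two properties above follow a pattern: one property lists the tables, and one extra property per table carries its SELECT statement. A minimal Python sketch of that pattern is shown here. The helper name `snapshot_overrides` is hypothetical and only illustrates how the keys are composed.

```python
def snapshot_overrides(statements):
    """Turn {"schema.table": "SELECT ..."} into connector properties."""
    # The first property lists every overridden table, comma separated.
    props = {"snapshot.select.statement.overrides": ",".join(statements)}
    # Each table then gets its own property holding the SELECT statement.
    for table, select in statements.items():
        props[f"snapshot.select.statement.overrides.{table}"] = select
    return props


props = snapshot_overrides({
    "public.settlements":
        "SELECT * FROM public.settlements WHERE order_date > '2020-11-06'",
})
```

For PostgreSQL, the dictionary keys are written as schemaName.tableName, as noted above.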

In many cases, we might be interested in only a subset of the events produced by Debezium. This may be due to a special business requirement, or we might simply not want to consume unnecessary messages. Debezium delivers every data change event by default. We could have filtered the messages on the consumer side. But in our case, the table we capture has a jsonb column that stores an array of ids, with approximately 1–1.5 MB of data. An update operation generates a payload of approximately 3 MB, because the message carries both the before and the after state of the row. Because of this huge payload, we want to produce events only for specific changes instead of for every change. For such problems, Debezium provides single message transforms (SMTs) that use the transformation API in Kafka Connect. Confluent defines SMT as:

SMTs transform inbound messages after a source connector has produced them, but before they are written to Kafka.

There are different types of transforms such as filter, reroute, unwrap, and outbox, but we will use the filter type in our case. The filter SMT supports scripting languages that integrate with JSR 223. Neither the filter SMT nor any implementation of the JSR 223 API is included in Debezium by default, so we need to add both by following the steps in the message filtering section of the Debezium documentation.

First, we need to download debezium-scripting-1.4.0.Alpha2.tar.gz for the SMT. We also need an implementation of the JSR 223 API. In our case, we used Groovy as the expression language. You can also use JavaScript, or JavaScript with Graal.js.

Once we have these jar files, we need to extract them into the Debezium plug-in directory of our Kafka Connect environment. For the PostgreSQL connector, extract them into ‘/kafka/connect/debezium-connector-postgres/’. In the Docker Compose service below, this is done by mounting a local jars directory as a volume.

connect:
  container_name: debezium
  image: debezium/connect:1.3
  ports:
    - 8083:8083
  links:
    - kafka
    - postgresql
  environment:
    - BOOTSTRAP_SERVERS=kafka:9092
    - CONFIG_STORAGE_TOPIC=debezium_connect_config
    - OFFSET_STORAGE_TOPIC=debezium_connect_offset
    - STATUS_STORAGE_TOPIC=debezium_connect_status
    - GROUP_ID=settlement-debezium-connect
    - CONNECT_PRODUCER_MAX_REQUEST_SIZE=3145728
  volumes:
    - $PWD/jars:/kafka/connect/debezium-connector-postgres
After your Debezium container is ready, you can check whether the jar files were placed correctly.

Now that we have added the plugins, we are ready to use filtering. First of all, we need to configure the filter SMT in the connector with the configuration given below.

---------
"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter"
---------

We also need to specify the expression language with the configuration given below.

---------
"transforms.filter.language": "jsr223.groovy"
---------

Now we can define our filter conditions by using transforms.filter.condition. We can filter messages by operation and by field values. In our example, we need an event only when an update operation changes the sent field from false to true.

---------
"transforms.filter.condition": "value.op == 'u' && value.before.sent == false && value.after.sent == true"
---------
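To make the condition concrete, here is a minimal Python sketch that evaluates the same rule on simplified change events. The events are hand-written dictionaries with only the `op`, `before`, and `after` fields, not real Debezium output. Treating a missing `before` or `after` as a non-match is a choice made for this sketch, not a statement about how the Groovy expression behaves.

```python
def keep_event(value):
    """Python equivalent of the Groovy filter condition above."""
    # Assumption of this sketch: a missing before/after image means "no match".
    before = value.get("before") or {}
    after = value.get("after") or {}
    return (
        value.get("op") == "u"              # only update operations
        and before.get("sent") is False     # sent was false before the update
        and after.get("sent") is True       # and is true after it
    )


# Simplified, hand-written events for illustration only.
events = [
    {"op": "c", "before": None, "after": {"sent": True}},
    {"op": "u", "before": {"sent": False}, "after": {"sent": True}},
    {"op": "u", "before": {"sent": True}, "after": {"sent": True}},
]
kept = [event for event in events if keep_event(event)]
```

Only the second event passes the filter, because it is the only update that flips sent from false to true.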

When we completed the configuration, we faced an error saying that the value has no ‘op’ field. Then we realized that these messages came from the Debezium configuration topics. The expression is evaluated for every message in the connector, including those of our Debezium config, offset, and status topics. To resolve this issue, we need to specify a regular expression with transforms.filter.topic.regex, which is evaluated against the name of the destination topic of an event to determine whether to apply the filter logic. In this way, the SMT ignores events that come from the Debezium configuration topics and filters only the messages that belong to the target topic.

---------
"transforms.filter.topic.regex":
"settlement-debezium.public.settlements"
---------
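The effect of the topic pattern can be sketched in Python as follows. This assumes that the whole topic name has to match the pattern, and it uses Python's `re` module rather than the Java regex engine that Kafka Connect runs on. The sketch also shows that the unescaped dots in the pattern match any character, so an escaped variant is stricter.

```python
import re

# Pattern exactly as configured above; "." matches any single character.
TOPIC_REGEX = "settlement-debezium.public.settlements"
# Stricter variant with literal dots (a suggestion, not the original setting).
STRICT_TOPIC_REGEX = r"settlement-debezium\.public\.settlements"


def filter_applies(topic, pattern=TOPIC_REGEX):
    """Return True when the filter condition would be evaluated for this topic."""
    return re.fullmatch(pattern, topic) is not None


target = filter_applies("settlement-debezium.public.settlements")
internal = filter_applies("debezium_connect_offset")
loose = filter_applies("settlement-debeziumXpublicXsettlements")
strict = filter_applies("settlement-debeziumXpublicXsettlements", STRICT_TOPIC_REGEX)
```

Here `target` and `loose` are true while `internal` and `strict` are false, so messages from other topics skip the condition entirely.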

Our final curl is given below:
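As a rough equivalent of that request, the Python sketch below merges the tuning fragments from this article on top of a base configuration and serializes the result as a JSON body. The base configuration passed in is a placeholder, and the function name `final_payload` is hypothetical.

```python
import json

# Tuning properties collected from the fragments shown in this article.
TUNING = {
    "column.blacklist":
        "public.settlements.created_date, public.settlements.status",
    "tombstones.on.delete": "false",
    "snapshot.select.statement.overrides": "public.settlements",
    "snapshot.select.statement.overrides.public.settlements":
        "SELECT * FROM public.settlements WHERE order_date > '2020-11-06'",
    "transforms": "filter",
    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.condition":
        "value.op == 'u' && value.before.sent == false && value.after.sent == true",
    "transforms.filter.topic.regex": "settlement-debezium.public.settlements",
}


def final_payload(base_config):
    """Merge the tuning properties over a base connector configuration."""
    return {"name": "settlement-connector", "config": {**base_config, **TUNING}}


# Placeholder base configuration; real connection settings are omitted here.
body = json.dumps(
    final_payload(
        {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"}
    ),
    indent=2,
)
```

The resulting `body` can be saved to a file and sent with curl as a POST request with a `Content-Type: application/json` header to the `/connectors` endpoint of Kafka Connect.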

We have mentioned that our event messages are approximately 3 MB. But in the Kafka Connect producer configuration, the maximum size of a request is 1048576 bytes by default. So we need to change max.request.size in the producer config.

Environment variables that start with CONNECT_ are used to update the Kafka Connect worker configuration file. To change the producer setting, we can therefore pass CONNECT_PRODUCER_MAX_REQUEST_SIZE as an environment variable.

BOOTSTRAP_SERVERS: "localhost:9092"
CONFIG_STORAGE_TOPIC: "settlement-debezium-config"
OFFSET_STORAGE_TOPIC: "settlement-debezium-offset"
STATUS_STORAGE_TOPIC: "settlement-debezium-status"
GROUP_ID: "settlement-debezium-connect"
CONNECT_PRODUCER_MAX_REQUEST_SIZE: '3145728'

This allows the connector to produce the message, but it is not enough. The max.message.bytes configuration of your topic must be tuned as well. You can create the topic as follows:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 12 --topic settlement-debezium.public.settlements --config max.message.bytes=3000000
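The numbers used above can be sanity-checked with a few lines of Python. This is simplified arithmetic that ignores protocol overhead and compression. It shows that the producer limit is exactly 3 MiB, while the topic limit in the command is slightly lower, so the smaller of the two is the effective ceiling.

```python
DEFAULT_MAX_REQUEST_SIZE = 1048576    # Kafka producer default, in bytes
PRODUCER_MAX_REQUEST_SIZE = 3145728   # CONNECT_PRODUCER_MAX_REQUEST_SIZE above
TOPIC_MAX_MESSAGE_BYTES = 3000000     # --config max.message.bytes above


def effective_limit(producer_limit, topic_limit):
    """The smaller of the two limits decides what can actually be delivered."""
    return min(producer_limit, topic_limit)


def fits(message_bytes):
    """Rough check that ignores headers, batching overhead, and compression."""
    limit = effective_limit(PRODUCER_MAX_REQUEST_SIZE, TOPIC_MAX_MESSAGE_BYTES)
    return message_bytes <= limit


limit = effective_limit(PRODUCER_MAX_REQUEST_SIZE, TOPIC_MAX_MESSAGE_BYTES)
gap = PRODUCER_MAX_REQUEST_SIZE - TOPIC_MAX_MESSAGE_BYTES
```

With these values, a message between 3000000 and 3145728 bytes would pass the producer check but could still be rejected for the topic, so it may be worth setting both limits to the same value.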

Lastly, on the consumer side, don’t forget to specify configurations such as

  • FETCH_MAX_BYTES_CONFIG
  • MAX_PARTITION_FETCH_BYTES_CONFIG
  • MAX_POLL_RECORDS_CONFIG

that allow you to consume these big messages. Moreover, be careful when setting

  • SESSION_TIMEOUT_MS_CONFIG
  • MAX_POLL_INTERVAL_MS_CONFIG

because processing these messages may take a long time.
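The constants above correspond to the consumer properties fetch.max.bytes, max.partition.fetch.bytes, max.poll.records, session.timeout.ms, and max.poll.interval.ms. The Python sketch below is one way to reason about them together. All values are illustrative assumptions, not our production settings, and the helper `consumer_warnings` is hypothetical.

```python
# Illustrative values only; tune them for your own workload.
CONSUMER_PROPS = {
    "fetch.max.bytes": 52428800,           # FETCH_MAX_BYTES_CONFIG
    "max.partition.fetch.bytes": 3145728,  # MAX_PARTITION_FETCH_BYTES_CONFIG
    "max.poll.records": 50,                # MAX_POLL_RECORDS_CONFIG
    "session.timeout.ms": 30000,           # SESSION_TIMEOUT_MS_CONFIG
    "max.poll.interval.ms": 300000,        # MAX_POLL_INTERVAL_MS_CONFIG
}


def consumer_warnings(props, largest_message_bytes, seconds_per_message):
    """Collect hints about settings that look too tight for big messages."""
    warnings = []
    if props["max.partition.fetch.bytes"] < largest_message_bytes:
        warnings.append("max.partition.fetch.bytes is below the largest message")
    if props["fetch.max.bytes"] < largest_message_bytes:
        warnings.append("fetch.max.bytes is below the largest message")
    # Worst case: a full batch where every record takes the assumed time.
    worst_case_ms = props["max.poll.records"] * seconds_per_message * 1000
    if worst_case_ms >= props["max.poll.interval.ms"]:
        warnings.append("a full poll batch may exceed max.poll.interval.ms")
    return warnings


ok = consumer_warnings(CONSUMER_PROPS, 3000000, seconds_per_message=2)
tight = consumer_warnings(
    {**CONSUMER_PROPS, "max.poll.records": 500}, 3000000, seconds_per_message=2
)
```

In this example `ok` is empty, while `tight` flags that 500 records at 2 seconds each would take longer than the poll interval allows.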

In this article, we wanted to share our experience with Debezium: what kind of problems we faced and how we solved them by tuning connector configurations. In this way we filter:

  • Unnecessary Columns
  • Tombstone Messages
  • Initial Snapshot
  • CDC Messages

and handle huge payloads.

If you want to use up-and-coming technologies such as Debezium in production, contribute to this work, and develop highly scalable applications, join the Trendyol family.

Trendyol Tech

Trendyol Tech Team
