Case Study: Kafka Connect & ksqlDB

Nur Erkartal
Trendyol Tech
Dec 20, 2022

In my previous article, I described the Outbox pattern and shared our experience of why we use it. Now let’s move one step further and apply the Outbox pattern using Kafka Connect. Kindly check the Kafka Connect documentation for more information.

First, we need to define a source connector so that we receive at least one event carrying the latest version of each document. Since we use Couchbase for persistence, we define the Couchbase Source Connector.

Let’s assume the scenario below:

  • We store loyalty customers’ information in member-collection.
  • member-collection belongs to loyalty-scope, and loyalty-scope belongs to airline-bucket.
  • Couchbase credentials: username test, password 1234.
  • The Couchbase node’s IP is 127.0.0.1.
  • We would like to receive events on the member.events.0 topic.
{
  "name": "member-source-connector",
  "config": {
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "couchbase.seed.nodes": "127.0.0.1",
    "couchbase.bootstrap.timeout": "30s",
    "couchbase.bucket": "airline-bucket",
    "couchbase.username": "test",
    "couchbase.password": "1234",
    "couchbase.stream.from": "SAVED_OFFSET_OR_NOW",
    "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
    "couchbase.event.filter": "com.couchbase.connect.kafka.filter.AllPassFilter",
    "couchbase.collections": "loyalty-scope.member-collection",
    "couchbase.topic": "member.events.0",
    "couchbase.compression": "ENABLED",
    "couchbase.persistence.polling.interval": "100ms",
    "couchbase.flow.control.buffer": "32m"
  }
}

Sample configuration for the Couchbase Source Connector
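
To register this connector, you can POST the JSON above to the Kafka Connect REST API. Below is a quick sketch, assuming Connect runs locally on its default port 8083 and the configuration is saved as member-source-connector.json (both are assumptions; adjust them to your setup):

# Register the source connector with the Kafka Connect REST API
curl -X POST -H "Content-Type: application/json" \
  --data @member-source-connector.json \
  http://localhost:8083/connectors

# Verify that the connector and its task are running
curl http://localhost:8083/connectors/member-source-connector/status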

Please feel free to check the Couchbase Kafka connector documentation for detailed explanations of the source configuration parameters.

Now we are all set to receive a Kafka event for every change to a document in member-collection.

I assume a question just popped up in your head 😂 Can we filter the Kafka events and create new events based on a filter condition?

Imagine that an airline company would like to give a welcome bonus to each new customer. How can you identify new members?

There are many options, of course. In this section, I will show how to do it using ksqlDB.

Let’s check our data model in Couchbase:

package model

import "time"

// Member is the document stored in member-collection.
// The JSON tags are assumed here so that the serialized event keys
// match the lowercase field names used in the ksqlDB stream below.
type Member struct {
	Id         string     `json:"id"`
	Name       string     `json:"name"`
	Surname    string     `json:"surname"`
	Status     string     `json:"status"`
	Level      string     `json:"level"`
	Activities []Activity `json:"activities"`
	CreatedAt  time.Time  `json:"createdAt"`
	UpdatedAt  time.Time  `json:"updatedAt"`
}

// Activity represents a single loyalty activity of a member.
type Activity struct {
	Point       int       `json:"point"`
	Type        string    `json:"type"`
	Description string    `json:"description"`
	Date        time.Time `json:"date"`
}

Then create a stream using the Kafka event model. Note that our Kafka event model is the same as our data model.

CREATE STREAM MEMBER_EVENTS_STREAM (
  `id` VARCHAR KEY,
  `name` VARCHAR,
  `surname` VARCHAR,
  `status` VARCHAR,
  `level` VARCHAR,
  `activities` ARRAY<STRUCT<`point` INTEGER, `type` VARCHAR, `description` VARCHAR, `date` VARCHAR>>,
  `createdAt` VARCHAR,
  `updatedAt` VARCHAR
) WITH (
  KAFKA_TOPIC = 'member.events.0',
  VALUE_FORMAT = 'JSON'
);
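
To make sure the stream is registered correctly, a quick sanity check from the ksqlDB CLI looks like this (standard ksqlDB statements; the output format depends on your version):

-- List all registered streams
SHOW STREAMS;

-- Inspect the columns and key of the new stream
DESCRIBE MEMBER_EVENTS_STREAM;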

We can filter Kafka events and publish the filtered events to another topic using the ksqlDB statement below.

CREATE STREAM NEW_MEMBER_STREAM WITH (
  KAFKA_TOPIC = 'new.members',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 3,
  REPLICAS = 2
) AS
SELECT *
FROM MEMBER_EVENTS_STREAM
WHERE `status` = 'New'
PARTITION BY `id`
EMIT CHANGES;

Whenever a new member is created, its status is set to New. We then get an event on the new.members topic for each member whose status is New.
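
To watch the filtered events flowing in real time, you can run a push query against the new stream from the ksqlDB CLI; a minimal example:

-- Continuously emit new members as their events arrive
SELECT `id`, `name`, `surname`
FROM NEW_MEMBER_STREAM
EMIT CHANGES;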

Challenges

Since Kafka Connect creates an event for every change to a document, you will always receive the latest version of that document. If you need to identify which operation was performed or which fields were updated, you may need to store that information in the data model itself. As a solution, a field such as “delta” can be used to record what was modified or which operation took place, as sketched below.
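
A minimal sketch of that idea in Go, assuming a hypothetical Delta field that the application fills in on every write (the field name and shape are illustrative, not part of our production model):

// Delta is a hypothetical audit field embedded in the document so that
// consumers can tell which operation happened and which fields changed.
type Delta struct {
	Operation     string   `json:"operation"`     // e.g. "CREATE" or "UPDATE"
	UpdatedFields []string `json:"updatedFields"` // names of the modified fields
}

// MemberWithDelta extends the Member document with its own change description.
type MemberWithDelta struct {
	Member
	Delta Delta `json:"delta"`
}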

Summary

I demonstrated how to use Kafka Connect and ksqlDB for our case. Both are compelling technologies. If you want guaranteed at-least-once delivery of Kafka messages carrying the latest version of each document in your system, Kafka Connect might be one of the best solutions.

If you need to filter your events or customize them based on your business cases, ksqlDB might be one of the best options.
