How We Extended the Couchbase Kafka Connector and Deployed It to K8s: Log Tailing Outbox Pattern

Eren Arslan
Published in Trendyol Tech
7 min read · Apr 19, 2022

At Trendyol, we use Couchbase in many places and in many ways. Couchbase is a memory-first NoSQL database; its multi-leader architecture lets us operate across multiple data centers in an environment where scale is critical.

Kafka is one of the most popular distributed event streaming platforms, and, like Couchbase, we use it in many places and ways at Trendyol.

Our Requirements

We needed an API responsible for our Order Management business, and we designed it using DDD. Domain events are very important for us: we produce events to notify related domains, and we have to be sure that every event reaches Kafka. That's why we decided to use the Outbox Pattern. We chose Couchbase as the database, with one document per Aggregate Root. When we create a domain event, we need to write it to the database in the same transaction as the actual data. Couchbase guarantees atomicity for a single document, so we decided to write the events into the same document as the Aggregate Root.
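
As a minimal illustration (the document shape and field names here are hypothetical, not our production model), an Aggregate Root document with its embedded outbox events might look like this:

```json
{
  "id": "order::1234",
  "status": "CREATED",
  "events": [
    {
      "type": "OrderCreated",
      "topic": "order-created-events",
      "payload": { "orderId": "1234" }
    }
  ]
}
```

Because the events live inside the Aggregate Root document, writing the state change and its events is a single atomic document write.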

We also need to produce these events to Kafka, but we don't want to add a Kafka dependency to our API. So we decided to apply the Log Tailing Outbox Pattern: all we need to do is listen to CDC (Change Data Capture) events from Couchbase and produce them to Kafka. And here it is:

Couchbase Kafka Source Connector

Kafka Connect is a framework for streaming data into and out of Kafka with connectors. A connector is either a Source (moves data from somewhere into Kafka) or a Sink (moves data from Kafka to somewhere else).

The source connector guarantees "at-least-once" delivery.

At Trendyol, we use Confluent Kafka. There are many types of connectors, and most of them are open source. If we want to run a connector, we can do it quickly from the Confluent UI; all it takes is a few clicks.

The connector produces these events to a single topic. An application listens to this topic and dispatches each event to the actual topic named in its payload.

Problems

As I said, we can create a connector quickly from the UI. But as the GlobalPlatforms team, we had some concerns and special requirements.

1) We want to ship all logs to a common logging place like Kibana.

2) We want to monitor health and set alerts.

3) We should be able to scale whenever we want.

4) We have some special requirements:

  • Observability, Monitoring, Logging
  • Missing Delete Events: there is no payload in the DELETION CDC event
  • Filtering: ignore document events that have an empty Domain Event field
  • Unnecessary Payload: only the events field is enough for us
  • Custom Routing
  • Key and Header Customization

That's why we wanted to deploy it to our K8s like a standard application. We realized we could build a Docker image from the Confluent Kafka Connect base image and simply install the Couchbase Kafka Connector from Confluent Hub.

It is just two lines, and we can deploy it to K8s. It is the same thing as doing it from the Web UI, but this time it runs on our K8s.
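
The two lines look roughly like this (the image tag and connector version are illustrative; check Confluent Hub for current coordinates):

```dockerfile
# Confluent Kafka Connect base image
FROM confluentinc/cp-kafka-connect-base:7.0.1
# Install the Couchbase source connector from Confluent Hub
RUN confluent-hub install --no-prompt couchbase/kafka-connect-couchbase:4.1.11
```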

We use Distributed Workers for the connector, so the connector keeps the information it needs in Kafka topics: offset information, config information, status information, etc. We pass these settings to the container as environment variables via the Deployment or a ConfigMap resource. And done.
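
For example, with the Confluent Connect images the worker settings can be supplied through a ConfigMap (the names below follow the CONNECT_* environment-variable convention of those images; the values are placeholders):

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: couchbase-connect-config
data:
  CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
  CONNECT_GROUP_ID: "couchbase-connect-cluster"
  # Topics where distributed workers store their state
  CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
  CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
  CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
```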

After that, all we need to do is create the connector by sending a POST request to the Connect REST API.
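
A minimal creation request might look like this (hosts, credentials, and topic names are placeholders; verify the property names against your connector version's documentation):

```shell
curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "couchbase-source",
    "config": {
      "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
      "couchbase.seed.nodes": "couchbase.example.com",
      "couchbase.bucket": "orders",
      "couchbase.username": "user",
      "couchbase.password": "pass",
      "couchbase.topic": "cdc-events",
      "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler"
    }
  }'
```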

But there are still some problems.

Maybe we can collect logs from the pod into our Kibana, but we still can't monitor it very well; we only get pod resource metrics. And we have special requirements:

  1. We don't want the whole payload; we just need the events field.
  2. We should ignore CDC events when the aggregate doesn't have any events.
  3. We want to customize the event key and headers.
  4. We need custom routing. We can set only one topic on the connector for producing events, but we want to send events to different topics conditionally.
  5. There is no payload in the DELETION CDC event. We need a deletion event and must send it to a deletion topic.

Our Solution

After some research, we discovered that we can extend the Couchbase Kafka Connector. It's open source, and there is even a sample project. As they say, all we need to do is:

  1. Download the connector distribution.
  2. Download Kafka.
  3. Move the files from the couchbase-kafka-connect-couchbase-<version>/lib directory to the kafka_<version>/libs directory.

Now we can create a Dockerfile and add the Kafka directory to the container.

The connector can expose metrics via JMX; you can find the details here. We can collect these metrics with the Prometheus JMX Exporter, and there is already an example prometheus_config.yml for Kafka Connect.

We download jmx_prometheus_javaagent-0.16.1.jar and the example prometheus_config.yml and add them to the container.
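
Wiring the exporter into the worker then amounts to attaching the Java agent to the JVM, e.g. via KAFKA_OPTS in the Dockerfile (the paths and the port 9404 below are illustrative choices, not required values):

```dockerfile
# Add the Prometheus JMX exporter agent and its Kafka Connect config
COPY jmx_prometheus_javaagent-0.16.1.jar /opt/jmx/
COPY prometheus_config.yml /opt/jmx/

# Kafka's scripts pass KAFKA_OPTS to the JVM; expose metrics on port 9404
ENV KAFKA_OPTS="-javaagent:/opt/jmx/jmx_prometheus_javaagent-0.16.1.jar=9404:/opt/jmx/prometheus_config.yml"
```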

Finally, we have the Couchbase connector image again; it behaves the same as the Web UI connector and the connector described above, so we have now created it in three different ways. We can ship logs to our Kibana and collect metrics from the connector such as task status, rebalances, etc. We already have pod metrics too. The connector is finally observable for us.

But it was not enough... we still could not handle our special needs.

Solution: Custom Source Handler

As you can see here, you can optionally extend the connector's functionality by writing custom components, and there is an example.

I mentioned above that we POST a config to create the connector. It has a field named couchbase.source.handler, whose value is a fully qualified class name. So if we write our own custom handler, we can cover our special requirements.

All we need to do is create a Maven project and add the kafka-connect-couchbase dependency:

<dependency>
  <groupId>com.couchbase.client</groupId>
  <artifactId>kafka-connect-couchbase</artifactId>
  <version><!-- connector version --></version>
</dependency>

We need to create a class that extends RawJsonSourceHandler, which ships with the connector library. This is the handler we already used as the standard source handler when creating the connector via the REST API.

Some methods need to be overridden.

First, the message arrives at the handle method. We pass it through a filter that checks whether the events field is non-empty or the event is a DELETION. After that, we build the message payload: we keep only the events field, or a custom delete payload. Then we send the message to the topic we want, with the key we want; the topic can be overridden conditionally via the getTopic method. If the CDC event is a DELETION, we send it to the deletion topic with the custom payload.
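
A sketch of such a handler, assuming the 4.x connector API (the class name, topic name, and delete payload are hypothetical, the empty-events check is deliberately naive, and method signatures should be verified against your connector version):

```java
package com.trendyol.custom.connector; // hypothetical package

import com.couchbase.connect.kafka.handler.source.DocumentEvent;
import com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler;
import com.couchbase.connect.kafka.handler.source.SourceHandlerParams;
import com.couchbase.connect.kafka.handler.source.SourceRecordBuilder;

import java.nio.charset.StandardCharsets;

public class CustomConnectorHandler extends RawJsonSourceHandler {

    @Override
    public SourceRecordBuilder handle(SourceHandlerParams params) {
        DocumentEvent event = params.documentEvent();

        if (event.type() == DocumentEvent.Type.DELETION) {
            // DELETION events carry no payload, so build a custom one
            // from the document key and route it to the deletion topic
            String deletePayload = "{\"deletedId\":\"" + event.key() + "\"}";
            return new SourceRecordBuilder()
                    .topic("order-deletion-events") // hypothetical topic
                    .value(null, deletePayload.getBytes(StandardCharsets.UTF_8));
        }

        String json = new String(event.content(), StandardCharsets.UTF_8);
        // Skip documents whose events field is empty (naive string check
        // for brevity; a real implementation should parse the JSON)
        if (json.contains("\"events\":[]")) {
            return null; // returning null drops the record
        }

        // Otherwise fall back to the standard raw-JSON behavior; payload
        // trimming, key/header customization, and conditional routing
        // would also be applied here
        return super.handle(params);
    }
}
```

With couchbase.source.handler pointing at this class, the connector invokes handle for every CDC event, so filtering, payload shaping, and routing all live in one place.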

That handles all our requirements. We compile this class and put the resulting jar file in the same directory as the connector files.

We created a Dockerfile with the Kafka and connector files above. At the end it has a command to run; we should delete this line, because this image will be our base image.

CMD ["connect-distributed.sh", "./configs/connect-distributed.properties"]

We then create another Dockerfile for the extended Couchbase connector. As I said, we put our custom handler jar in the same directory as the connector files.
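
The second Dockerfile then only needs to layer the custom handler jar on top of the base image and restore the run command (the image name, jar name, and paths below are illustrative):

```dockerfile
# Base image built above: Kafka + Couchbase connector files, no CMD
FROM our-registry/couchbase-connect-base:latest

# Put the custom handler jar next to the connector jars
COPY target/custom-connector-handler.jar kafka_<version>/libs/

# Restore the command removed from the base image
CMD ["connect-distributed.sh", "./configs/connect-distributed.properties"]
```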

And done.

After all of that, we can POST our config with our custom handler:

"couchbase.source.handler": "main.com.trendyol.custom.connector.CustomConenctorHandler",

We can now monitor tasks from the connector, set alerts, and trace logs in Kibana.

The delete event used to look like this: no payload, different key values, and the same topic as normal events.

But after we extended the connector, we send delete events to another topic with a custom payload and key, and we can change the payload and key as we want.

Summary

We needed to create an API with DDD for our Order Management business, and we had to be sure that domain events were published to Kafka. We decided to use the Log Tailing Outbox Pattern, and the Couchbase Kafka connector was the solution for us. We could set up a connector from the Confluent UI quickly.

But we had some problems:

  • Observability, Monitoring, Logging
  • Missing Delete Events
  • Filtering: ignore document events that have an empty Domain Event field
  • Unnecessary Payload
  • Custom Routing
  • Key and Header Customization

We solved them all by extending the Couchbase Kafka Connector for our custom requirements.

✔️ Filtered events by whether or not they carry Domain Events.

✔️ Dropped the unnecessary payload; just the Domain Event payload remains.

✔️ Changed keys and headers as we want.

✔️ Sent deletion events to another topic. We can send events to the topic we want, with the payload we want, according to our conditions.

✔️ We can monitor Connector Task and Worker health and pod resources, and set alerts.

✔️ We collect logs from the pod console output into our Kibana.
