How We Handle Our OMS Events With High Availability by Using Kafka Connect

Ahmet Ata · Published in Trendyol Tech
Jul 17, 2023 · 7 min read

Hello everyone. First and foremost, allow me to introduce my team. The Order Meal Team is dedicated to seamlessly integrating meal orders and ensuring the timely delivery of customers’ requests. Our responsibilities cover coordinating both customer and vendor orders. Because this domain demands swift action, our applications must be robust, fault tolerant, and able to take remedial action when necessary. Failing to address issues promptly in this field can result in diminished customer satisfaction and financial losses, so building robust applications is a priority for us.

Let me explain Kafka Connect in simple words.

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka® and other data systems. It makes it simple to quickly define connectors that move large data sets into and out of Kafka.

There are two types of Kafka connectors.

  • Sink Connector: It allows you to export data from Kafka topics to any data system (Couchbase, Elasticsearch, MySQL, etc.).
  • Source Connector: It allows you to import data into Kafka topics from any data system (Couchbase, Elasticsearch, MySQL, etc.).

After a brief description, let’s start our journey with Kafka Connect :)

Our Order Management System (OMS) domain is built on the Couchbase data system and uses an eventing system based on Kafka. We use a Kafka Connect source connector to generate Kafka events from our Couchbase model. This design choice has enabled us to effectively implement the outbox pattern and has greatly facilitated the tracking of event history.

In our initial implementation, we wrote both base data and event data to the order bucket and used Kafka Connect as an intermediary between our bucket and Kafka.

SIMPLE DESIGN FOR KAFKA CONNECT USAGE

The simplicity and ease of implementation of this approach are undeniable. However, there are certain issues with this design. Firstly, by storing both events and base data in the same bucket, the size of the bucket’s documents significantly increases. Additionally, when updating the basic properties of a document, we repeatedly had to add specific event variations alongside it, which became cumbersome.

Separating the event and order buckets

For these reasons, we decided to separate the order base data and the order event data within Couchbase. Consequently, we established a binding between Kafka Connect and the event bucket to ensure that events are appropriately processed and managed.

SEPARATED DESIGN FOR KAFKA CONNECT USAGE
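For illustration, binding a Couchbase source connector to the event bucket comes down to registering a connector through the Kafka Connect REST API. The sketch below is illustrative only: the Couchbase-specific property names, bucket, topic, and credentials depend on the connector version, so check the connector documentation for the exact keys.

import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Illustrative only: register a Couchbase source connector via the Kafka Connect REST API.
fun registerEventConnector() {
    val connectorJson = """
        {
          "name": "order-events-source",
          "config": {
            "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
            "tasks.max": "2",
            "couchbase.seed.nodes": "couchbase-host",
            "couchbase.bucket": "order-events",
            "couchbase.username": "user",
            "couchbase.password": "password",
            "couchbase.topic": "oms.order-events"
          }
        }
    """.trimIndent()

    val request = HttpRequest.newBuilder()
        .uri(URI.create("http://kafka-connect:8083/connectors"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
        .build()
    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
}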

Let me explain the details of this design. In this design, it is crucial that the event and order documents are written transactionally on the Couchbase side. So we used Couchbase transactions in our code to insert the data into both buckets atomically.

fun insert(order: Order) {
    try {
        // Write the order document and its event document in a single Couchbase transaction
        transactions.run { ctx ->
            ctx.insert(orderCollection, order.id, order.document)
            ctx.insert(eventCollection, order.event.id, order.event.document)
        }
    } catch (e: TransactionCommitAmbiguous) {
        // The commit may actually have succeeded; log the transaction log and rethrow
        logger.error(
            "TransactionCommitAmbiguous and may have succeeded by order id: {} , with error: {}",
            order.id,
            e.result().log().logs().toString()
        )
        throw e
    } catch (e: TransactionFailed) {
        logger.error(
            "TransactionFailed by order id: {} , with error: {}",
            order.id,
            e.result().log().logs().toString()
        )
        throw e
    }
}

Another thing we do is give the event bucket a specific TTL (time to live), based on how long we need to hold the data. With TTL, we prevent the data size from growing indefinitely.
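As a minimal sketch, a bucket-wide maximum TTL can be applied with the Couchbase SDK’s bucket management API; the bucket name and retention period below are illustrative, and the same setting can also be applied from the Couchbase UI or CLI.

import com.couchbase.client.java.Cluster
import java.time.Duration

// Illustrative only: give the event bucket a maximum TTL so event documents expire automatically.
fun configureEventBucketTtl(cluster: Cluster) {
    val settings = cluster.buckets().getBucket("order-events") // illustrative bucket name
    settings.maxExpiry(Duration.ofDays(7))                     // illustrative retention period
    cluster.buckets().updateBucket(settings)
}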

With these changes, not only has our bucket size decreased, but we can also track event history more easily.

Indeed, it is essential to consider potential incidents that may occur within the Kafka Connect system. Similar to any system, there can be instances of errors arising from various factors such as network problems or other unexpected situations. It is crucial to acknowledge that even a slight delay within the meal domain can lead to order cancellations, resulting in customer dissatisfaction and financial loss. Therefore, it becomes imperative to proactively address and mitigate any potential issues that may arise within the Kafka Connect system to maintain the smooth operation of our services and ensure customer satisfaction.

So during these incident times, we should not let Kafka Connect become a single point of failure for the whole system, and we need to take additional actions so that our operation does not stop.

Solution for preventing a single point of failure

The solution we implemented is relatively straightforward and effective. By manually producing the related events without relying on Kafka Connect, we gain greater control over the event-handling process. Additionally, by incorporating a toggle in our code base and configuring it accordingly, we can enable or disable the immediate production of events based on the synchronization toggle. This approach allows quick response and mitigation in the event of Kafka Connect-specific errors: by simply changing the toggle, we can redirect the event flow as needed. Overall, this solution offers a practical and efficient way to address potential issues with Kafka Connect and ensure the smooth operation of our system.

We use Consul to manage configs and toggles, so we do not need to restart K8s pods to change toggle values.

SOLUTION FOR SINGLE POINT OF FAILURE

Opening the synch toggle may cause duplicate domain events, but we have strategies elsewhere to avoid the effects of duplicate events. For the continuity of the operation, we prefer the possibility of producing duplicate events over producing no events at all.
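A minimal sketch of how such a toggle-guarded fallback might look, assuming a Spring KafkaTemplate and a hypothetical ToggleProvider that reads the synch toggle from Consul (the toggle and topic names are illustrative):

import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service

// Hypothetical wrapper around the Consul-backed toggle; the real implementation reads live config.
interface ToggleProvider {
    fun isEnabled(name: String): Boolean
}

data class OrderEvent(val id: String, val payload: String)

@Service
class OrderEventPublisher(
    private val kafkaTemplate: KafkaTemplate<String, String>,
    private val toggleProvider: ToggleProvider
) {
    fun publish(event: OrderEvent) {
        // When the synch toggle is open, bypass Kafka Connect and produce the event directly.
        if (toggleProvider.isEnabled("event-sync-toggle")) {
            kafkaTemplate.send("order-events", event.id, event.payload)
        }
        // Otherwise the event document written to Couchbase is picked up by the source connector.
    }
}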

Bonus :) Checker Project

Changing the toggle seems good and easy, but it can be tedious at times. In case of a Kafka Connect error, the relevant config always needs to be opened. First of all, we have more than one Kafka Connect cluster, and various projects share the same logic. Secondly, Kafka Connect can fail at any time: weekends, nights, mornings, etc. And our watchers may not be available at that moment. For these reasons, we decided to write an external checker project that does our manual work automatically. Our motto as a team: if a program can do the manual labor, let it do it and have it inform you.

To put it in simple words, our Kafka Connect checker project should do the following actions (a rough skeleton is sketched right after the list).

  • Examine the Kafka Connect clusters’ statuses periodically. (We set this to once per minute.)
  • Try to restart the related pod running the Kafka Connect cluster.
  • If it finds a problem, change the synch toggle for the related projects.
  • Report its operations to us through our Slack channels.
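The skeleton below only shows the shape of that loop; ConnectStatusChecker, PodRestarter, ToggleUpdater, and SlackNotifier are hypothetical helpers standing in for the steps detailed in the rest of this section, and the schedule matches the one-minute interval mentioned above.

import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

// Hypothetical collaborators for each step of the checker.
interface ConnectStatusChecker { fun findFailedConnectors(): List<String> }
interface PodRestarter { fun restartConnectPods(connectors: List<String>) }
interface ToggleUpdater { fun openSynchToggle(connectors: List<String>) }
interface SlackNotifier { fun report(connectors: List<String>) }

@Component
class KafkaConnectChecker(
    private val statusChecker: ConnectStatusChecker,
    private val podRestarter: PodRestarter,
    private val toggleUpdater: ToggleUpdater,
    private val slackNotifier: SlackNotifier
) {
    @Scheduled(fixedDelay = 60_000) // run once per minute
    fun check() {
        val failed = statusChecker.findFailedConnectors()
        if (failed.isEmpty()) return

        podRestarter.restartConnectPods(failed) // first remedy: restart the related pods
        toggleUpdater.openSynchToggle(failed)   // fallback: let applications produce events directly
        slackNotifier.report(failed)            // tell the team what happened
    }
}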

First of all, we should examine the Kafka Connect clusters. For this purpose, we use the Kafka Connect client package, as you can see from this link. Because our project is a Maven-based Spring project, we add this dependency to the pom file like this:

<dependency>
    <groupId>org.sourcelab</groupId>
    <artifactId>kafka-connect-client</artifactId>
    <version>3.1.2</version>
</dependency>

This client can easily connect to Kafka Connect and fetch connector statuses, which is enough for us.
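A minimal sketch of such a status check, assuming the client’s Configuration and KafkaConnectClient classes and their getConnectorStatus call (the endpoint URL and connector name are illustrative):

import org.sourcelab.kafka.connect.apiclient.Configuration
import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient

// Illustrative only: ask the Kafka Connect REST API for the state of a single connector.
fun connectorState(connectorName: String): String? {
    val configuration = Configuration("http://kafka-connect:8083") // Connect REST endpoint
    val client = KafkaConnectClient(configuration)

    // getConnectorStatus wraps GET /connectors/<name>/status; the "state" entry holds
    // RUNNING, FAILED, PAUSED or UNASSIGNED.
    val status = client.getConnectorStatus(connectorName)
    return status.connector["state"]
}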

In the above code, we can get the status of our Kafka Connect connector. The statuses we look at, and what they mean, are:

UNASSIGNED: The connector/task has not yet been assigned to a worker.

RUNNING: The connector/task is running.

PAUSED: The connector/task has been administratively paused.

FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output).

So, in our case, we are interested in the FAILED or UNASSIGNED statuses. We do not only look at the connector status; we also examine the related task statuses. So we wrote a method like the one below.
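A sketch of what such a check might look like, continuing with the same client and assuming the status object also exposes its task states:

import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient

private val problemStates = setOf("FAILED", "UNASSIGNED")

// Returns the connectors whose connector state or any task state is FAILED or UNASSIGNED.
fun findProblematicConnectors(client: KafkaConnectClient): List<String> =
    client.getConnectors().filter { name ->
        val status = client.getConnectorStatus(name)
        val connectorBroken = status.connector["state"] in problemStates
        val anyTaskBroken = status.tasks.any { it.state in problemStates }
        connectorBroken || anyTaskBroken
    }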

OK, now we can examine the clusters and find the statuses that require action. So what are the possible actions?

First of all, the first solution is restarting the pod that hosts our Kafka Connect infrastructure. Sometimes restarting the pod solves the connection problem. So we wrote the K8s pod-restart code by using this package dependency. Simple code that restarts a K8s deployment looks like the sketch below.
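As one hedged example (the actual client library is the one linked above), with the Fabric8 kubernetes-client a rolling restart of the Connect deployment could be triggered roughly like this; the namespace and deployment names are illustrative:

import io.fabric8.kubernetes.client.KubernetesClientBuilder

// Illustrative only: trigger a rolling restart of the Kafka Connect deployment.
fun restartKafkaConnectDeployment() {
    KubernetesClientBuilder().build().use { client ->
        client.apps()
            .deployments()
            .inNamespace("kafka-connect")      // illustrative namespace
            .withName("kafka-connect-cluster") // illustrative deployment name
            .rolling()
            .restart()
    }
}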

Another action we take is changing the synch toggle value in Consul. As I explained before, the synch toggle lets us produce events directly from the application and is necessary for the continuation of the operation in case of Kafka Connect problems. For this, we use this maven package to interact with Consul.
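Regardless of the client library, opening the toggle ultimately comes down to a PUT against Consul’s KV HTTP API; a minimal sketch (the Consul address and key path are illustrative):

import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Illustrative only: open the synch toggle by writing "true" to a Consul KV key.
fun openSynchToggle() {
    val request = HttpRequest.newBuilder()
        .uri(URI.create("http://consul:8500/v1/kv/oms/event-sync-toggle")) // illustrative key path
        .PUT(HttpRequest.BodyPublishers.ofString("true"))
        .build()
    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
}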

In conclusion, the checker also sends a summary message about its operations. We achieve this by using Slack webhooks. A resulting Slack message looks like this:

An example of a Slack message
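Posting such a message through a Slack incoming webhook is a small HTTP call; a minimal sketch (the webhook URL is a placeholder):

import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Illustrative only: post a summary message to a Slack incoming webhook.
fun notifySlack(summary: String) {
    val body = """{"text": "$summary"}"""
    val request = HttpRequest.newBuilder()
        .uri(URI.create("https://hooks.slack.com/services/XXX/YYY/ZZZ")) // placeholder webhook URL
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build()
    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
}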

Conclusion

For the continuity of the operation, persistence, fault tolerance, and stability are very important topics in the order meal domain. Kafka Connect is the bridge we use between our data source and Kafka because it is observable, fast, and a technology accepted throughout Trendyol. In this article, I tried to summarize the solutions we applied to build a robust system and to avoid a single point of failure.

Stay well.

Ready to take your career to the next level? Join our dynamic team and make a difference at Trendyol.
