Event sourcing with Kafka Connect: inconsistency pitfalls & solutions

Gabriel Barreras
adidoescode
Published in
11 min readFeb 3, 2023

Introduction

Nowadays, events are the in the middle of everything.

Every system that needs to be scalable, real-time and microservices-oriented, needs to have an event-driven architecture. We must get rid of our old PL/SQL procedures, not only because they are slow and inefficient, but mostly because we want to decouple different systems and applications.

You have probably heard about event sourcing pattern, this is a common pattern when a service needs to send an event every time it writes into the database. Then, any external service can simply subscribe to those events following Publish-subscribe pattern and be aware of every change.

However, supporting event sourcing pattern is not free. Certainly, you can modify all your applications writing to the database so they send an event to the message broker every time they commit an update to the database. Unfortunately, that’s not always a quick option when you have multiple applications performing writes into the database. There are some consistency issues behind it as well: What happens if you already committed the record in the database but something fails during event production? Or maybe the opposite scenario, you send the event but then something fails when trying to commit to the database.

The purpose of this article is to describe how we implemented event sourcing with Kafka Connect along with an inconsistency we faced in production. We will analyze the problem in deep and show how we solved it.

Picture of 夜 咔罗 in Unsplash

Creating the solution

We decided to implement event sourcing pattern by using Kafka Connect. It is a tool used to integrate Apache Kafka platform with different data sources. We use Oracle 19c as the database platform and Apache Kafka as the event streaming platform. Kafka Connect allows us to transform every update to Oracle database in a Kafka event. For that we can use a type of connector called Source connector.

Image source: https://www.confluent.io/es-es/blog/kafka-connect-deep-dive-jdbc-source-connector/

How it works

Source connector uses jdbc driver to perform SQL queries against the database. As a developer, you just need to write the SQL query based on your data model and configure some parameters. Kafka Connect will do the rest.

This is an example of a Kafka Connect configuration:

{
"name": "kc-consistency-test-incrementing",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.password": "<database password>",
"batch.max.rows": "50",
"connection.user": "<database user>",
"connection.url": "<database url>",
"tasks.max": "2",
"mode": "incrementing",
"incrementing.column.name": "PK_ID",
"topic.prefix": "kc_consistency_test_incrementing",
"poll.interval.ms": "1000",
"validate.non.null": "false",
"query": "SELECT PK_ID, UPDATED_DT FROM EXAMPLE_TABLE",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
}
}

Note that we are using incrementing mode using column PK_ID. Let’s say our database table contains ten records with ids from 0 to 9. When this Kafka Connect instance is created, it will run the query and fetch all 10 records, and it will keep track of the highest value for PK_ID, which is 9. Then, it will continue running this query every second (based on poll.interval.ms value) but filtering out any record where PK_ID <= 9. This is something Kafka Connect does in a transparent way. Then it makes sure all the records are sent to Kafka and there are no duplicates.

What about the updates?

Assuming PK_ID is the primary key of our table, that will work fine for insertions, but what about the updates? Updating a primary key column is not allowed in most of the database engines, since it would be a bad practice. Then, if we want to send Kafka events not only when a record is inserted but also when it is updated, we need to configure Kafka Connect to use timestamp mode. Using it, it will detect new and modified rows in the database, as long as the value represents the date when the record was inserted or updated.

Then the Kafka Connect instance would be configured like this:

{
"name": "kc-consistency-test-timestamp",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.password": "<database password>",
"batch.max.rows": "50",
"connection.user": "<database user>",
"connection.url": "<database url>",
"tasks.max": "2",
"mode": "timestamp",
"timestamp.column.name": "UPDATED_DT",
"topic.prefix": "kc_consistency_test_incrementing",
"poll.interval.ms": "1000",
"validate.non.null": "false",
"query": "SELECT ID, UPDATED_DT FROM EXAMPLE_TABLE",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
}
}

Please note that UPDATED_DT would need to be updated every time the record is modified in the database.

Source connector modes

There are three different modes you can use:

  • Incrementing mode: It uses an incremental column to track the rows it has already read. Supports event sourcing for database insertions and it’s very convenient when the table has a primary key.
  • Timestamp mode: It uses a timestamp/date column. Supports event sourcing for database insertions and updates.
  • Timestamp+incrementing mode: Uses two columns. One timestamp column for insertions and updates and another incrementing column which provides a globally unique ID for updates.

For instance, with timestamp mode, if the developer writes:

SELECT ID, UPDATED_DT FROM EXAMPLE_TABLE;

Kafka Connect would actually run the following query:

SELECT ID, UPDATED_DT FROM EXAMPLE_TABLE
WHERE CREATED_DATE > :latestUpdatedDateSoFar;

As you can see, the responsibility of the developer is just to write the business-related query. Then, Kafka Connect will take care of identifying which new records have been introduced and send them to a Kafka topic.

As simple as that…right?

Facing Reality

Great, we have successfully implemented event sourcing pattern without adding a single line of code in our existent applications and we deployed it to production. But one day, one of the teams consuming our data reaches us complaining about we are losing events. This is, some of the database inserts and updates are not being sent to Kafka. How is that possible?

For this example, let’s use the following Kafka Connect configuration and database entity:


"name": "kc-consistency-test-timestamp-incrementing",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.password": "<database password>",
"batch.max.rows": "50",
"connection.user": "<database user>",
"connection.url": "<database url>",
"tasks.max": "2",
"mode": "timestamp+incrementing",
"incrementing.column.name": "KC_TEST_SOURCE_ID",
"timestamp.column.name": "CREATED_DT",
"topic.prefix": "kc_consistency_test",
"poll.interval.ms": "1000",
"validate.non.null": "false",
"query": "SELECT CAST(KC_TEST_SOURCE_ID AS NUMBER(10, 0)) AS KC_TEST_SOURCE_ID, EXEC_START_TIME, RANDOM_STRING, CREATED_DT, ENTITY_CREATION_DT FROM KC_TEST_SOURCE",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
}
}
Sample data model record
  • KC_TEST_SOURCE entity is inserted and never updated.
  • We are using timestamp+incrementing mode with KC_TEST_SOURCE_ID and CREATED_DT columns.
  • KC_TEST_SOURCE_ID is the primary key of the table.
  • CREATED_DT represents the timestamp when the record was inserted by Oracle.

The technical explanation of the issue is described in the following diagram:

Kafka Connect losing messages timeline diagram

Let’s see what happened step by step:

  1. Transaction 1 (TX 1) starts and inserts an element with id 10 and CREATED_DT=00:000. This is 0 seconds and 0 milliseconds. For some reason the transaction is not committed immediately.
  2. Transaction 2 (TX 2) starts later and inserts an element with id 11 and CREATED_DT=00:020. The transaction is committed 20 milliseconds later, at 00:040.
  3. Kafka Connect runs the query and fetches record with id 11, but it does not see the record with id 10, since it is not yet committed. Last updated timestamp is now 00:050, and Kafka Connect will assume nothing could have been inserted before that time.
  4. Transaction 1 is finally committed at 00:060.
  5. Kafka Connect runs the query again at 00:120 but it misses the record with id 10, because CREATED_DT=00:000 is earlier than 00:050.

What’s the problem?

Inserting (or updating) a record in the database and committing are two different operations which do not happen atomically. At least that is the case for Oracle 19c. This means that after the write operation and before the commit, any transaction can be committed in between.

UPDATE EXAMPLE_TABLE
SET UPDATED_DT = SYSDATE
WHERE PK_ID = :idToUpdate;
-- here, any other transaction may be committed
COMMIT;

You could think of some manual locking (for instance using a trigger), but that would cause huge performance issues.

The most surprising thing is that Confluent documentation does not mention anything about this race condition. Especially after seeing that this issue was reported on 2016: https://github.com/confluentinc/kafka-connect-jdbc/issues/172

Reproducing the issue

Ok, so in theory the race condition exists, but how probable is to run into that? Can we reproduce it easily?

Environment setup

In order to reproduce the issue we used:

  • Oracle 19c database (AWS RDS, 32 GB RAM, 8 vCPU, General Purpose SSD (gp2))
  • Kafka broker 7.1.1-ccs
  • Kafka Connect cluster 5.5.1-ccs
  • Lenses, for visualising the kafka messages.
  • Java application to simulate massive insertions into the database.
  • Kubernetes cluster for running multiple instances of our Java application.

We used our development environment for these tests, taking advantage of the fact that we already have all this platform stack up and running.

Java application

The application is quite simple. We used a simple Spring Boot application which we deployed as a Kubernetes job.

@Service
@Slf4j
public class DataLoaderService {

private ApplicationContext context;
private DatabaseWriterService databaseWriterService;
@Value(value = "${rows.to.insert}")
private Integer numRowsToInsert;
@Value(value = "${spring.datasource.url}")
private String datasourceUrl;
@Value(value = "${spring.datasource.username}")
private String username;
@Value(value = "${max.queue.size}")
private Integer maxQueueSize;

@Autowired
public DataLoaderService(ApplicationContext context,
DatabaseWriterService databaseWriterService) {
this.context = context;
this.databaseWriterService = databaseWriterService;
}

@EventListener(ApplicationReadyEvent.class)
public void insertData() {
Date executionStartingTime = new Date();
List<CompletableFuture<KcTestSourceEntity>> completableFutureList =
new ArrayList<>(maxQueueSize);

IntStream.range(0, this.numRowsToInsert).forEach((int i) ->
{
try {
completableFutureList.add(this.insertRow(executionStartingTime));
} catch (InterruptedException e) {
log.error("Thread interrupted", e);
}
}
);

CompletableFuture.allOf(completableFutureList.toArray(
new CompletableFuture[numRowsToInsert])).join();

closeApplication();
}

private CompletableFuture<KcTestSourceEntity> insertRow(
Date executionStartingTime) throws InterruptedException {
KcTestSourceEntity kcTestSourceEntity = new KcTestSourceEntity(
executionStartingTime);
return databaseWriterService.insertEntity(kcTestSourceEntity);
}

protected synchronized void closeApplication() {
SpringApplication.exit(context);
}
}
@Service
@Slf4j
public class DatabaseWriterService {

private static final Random RANDOM_GEN = new Random(System.nanoTime());
private static int MAX_DELAY_MS = 100;
private KcTestSourceRepository kcTestSourceRepository;

public DatabaseWriterService(KcTestSourceRepository kcTestSourceRepository) {
this.kcTestSourceRepository = kcTestSourceRepository;
}

@Async("threadPoolExecutor")
@Transactional(propagation = Propagation.REQUIRES_NEW)
public CompletableFuture<KcTestSourceEntity> insertEntity(
KcTestSourceEntity kcTestSourceEntity) throws InterruptedException {
KcTestSourceEntity storedKcTestSourceEntity = this.kcTestSourceRepository
.save(kcTestSourceEntity);
log.debug("Saved KcTestSourceEntity with id {}, Thread id: {}",
storedKcTestSourceEntity.getKcTestSourceId(),
Thread.currentThread().getId());
// random delay after entity creation
Thread.sleep(RANDOM_GEN.nextInt(MAX_DELAY_MS));
return CompletableFuture.completedFuture(storedKcTestSourceEntity);
}
}

These are the two relevant classes. I’m omitting main class, as well as JPA Repository and Entity classes because they are not relevant.

Please note that insertEntity method is introducing a random delay on purpose, in order to simulate concurrent writes in the database.

The application is then containerized using Docker and deployed in the development Kubernetes cluster as a Kubernetes job. That way we can control how many parallel instances of the job we want to run. In this first experiment we used a single application instance, this is, we ran the job in Kubernetes with parallelism value being one. However, ThreadPoolTaskExecutor is configured to use 50 threads.

Lost Kafka messages experiment results

As you can see, it is quite easy to reproduce the issue. I tried all three different Kafka Connect modes, and the result is very similar for all of them.

Is there any solution?

After some research we found a few people asking about this issue. One of them is in Kafka Connect jdbc Github, previously referenced: https://github.com/confluentinc/kafka-connect-jdbc/issues/172

There some people is suggesting to use a parameter in Kafka Connect called timestamp.delay.interval.ms. According to Confluent documentation the behaviour of this parameter is the following:

How long to wait after a row with certain timestamp appears before we include it in the result. You may choose to add some delay to allow transactions with earlier timestamp to complete. The first execution will fetch all available records (i.e. starting at timestamp 0) until current time minus the delay. Every following execution will get data from the last time we fetched until current time minus the delay.

- Type: long
- Default: 0
- Importance: high

But how can this parameter help in our case? Is it guaranteed to not lose messages anymore?

The key point here is the relation between the maximum duration of a write transaction (or transaction timeout) and the value of timestamp.delay.interval.ms parameter. Let’s look at the diagram again and then we will analyze each of the steps:

Kafka Connect not losing messages timeline diagram

Step by step explanation:

  1. Transaction 1 (TX 1) starts and inserts an element with id 10 and CREATED_DT=00:000 but it doesn’t commit it immediately.
  2. Transaction 2 (TX 2) starts later and inserts an element with id 11 and CREATED_DT=00:020. The transaction is committed 20 milliseconds later, at 00:040.
  3. Kafka Connect runs the query and does not fetch any record. As per the documentation says, even if the record is committed into the database, it will not fetch it unless timestamp.delay.interval.ms milliseconds (150 ms in the example) have passed since the record was written (not committed).
  4. Transaction 1 is finally committed at 00:060.
  5. Kafka Connect runs the query at 00:120 and no records are fetched, for the same reason.
  6. Kafka Connect runs the query at 00:180 and both records are fetched, since they were both written at least 150 ms ago.

What is this delay doing?

When Kafka Connect “sees” a new insert or update, we are telling him to wait until we are sure any transaction which may have happened concurrently it’s already finished.

How to make sure there is no long-running transaction being higher than timestamp.delay.interval.ms?

One easy way it to use a transaction timeout for all those queries writing into the tables watched by Kafka Connect. If we assure timestamp.delay.interval.ms value is higher than transaction timeout, we will be safe.

Testing the solution

Now let’s test the new configuration by introducing another scale of concurrency. In order to do that, we are now running 5 and 10 Kubernetes jobs in parallel, each of them using 50 threads.

Results after solving the issue

As you can see, we are keeping a generous margin between database transaction timeout and timestamp.delay.interval.ms. Probably setting a value of 30 seconds in timestamp.delay.interval.ms would have been enough, but we didn’t want to risk or ruin experiment results due to machines clock desynchronization or any external factor.

Conclusions

  • Kafka Connect source connector provides an easy way of translating database inserts and updates into Kafka events, supporting different database engines. It uses JDBC source connector, which implements query-based CDC (change data capture). Basically, it works by continuously capturing “snapshots” of the database.
  • It may be a convenient solution as long as you can afford some delay between writing into the database and the publishing of the Kafka message. Then, timestamp.delay.interval.ms parameter can be tuned together with database transaction timeout for write operations to guaranty there is no messages loss.
  • However, if you need to produce fast and consistently, then DO NOT USE Kafka Connect JDBC source connector. Instead, you can use an alternative such as Debezium or use a database engine which supports CDC natively, such as AWS DynamoDB.

The views, thoughts, and opinions expressed in the text belong solely to the author, and do not represent the opinion, strategy or goals of the author’s employer, organization, committee or any other group or individual.

--

--