Scale Your Database with Kafka Connect? Part 1

itwasneo · Sep 5, 2022

If I had a dollar for every time I complained about the company’s legacy database being the bottleneck of the whole system… The aim of this project is to show a practical way to scale the persistence layer of a system with kafka-connect. As “micro-services architectures” rise in their impeccable glory, the amount of time spent on the wire has grown just as gloriously. Consequently, keeping applications close to their data has become more and more significant. In-memory to the rescue, right?

The things I implement for this project may not be suitable for certain cases, and by “certain” I mean “most” enterprise solutions, but they were fun to implement anyway.

Although the Connect API was introduced back in 2015 with the Kafka 0.9 release, I think it is still a really cool piece of technology that can be utilized in many different ways. Before going into more detail about my project, let me talk a little bit about kafka-connect and leave some links to the official documentation, because surely their explanation will be better than mine.

What is kafka-connect?

In essence, kafka-connect is a “connector” tool between Apache Kafka and other data systems. As a “source” connector it can ingest databases into Kafka topics, and as a “sink” connector it can feed other data systems such as Elasticsearch. You can think of it as an ETL tool with low latency, high scalability and simple extensibility, so basically you can’t think of it as an ETL tool.

Here is the official documentation by Confluent for more details, and of course an introductory video for the visual learners.

Project Recipe

First, let me give you the GitHub repository for this project.

What I want to achieve with this project is basically to implement a pipeline between a relational database and a REST application. It will be a step-by-step tutorial for each piece, because although there is official documentation for every technical complexity on the web, you know deep down that it never works on the first try. Here are the components:

  • postgres: Persistence layer.
  • kafka: Connector will ingest data from postgres into Kafka topics.
  • kafka-connect: PostgreSQL Source connector by Debezium. (List of self-managed connectors for kafka-connect.)
  • crypto-aggr: Aggregator java application (using javalin framework) that will populate the database.
  • crypto-connect: Another java application (using the javalin framework) that will consume a Kafka topic created by kafka-connect and build its own in-memory database.

Other than these base components, there are several other instances that have to run alongside them in order for the whole cluster to work properly. In the end, all the components will be run with a single docker-compose file, but first we will write a proper docker-compose file just for kafka-connect and postgres.

Proper docker-compose File for kafka-connect and postgres That Works

It will get longer once the 2 additional applications are added, but for now let me explain each container in this docker-compose file.

  • db: As I mentioned before, I will use postgres as the main database. The default username for the postgres image is postgres. In order not to lose the data at each execution, I assign a volume under the volumes section; that creates a data folder in the current directory and persists the data. Another important point is the wal_level setting passed as a command. kafka-connect listens to the write-ahead log (WAL) to detect any data change in the database tables, and from those log entries it creates messages for the corresponding topics. See the postgres documentation for more detail about the wal_level settings.
  • zookeeper: This is a coordination service that will be used to manage kafka. (Synchronization and group services.)
  • kafka: This is the container that will run the kafka broker. Basic settings are applied through environment variables. KAFKA_AUTO_CREATE_TOPICS_ENABLE variable should be set to true for automatic topic creation by kafka-connect.
  • schema-registry: Schema API that will be utilized by kafka-connect as well as our kafka client inside the crypto-connect javalin app. The basic functionality of this service is to be a serving layer for our data schemas. See the Confluent documentation for more detail about schema registry.
  • kafka-connect: And finally, our queen, khaleesi, mother of change-sets… In order to run this container properly I had to try lots of different settings and modifications, but most of these environment variables can be found in the official documentation. As the key converter I used the basic String converter, but for values I chose the AVRO format, because why not. Another important thing to mention is the plugin installation. As I mentioned earlier, you need the proper connector for each data system (in this case postgres). In the command section we install the proper plugin (the debezium connector) when the container boots up (packages installed via the confluent-hub command go to the /usr/share/confluent-hub-components directory). CONNECT_PLUGIN_PATH should also be set, with the proper directories delimited by commas. A trimmed sketch of the db and kafka-connect services follows this list.
  • ksqldb: We will use ksqldb just as an observer of our kafka topics. Normally ksqldb is a much more powerful tool for stream processing applications. See the ksqldb documentation for more information.
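
To make those settings concrete, here is a trimmed sketch of the db and kafka-connect services only. Image tags, ports and internal hostnames are assumptions on my side; the full, working file is docker-compose-crypto-utils.yml in the repository.

version: "3.8"
services:
  db:
    image: postgres:14                                   # assumed tag
    container_name: postgres
    command: ["postgres", "-c", "wal_level=logical"]     # expose row changes through the WAL
    environment:
      POSTGRES_PASSWORD: example
    volumes:
      - ./data:/var/lib/postgresql/data                  # keep data between runs
    ports:
      - "5432:5432"

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.2.0           # assumed tag
    container_name: kafka-connect
    depends_on: [kafka, schema-registry, db]
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092              # assumed internal listener
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: kafka-connect                    # assumed group id
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt debezium/debezium-connector-postgresql:latest
        /etc/confluent/docker/run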

You can find this docker-compose file with the name “docker-compose-crypto-utils.yml” in the repository as well. Let’s run it.

> docker-compose --file docker-compose-crypto-utils.yml up -d

Creating a Database and a Table in postgres Using psql

First, let’s connect to our postgres database and create a table. You can attach to the postgres container and start a psql session with our default user postgres using the following docker command. (See the psql documentation for further detail.)

> docker exec -it postgres psql -U postgres

You can list the existing databases with the following command, which for now only shows the default databases.

> \l

In order to create a database to work with, execute the following. This will create a database named pg_dev owned by our default postgres user.

> CREATE DATABASE pg_dev OWNER postgres;

Now let’s connect to our newly created database and create a table called mini_ticker with the corresponding fields. (I will explain the mini_ticker model in the following sections. For now, I just want our messaging and persistence layers to be fully available before getting into the development of our web apps.)

> \c pg_dev
> CREATE TABLE mini_ticker (id serial PRIMARY KEY, current_time_millis BIGINT, epoch_time BIGINT, pair VARCHAR(10), close VARCHAR(20), open VARCHAR(20), volume VARCHAR(20));

After successfully running the create script, you should see your table when you execute the following command.

> \dt

And finally let’s insert a dummy row.

> INSERT INTO mini_ticker(current_time_millis, epoch_time, pair, close, open, volume) VALUES (9999, 9999, 'BTCUSDT', '10000', '10001', '10002');

Connecting kafka-connect to our database

Now that we have a proper database and a table, we can connect our kafka-connect instance to the database and start listening to change events. But before doing that, let’s first check the current state of our kafka instance through ksqldb. We can attach to the ksqldb container and run the ksql CLI tool with the following docker command.

> docker exec -it ksqldb ksql http://localhost:8088

Inside the CLI, in order to see the current topics in our kafka instance we can run the following:

> show topics;

It should return something like this:

Kafka Topic                    | Partitions | Partition Replicas
------------------------------------------------------------------
crypto-ksqlksql_processing_log | 1          | 1
docker-connect-configs         | 1          | 1
docker-connect-offsets         | 25         | 1
docker-connect-status          | 5          | 1
------------------------------------------------------------------

These are the default topics that are created by kafka-connect and ksqldb instances.

Now let’s exit the ksql CLI tool with the exit; command and connect our kafka-connect instance to our database.

curl for kafka-connect with debezium postgres connector

kafka-connect has an API that we can communicate with through HTTP requests, so in order to create a connection between kafka-connect and the postgres database we can simply run the following curl command in our terminal. What follows is a sketch of that request; its body mirrors the connector configuration that the status endpoint returns further down.
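
> curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
    "name": "pg-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "example",
      "database.dbname": "pg_dev",
      "database.server.name": "pg_dev",
      "plugin.name": "pgoutput",
      "slot.name": "dbname_debezium",
      "publication.name": "dbname_publication",
      "table.include.list": "public.(.*)",
      "heartbeat.interval.ms": "5000",
      "poll.interval.ms": "1",
      "topic.creation.default.partitions": "1",
      "topic.creation.default.replication.factor": "1",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "true",
      "transforms.unwrap.delete.handling.mode": "rewrite",
      "transforms.unwrap.add.fields": "source.ts_ms"
    }
  }'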

Inside the config tag you can see the necessary configuration for the kafka-connect source connector. You can find the details of these configuration options in debezium’s official documentation.

After executing the POST curl, we can check whether we successfully created the connector with the following curl command.

> curl -s "http://localhost:8083/connectors?expand=info&expand=status"

It should return something like the following:

{"pg-connector":{"info":{"name":"pg-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.user":"postgres","database.dbname":"pg_dev","transforms.unwrap.delete.handling.mode":"rewrite","topic.creation.default.partitions":"1","slot.name":"dbname_debezium","publication.name":"dbname_publication","transforms":"unwrap","database.server.name":"pg_dev","heartbeat.interval.ms":"5000","plugin.name":"pgoutput","database.port":"5432","database.hostname":"postgres","database.password":"example","poll.interval.ms":"1","transforms.unwrap.drop.tombstones":"true","topic.creation.default.replication.factor":"1","name":"pg-connector","transforms.unwrap.add.fields":"source.ts_ms","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","table.include.list":"public.(.*)"},"tasks":[{"connector":"pg-connector","task":0}],"type":"source"},"status":{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"source"}}}%

We can see that we have a connector called pg-connector and its state is RUNNING. GREAT!!!

Now let’s check the current state of our kafka instance. Connect to ksqldb container and run show topics; command again.

Kafka Topic                    | Partitions | Partition Replicas
------------------------------------------------------------------
crypto-ksqlksql_processing_log | 1          | 1
docker-connect-configs         | 1          | 1
docker-connect-offsets         | 25         | 1
docker-connect-status          | 5          | 1
pg_dev.public.mini_ticker      | 1          | 1
------------------------------------------------------------------

Wow, we have a new topic with the name of our new table. Let’s check the events inside this topic with the following command.

> PRINT 'pg_dev.public.mini_ticker' FROM BEGINNING;

You should see something like the following.

Key format: HOPPING(KAFKA_INT) or TUMBLING(KAFKA_INT) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 2022/08/08 17:33:13.066 Z, key: [1400140405@7166488599636816253/-], value: {"id": 1, "current_time_millis": 9999, "epoch_time": 9999, "pair": "BTCUSDT", "close": "10000", "open": "10001", "volume": "10002", "__source_ts_ms": 1659979992956, "__deleted": "false"}, partition: 0

This event is actually our first INSERT operation to the mini_ticker table.

Let’s insert another row and see the change inside the kafka topic.
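For example, a second row with values matching the topic output below:

> INSERT INTO mini_ticker(current_time_millis, epoch_time, pair, close, open, volume) VALUES (9998, 9998, 'ETHUSDT', '12345', '12345', '12345');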

Key format: HOPPING(KAFKA_INT) or TUMBLING(KAFKA_INT) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 2022/08/08 17:33:13.066 Z, key: [1400140405@7166488599636816253/-], value: {"id": 1, "current_time_millis": 9999, "epoch_time": 9999, "pair": "BTCUSDT", "close": "10000", "open": "10001", "volume": "10002", "__source_ts_ms": 1659979992956, "__deleted": "false"}, partition: 0
rowtime: 2022/08/08 17:54:01.767 Z, key: [1400140405@7166488599636816509/-], value: {"id": 2, "current_time_millis": 9998, "epoch_time": 9998, "pair": "ETHUSDT", "close": "12345", "open": "12345", "volume": "12345", "__source_ts_ms": 1659981241755, "__deleted": "false"}, partition: 0

As we can see, each INSERT operation creates a new entry inside the topic. What about DELETE, you may ask? Let’s delete the first row.

> DELETE FROM mini_ticker WHERE id = 1;

After the DELETE operation our kafka topic looks like the following.

Key format: HOPPING(KAFKA_INT) or TUMBLING(KAFKA_INT) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 2022/08/08 17:33:13.066 Z, key: [1400140405@7166488599636816253/-], value: {"id": 1, "current_time_millis": 9999, "epoch_time": 9999, "pair": "BTCUSDT", "close": "10000", "open": "10001", "volume": "10002", "__source_ts_ms": 1659979992956, "__deleted": "false"}, partition: 0
rowtime: 2022/08/08 17:54:01.767 Z, key: [1400140405@7166488599636816509/-], value: {"id": 2, "current_time_millis": 9998, "epoch_time": 9998, "pair": "ETHUSDT", "close": "12345", "open": "12345", "volume": "12345", "__source_ts_ms": 1659981241755, "__deleted": "false"}, partition: 0
rowtime: 2022/08/08 17:56:58.509 Z, key: [1400140405@7166488599636816253/-], value: {"id": 1, "current_time_millis": null, "epoch_time": null, "pair": null, "close": null, "open": null, "volume": null, "__source_ts_ms": 1659981418499, "__deleted": "true"}, partition: 0

As we can see, we have another event in the topic with the __deleted field set to true. By default, the debezium connector’s unwrap transform (ExtractNewRecordState) drops the records for DELETE operations from the event stream, but the way we configured the connector prevents this behavior. (When we created the connector with the curl command, we set transforms.unwrap.delete.handling.mode to rewrite.) This way the topic is a single source of truth that we can use to replicate our database, as the sketch below illustrates.
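
To make that last point concrete, here is a minimal, hypothetical sketch of how a consumer could rebuild an in-memory copy of mini_ticker from this topic. This is not the crypto-connect implementation from Part 2; the class name, consumer group and ports are my own assumptions, and it assumes Confluent’s Avro deserializer against the schema-registry from our docker-compose file.

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class MiniTickerReplica {

    // In-memory copy of the mini_ticker table, keyed by row id.
    private static final Map<Long, GenericRecord> TABLE = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");           // assumed external kafka listener
        props.put("group.id", "mini-ticker-replica");                // hypothetical consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");  // assumed schema-registry port
        props.put("auto.offset.reset", "earliest");                  // replay the topic from the beginning

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("pg_dev.public.mini_ticker"));
            while (true) {
                for (ConsumerRecord<String, GenericRecord> record : consumer.poll(Duration.ofMillis(500))) {
                    GenericRecord value = record.value();
                    long id = ((Number) value.get("id")).longValue();
                    boolean deleted = "true".equals(String.valueOf(value.get("__deleted")));
                    if (deleted) {
                        TABLE.remove(id);       // rewritten DELETE events remove the row
                    } else {
                        TABLE.put(id, value);   // INSERT (and UPDATE) events upsert the row
                    }
                }
            }
        }
    }
}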

Now that we have a working kafka-connect setup, in the second part we will write the two “micro-services”.
