CDC setup for Postgres using Debezium and Kafka Connect
As I explored how to set up a change data capture pipeline from Postgres to Kafka using Debezium and Kafka Connect, I wanted to share the setup process as a guide for anyone who is interested.
This GitHub repository contains the configuration if you want to jump ahead and run the setup.
There are two steps to the setup:
- A PostgreSQL image (with an output plug-in)
- A Kafka Connect image with the Debezium connector
PostgreSQL Image
This image contains Postgres installed with a logical decoding output plug-in (wal2json, decoderbufs, or pgoutput). The output plug-in extracts changes committed to the transaction log so that Debezium can stream them. You can spend some time building such an image on your own, or use an existing image from the Debezium open-source project.
FROM debezium/postgres:13
COPY postgresql.conf.sample /usr/share/postgresql/postgresql.conf
COPY start-up.sql /docker-entrypoint-initdb.d
This is a simple extension of the Debezium base image. We are only adding a configuration file that enables the output plug-in, and a start-up SQL script with DDL.
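For reference, the relevant lines of such a configuration file might look like the sketch below; the exact values in the repository's `postgresql.conf.sample` may differ.

```conf
# Sketch: enable logical decoding so an output plug-in can stream changes
wal_level = logical             # write enough WAL information for logical decoding
max_wal_senders = 4             # processes allowed to stream the WAL
max_replication_slots = 4       # replication slots for consumers such as Debezium
```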
The start-up SQL script creates a sample customers table.
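As a sketch, the start-up script could contain DDL along these lines; the actual script in the repository may differ, and the column names here are illustrative.

```sql
-- start-up.sql (sketch): a sample table for the connector to capture
CREATE TABLE customers (
    id         SERIAL PRIMARY KEY,
    first_name VARCHAR(255) NOT NULL,
    last_name  VARCHAR(255) NOT NULL,
    email      VARCHAR(255) NOT NULL UNIQUE
);
```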
Kafka Connect Debezium Image
Next, we will create a Kafka Connect image by adding the Debezium Postgres connector on top of a base image.
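A minimal Dockerfile for this image could look like the following. This is a sketch: the base image and version numbers are assumptions, not taken from the repository, and the official debezium/connect image already bundles the Debezium connectors if you prefer to start from that instead.

```dockerfile
# Sketch: add the Debezium Postgres connector to a Confluent Kafka Connect base image.
# Image tags and connector version are illustrative.
FROM confluentinc/cp-kafka-connect:7.3.0
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:1.9.3
```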
Along with these custom images, we will use the Confluent Kafka and ZooKeeper images to spin up a single-node Kafka cluster, plus Kafka Manager for managing Kafka. The file structure below shows the Dockerfile(s) for the custom images and a docker-compose file.
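The layout could be roughly as follows; this is a sketch, and the directory and file names are assumptions rather than the repository's exact structure.

```text
.
├── docker-compose.yml
├── debezium-postgres/
│   ├── Dockerfile
│   ├── postgresql.conf.sample
│   └── start-up.sql
└── kafka-connect-debezium/
    └── Dockerfile
```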
Start the setup by running the command below:
docker-compose up
This will spin up all the necessary containers. You can launch Kafka Manager and add the Kafka cluster there to verify that the containers are up.
Create the Debezium Kafka connector through the Kafka Connect REST endpoint. Use the cURL command below as a reference.
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "postgres-dbz-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "dbzpostgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "DebPostgres1!",
        "database.dbname": "postgres",
        "table.include.list": "public.customers,public.orders",
        "topic.prefix": "dbz-",
        "tasks.max": "1",
        "database.server.name": "debezium-demo"
    }
}'
You should see the response below when the connector is created:
{
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "postgres",
        "database.hostname": "dbzpostgres",
        "database.password": "DebPostgres1!",
        "database.port": "5432",
        "database.server.name": "debezium-demo",
        "database.user": "postgres",
        "name": "postgres-dbz-connector",
        "table.include.list": "public.customers,public.orders",
        "tasks.max": "1",
        "topic.prefix": "dbz-"
    },
    "name": "postgres-dbz-connector",
    "tasks": [],
    "type": "source"
}
Kafka Connect will now run a connector task that captures change events from the listed tables in the Postgres instance specified in the config.
Each event is a JSON document containing the changed data, source details such as the transaction ID and timestamp, and the schema of the table and source. You can find a sample event JSON file in the GitHub repository.
There is also a field ("op") that indicates the operation executed on the row: create, update, or delete.
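To make the event structure concrete, here is a small self-contained Python sketch that parses a hypothetical Debezium change event. The payload values are made up for illustration; real events from this setup will carry your table's columns, and the schema section is omitted for brevity.

```python
import json

# A hypothetical Debezium change event (payload section only, values invented).
event = json.loads("""
{
  "payload": {
    "before": null,
    "after": {"id": 1, "first_name": "Ada", "last_name": "Lovelace"},
    "source": {"table": "customers", "txId": 601, "ts_ms": 1630000000000},
    "op": "c",
    "ts_ms": 1630000000123
  }
}
""")

# Debezium encodes the operation as a single character:
# "c" = create, "u" = update, "d" = delete, "r" = snapshot read.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}

payload = event["payload"]
op = OPS[payload["op"]]
table = payload["source"]["table"]
row = payload["after"] or payload["before"]  # deletes carry only "before"
print(f"{op} on {table}: {row}")
# prints: create on customers: {'id': 1, 'first_name': 'Ada', 'last_name': 'Lovelace'}
```

For a delete event, "after" is null and the removed row appears under "before", which is why the sketch falls back to it.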