Change Data Capture Using Debezium, PostgreSQL, and Kafka Connect
What is CDC?
In databases, change data capture (CDC) is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data. CDC is also an approach to data integration based on the identification, capture, and delivery of the changes made to enterprise data sources.
How can we achieve the CDC pattern using Kafka with a PostgreSQL database? In this article I will walk through how to do it. The demo below was done on Windows 10.
Prerequisites
- Docker Desktop
- Docker Debezium images (zookeeper, kafka, connect, postgresql)
What do we want to achieve in this article?
We will insert, update, and delete records in the Postgres DB and see whether Kafka consumes those events. Once the events are in Kafka, any application can listen for them and process them, but this article only covers capturing the DB events into Kafka.
To use the connector to produce change events for a particular PostgreSQL server or cluster:
- install the logical decoding plugin
- configure the PostgreSQL server to support logical replication
- create a configuration file for the PostgreSQL Connector and use the Kafka Connect REST API to add that connector to your Kafka Connect cluster.
Execute the docker commands below sequentially in a Windows 10 command prompt.
// network to attach all containers to the same network
docker network create kafka-net
// zookeeper container
docker run -d --name zookeeper --net kafka-net -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.0
// kafka container
docker run -d --name kafka --net kafka-net -p 9092:9092 --link zookeeper:zookeeper -e ZOOKEEPER_CONNECT=zookeeper:2181 debezium/kafka:1.0
// kafka connect container
docker run -d --name connect --net kafka-net -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3 -d'/' | cut -f1 -d':') -e BOOTSTRAP_SERVERS=kafka:9092 --link zookeeper:zookeeper --link kafka:kafka debezium/connect:1.0
// postgres container
docker run -d --name postgres --net kafka-net debezium/postgres:10-alpine
Once the above docker run commands are done, check that all the containers are running fine:
docker ps
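As a quick sanity check you can trim the output to just the names and statuses (a standard docker ps formatting flag):
docker ps --format "table {{.Names}}\t{{.Status}}"
All four containers (zookeeper, kafka, connect, postgres) should show an Up status.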
Now we need to do the "Installing the Logical Decoding Output Plug-in" step inside the postgres container:
docker exec -it postgres bash
cd /var/lib/postgresql/data
In the above path you can find the files below, in which the configuration changes are needed. If the settings are already in place, you can skip the changes.
postgresql.conf
# MODULES
shared_preload_libraries = 'decoderbufs,wal2json'
This tells the server to load the decoderbufs and wal2json logical decoding plugins at startup (the names of the plugins are set in the Protobuf and wal2json Makefiles).
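A quick way to verify the plugins were picked up after a restart, using the standard SHOW command through psql (postgres is the default superuser of the debezium/postgres image):
docker exec -it postgres psql -U postgres -c "SHOW shared_preload_libraries;"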
Next, configure the replication settings, regardless of the decoder being used:
postgresql.conf
# REPLICATION
wal_level = logical
max_wal_senders = 1
max_replication_slots = 1
- wal_level = logical tells the server to use logical decoding with the write-ahead log.
- max_wal_senders = 1 tells the server to use at most 1 separate process for processing WAL changes.
- max_replication_slots = 1 tells the server to allow at most 1 replication slot to be created for streaming WAL changes.
Note: I have not made the above changes since they were already present in my container's file.
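Either way, you can confirm the effective values from the host with standard SHOW commands:
docker exec -it postgres psql -U postgres -c "SHOW wal_level;"
docker exec -it postgres psql -U postgres -c "SHOW max_wal_senders;"
docker exec -it postgres psql -U postgres -c "SHOW max_replication_slots;"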
Setting up Permissions
Replication can only be performed by a database user that has appropriate permissions and only for a configured number of hosts.
In order to give a user replication permissions, define a PostgreSQL role that has at least the REPLICATION and LOGIN permissions. For example:
CREATE ROLE cdc REPLICATION LOGIN;
Superusers have both of the above permissions by default.
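Keep in mind that the connector also needs read access to the tables it captures. A minimal sketch, assuming the cdc role from above and tables in the public schema:
docker exec -it postgres psql -U postgres -c "GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc;"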
Finally, configure the PostgreSQL server to allow replication to take place between the server machine and the host on which the Debezium PostgreSQL connector is running:
pg_hba.conf
local replication <youruser> trust
host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust
- The local line tells the server to allow replication for <youruser> locally (i.e. on the server machine).
- The first host line tells the server to allow <youruser> on localhost to receive replication changes using IPv4.
- The second host line tells the server to allow <youruser> on localhost to receive replication changes using IPv6.
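If you edit pg_hba.conf in a running container, the file can be re-read without a full restart; pg_reload_conf() is a built-in PostgreSQL function:
docker exec -it postgres psql -U postgres -c "SELECT pg_reload_conf();"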
Example Configuration
Using the PostgreSQL connector is straightforward. Here is an example configuration for a PostgreSQL connector that monitors a PostgreSQL server at port 5432 on <postgres_container_ip_add>.
Create the database and table below in the postgres container (a psql session sketch follows the SQL):
create database inventory;
grant all privileges on database inventory to postgres;
CREATE TABLE customers (
id SERIAL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
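To run these statements you can open an interactive psql session inside the container (postgres is the default superuser of the debezium/postgres image), paste the SQL above, and use \c inventory to switch into the new database before creating the table:
docker exec -it postgres psql -U postgres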
POST: http://localhost:8083/connectors with the JSON below:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "192.168.99.100",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "database.server.name": "inventory",
    "table.whitelist": "public.customers"
  }
}
- The name of our connector when we register it with a Kafka Connect service.
- The name of this PostgreSQL connector class.
- The address of the PostgreSQL server.
- The port number of the PostgreSQL server.
- The name of the PostgreSQL user that has the required privileges.
- The password for the PostgreSQL user that has the required privileges.
- The name of the PostgreSQL database to connect to.
- The logical name of the PostgreSQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
- A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring.
Note: for "database.hostname", give the postgres container's IP address; you can get it from docker inspect postgres.
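A sketch of registering the connector with curl (any REST client works; register-postgres.json is an assumed filename holding the JSON above):
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d @register-postgres.json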
After hitting the above POST API you should get a success response (HTTP 201 Created) with the connector configuration in the response body.
You can verify with the GET API below; it will return the connector name and a RUNNING status.
GET: http://localhost:8083/connectors/inventory-connector/status
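The same check from the command line, using plain curl:
curl -s http://localhost:8083/connectors/inventory-connector/status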
Now test the data flow: insert, update, and delete records in the table by executing SQL statements in the postgres container, and check that Kafka is receiving the events for the table.
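A minimal sketch of such a test: the statements below exercise all three operations, and the console consumer bundled with Kafka prints the change events. The topic name follows Debezium's serverName.schema.table convention, so inventory.public.customers here; /kafka is the Kafka home directory in the debezium/kafka image.
docker exec -it postgres psql -U postgres -d inventory -c "INSERT INTO customers (first_name, last_name, email) VALUES ('John', 'Doe', 'john.doe@example.com');"
docker exec -it postgres psql -U postgres -d inventory -c "UPDATE customers SET email = 'jdoe@example.com' WHERE id = 1;"
docker exec -it postgres psql -U postgres -d inventory -c "DELETE FROM customers WHERE id = 1;"
docker exec -it kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic inventory.public.customers --from-beginning
You should see a JSON change event for each statement (the delete is followed by a tombstone record).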
That's all from my side :)
Reference Documents:
https://docs.confluent.io/current/connect/references/restapi.html