How to enable CDC using Debezium Connector in PostgreSQL

PAWAN SHARMA
2 min readOct 23, 2023

--

What is CDC?

CDC is the process to identify the changes on source data that so that the downstream system or target system which consuming this changes of data is always in sync with source.

Different types of CDC mechanism.

There are two of mechanism to capture the changes in CDC.

  1. Push mechanism: Source database/system will push the changes to the target system.
  2. Pull mechanisms: Target will pull the changes of database from source databases.

How to Enable this in PostgreSQL.

In PostgreSQL CDC work on logical replication model. where subscriber will subscribe the data from publisher.

How to Setup Logical Replication.

Publisher

Create table emp( id int, name text);
Insert into emp (id,name) values(101, 'abc');
Create publication my_pub for all tables;

Subscriber

Create the tables structure and subscriber using the above publisher.

Create table emp( id int, name text);
Create Subscription my_sub connection 'host=localhost port=5432 dbname=postgres' publication my_pub;

Now verify the table emp data has been replicated.

Steps to Enable the Postgres Connector to Capture.

  1. Create a Replication user and assign the replication privileges to it.
Create user replication with password 'replication' with superuser;

2. Add the replication user to authenticate using pg_hba.conf file/

host        replication      replication    all   md5 

3. Change the wal_level setting from replica to logical.

Alter system set wal_level='logical';

4. Restart the PostgreSQL Service.

pg_ctl -D /var/postgres/db1 restart

5. Enable Logical replication of all tables using publisher.

Create publication my_pub for all tables; 

6. Create Replication slot.

select * from pg_create_logical_replication_slot('emp_slot');

7. Verify the Replication slot.

Select slot_name,slot_type,active from pg_replication_slots;

8. Grant CONNECT, SELECT Privileges to table or tables which you want to Replicate.

GRANT CONNECT on DATABASE <dbname> to "replication";
GRANT USAGE on SCHEMA "public" to "replication";
GRANT SELECT ON ALL TABLES IN SCHEMA "public" to "replication";
GRANT USAGE,SELECT ON ALL SEQUENCES IN SCHEMA "public" to "replication";

9. Allow replica-identify to full to capture the changes with Update and DELETE transaction on table.

ALTER TABLE <tablename> REPLICA IDENTITY FULL;

10. use the following details to connect using kafka to fetch the changes occurred on source database.

plugin.name:pgoutput

publication.name:my_pub

slot.name:emp_slot

--

--