How to Set up CDC with Kafka, Debezium and Postgres?

Prasoon Parashar
7 min read · Jul 1, 2023


In this blog, I will guide you through the process of setting up Change Data Capture (CDC) with Kafka and Postgres on Google Cloud. CDC allows you to capture real-time data changes from your Postgres database and stream them to Kafka topics for further processing. By following these step-by-step instructions, you’ll have a fully functional CDC system up and running in no time.

(Image: Streaming data going to a Kafka topic)

To complement this written guide, we have also created a detailed YouTube video where you can follow along with the instructions visually. Make sure to check it out here.

Additionally, we have provided a comprehensive Readme file that includes all the necessary code snippets, configurations, and resources mentioned in this tutorial. You can find the Readme file here. It serves as a handy reference for implementing the steps outlined in this guide.
Happy Reading !!

Step 1:
Database Setup on Google Cloud:
To begin, sign up for Google Cloud, which offers a free $300 credit for new users. Follow these steps to set up your Postgres database:

  1. Search for ‘Postgres’ in the Google Cloud search bar and select ‘SQL.’
  2. If you don’t have an existing instance, click on ‘Create Instance’ and choose ‘PostgreSQL’ as the database engine.
  3. Provide a unique instance ID and set a password for your database.
  4. Select the environment as ‘Development’ and keep the default region and zone settings.
  5. Under ‘Flags,’ set ‘cloudsql.logical_decoding’ to ‘ON’ by clicking on ‘New Database Flags.’
  6. Click ‘Create’ and wait a few minutes for the instance to be created.
(Image: Setting ‘cloudsql.logical_decoding’ to ‘ON’ under database flags)
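If you prefer the command line, the same instance can be created with the gcloud CLI. This is only a sketch: the instance name, Postgres version, region, tier, and password below are placeholders, so adjust them to your setup.

# Create a Cloud SQL Postgres instance with logical decoding enabled for CDC
gcloud sql instances create my-cdc-postgres \
  --database-version=POSTGRES_14 \
  --region=us-central1 \
  --tier=db-custom-1-3840 \
  --root-password=YOUR_PASSWORD \
  --database-flags=cloudsql.logical_decoding=on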

Step 2:
Whitelisting Your IP: To access your Google Cloud Postgres instance from your local environment, follow these steps to whitelist your IP:

  1. Search ‘What’s my IP’ on Google to find your public IP address.
  2. Go back to Google Cloud, click on ‘Connections,’ and navigate to the ‘Networking’ section.
  3. Under ‘Authorized networks,’ click on ‘Add network.’
  4. Give the new network a name (e.g., ‘connect-from-local’) and enter your public IP in CIDR format (e.g., if your IP is 171.76.86.35, enter 171.76.86.0/24 to allow that whole /24 range, or 171.76.86.35/32 to allow only your exact IP).
  5. Save the configuration, and wait for it to apply.
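The same whitelisting can also be done with gcloud if you prefer the CLI (my-cdc-postgres is the placeholder instance name from the sketch above; note that --authorized-networks replaces the whole list, so include every network you want to keep):

# Authorize your public IP range to connect to the Cloud SQL instance
gcloud sql instances patch my-cdc-postgres \
  --authorized-networks=171.76.86.0/24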

Step 3:
Creating User and Password: Now, let’s create a user and password for the database to establish the connection:

  1. Click on ‘Users’ in the Google Cloud console.
  2. Locate the username ‘postgres’ and click on the three dots to access the options.
  3. Choose ‘Change password’ and set your desired password (e.g., ‘1234’).
  4. Remember the password as we will need it for the connection later.
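Equivalently, the password for the built-in ‘postgres’ user can be set from the CLI (again using the placeholder instance name):

# Set the password for the default 'postgres' user
gcloud sql users set-password postgres \
  --instance=my-cdc-postgres \
  --password=1234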

Step 4:
Set Up pgAdmin and Establish a Connection to Your Database:

  1. Visit the pgAdmin website and download the appropriate version for your operating system.
  2. Install pgAdmin and open it.
  3. Click on ‘Register Server’ to set up the connection.
  4. Give a name to your server (e.g., ‘my-new-gcloud-server’).
  5. In the ‘Connection’ section, enter the following details:
  • Hostname: Copy the ‘Public IP address’ from your Google Cloud Postgres instance’s ‘Overview’ section.
  • Username: ‘postgres’
  • Password: Use the password you set earlier.

6. Click ‘Save’ to establish the connection to the database.
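If you would rather verify the connection from a terminal instead of (or in addition to) pgAdmin, a quick psql check looks like this, assuming psql is installed locally (replace <PUBLIC_IP> with your instance’s public IP):

# Connect to the Cloud SQL instance and list the available databases
psql "host=<PUBLIC_IP> port=5432 dbname=postgres user=postgres password=1234" -c "\l"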

Step 5:
Creating the Database and Tables

  1. In pgAdmin, navigate to the ‘Databases’ section and refresh it.
  2. Create a new database called ‘PINNACLEDB’ and grant REPLICATION access to the ‘postgres’ user.
    (Note: Granting replication access is a mandatory step; otherwise, CDC data won’t flow to the Kafka topic.)
CREATE DATABASE PINNACLEDB;
ALTER USER postgres WITH REPLICATION;

3. Within the ‘PINNACLEDB’ database, create a schema.

CREATE SCHEMA ECART;

4. Copy and execute the provided SQL script to create all the required tables.

CREATE TABLE ECART.CUSTOMER (
  CUSTID VARCHAR(50),
  CUSTNAME VARCHAR(100),
  CUSTADD VARCHAR(400)
);

CREATE TABLE ECART.PRODUCTINFO (
  PRODUCTID INTEGER,
  PRODUCTNAME VARCHAR(150),
  PRODCAT VARCHAR(400),
  STOREID VARCHAR(70)
);

CREATE TABLE ECART.STOREINFO (
  STOREID VARCHAR(70),
  STORENAME VARCHAR(150),
  STOREADD VARCHAR(400)
);

CREATE TABLE ECART.FACT_ORDER (
  ORDERID SERIAL PRIMARY KEY,
  CUSTID VARCHAR(50),
  PRODUCTID INTEGER,
  PURCHASETIMESTAMP TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
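Before moving on, it is worth double-checking that the ‘postgres’ role really has the REPLICATION attribute, since Debezium cannot stream changes without it. A quick check, runnable from pgAdmin’s query tool or via psql:

# Should return rolreplication = t for the 'postgres' role
psql "host=<PUBLIC_IP> port=5432 dbname=pinnacledb user=postgres password=1234" \
  -c "SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'postgres';"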

Step 6:
Code Changes and Data Generation:

To generate data and populate the database, let’s make some code changes:

  1. Clone this repository:
git clone https://github.com/Noosarpparashar/startupv2.git

2. Open the ‘dataGenerator.py’ file located in the ‘python/dataGenerator/ecart_v2’ directory.

3. Adjust the speed of data insertion as desired by changing the corresponding delay variable in the script.

4. Update the connection details: change the ‘host’ to your Google Cloud Postgres public IP, keep ‘user’ as ‘postgres,’ and set the ‘password’ to the one you set earlier.

5. Save the file.

6. Dockerize the Python code by building the Docker image. Run the following command:

docker build -t my-ecart-data-generator .

Step 7:
Running Kafka in KRaft mode (Kafka without ZooKeeper), Debezium, Kafka UI, and Schema Registry with Docker Compose

version: '3'
services:
  kafka1:
    image: confluentinc/cp-kafka
    container_name: kafka1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka1:29092,CONTROLLER://kafka1:29093,EXTERNAL://0.0.0.0:9092'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka1:29092,EXTERNAL://localhost:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'p8fFEbKGQ22B6M_Da_vCBw'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka2:
    image: confluentinc/cp-kafka
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka2:29092,CONTROLLER://kafka2:29093,EXTERNAL://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka2:29092,EXTERNAL://localhost:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'p8fFEbKGQ22B6M_Da_vCBw'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka3:
    image: confluentinc/cp-kafka
    container_name: kafka3
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka3:29092,CONTROLLER://kafka3:29093,EXTERNAL://0.0.0.0:9094'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka3:29092,EXTERNAL://localhost:9094'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'p8fFEbKGQ22B6M_Da_vCBw'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080 # Changed to avoid port clash with akhq
    depends_on:
      - kafka1
      - kafka2
      - kafka3
      - schema-registry
      - connect
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: 'kafka1:29092,kafka2:29092,kafka3:29092'
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083
      DYNAMIC_CONFIG_ENABLED: 'true'

  schema-registry:
    image: confluentinc/cp-schema-registry
    container_name: schema-registry
    hostname: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka1:29092,kafka2:29092,kafka3:29092'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
    depends_on:
      - kafka1
      - kafka2
      - kafka3

  connect:
    image: quay.io/debezium/connect:latest
    ports:
      - 8083:8083
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    environment:
      - BOOTSTRAP_SERVERS=kafka1:29092,kafka2:29092,kafka3:29092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses

networks:
  default:
    name: my-network
    external: true

The provided docker-compose file sets up a multi-container environment for a Change Data Capture (CDC) setup using Postgres and Kafka. It includes the following services:

  • Three Kafka brokers (`kafka1`, `kafka2`, `kafka3`) are set up using the `confluentinc/cp-kafka` image, each with a different port and configured with replication factors and listeners.
  • The `kafka-ui` service hosts the Kafka UI, allowing you to manage Kafka topics and messages.
  • The `schema-registry` service sets up the Confluent Schema Registry for managing Avro schemas and compatibility.
  • The `connect` service represents Debezium Kafka Connect, responsible for capturing and streaming changes from Postgres to Kafka.
  1. Because the compose file declares its default network as external (‘my-network’), create that network first if it doesn’t already exist: ‘docker network create my-network’.
  2. Switch to the root user in the terminal and run ‘docker-compose up’ inside the ‘infra’ folder. This starts the containers and creates a CDC environment that captures real-time data changes from Postgres and streams them to Kafka topics.
cd startupv2/infra
docker-compose up

3. Open another terminal and run ‘docker ps’ to confirm that the services are running successfully.

4. Access the Kafka UI by opening ‘localhost:8080’ in your browser to visualize Kafka topics, brokers, clusters, and connectors.
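Optionally, you can also confirm from the terminal that Kafka Connect is reachable and that the Debezium Postgres connector plugin is installed, using the Kafka Connect REST API:

# Check that the Kafka Connect worker responds
curl http://localhost:8083/

# List installed connector plugins; io.debezium.connector.postgresql.PostgresConnector should appear
curl http://localhost:8083/connector-plugins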

Step 8:
Data Generation and Kafka Connection

  1. Switch to the ‘ecart_v2’ directory in the terminal and use the ‘docker run’ command to start the data generator.
docker run my-ecart-data-generator

Step 9:
Create connector and send data to Kafka Topic

1. Open a tool like Postman or cURL and make a POST request to the Kafka Connect REST API at the endpoint URL http://localhost:8083/connectors.
2. Replace the request body parameters with the appropriate values and send the POST request:

  • <name>: Give any suitable name to your connector.
  • <database.hostname>: Specify the public IP of your Postgres instance.
  • <database.password>: Specify the password you set for the ‘postgres’ user.
  • <topic.prefix>: Specify any prefix for the Kafka topic names.
(Note: recent Debezium releases use ‘table.include.list’ in place of the older ‘table.whitelist’; if the connector rejects that property, switch to the newer name.)
{
  "name": "inventory-connector1",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "34.012.345.678",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "1234",
    "database.dbname": "pinnacledb",
    "database.server.name": "fullfillment",
    "table.whitelist": "ECART.STOREINFO",
    "topic.prefix": "mysecondtopic"
  }
}
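If you prefer the command line over Postman, the same request can be sent with cURL (assuming the JSON above has been saved to a file named connector.json, a name chosen here only for illustration):

# Register the Debezium Postgres connector through the Kafka Connect REST API
curl -X POST -H "Content-Type: application/json" \
  --data @connector.json \
  http://localhost:8083/connectors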

3. To verify the connector, open Kafka UI in your browser and navigate to the Kafka Connect section.

4. In the Kafka Connect section, you will see the newly created connectors listed. Click on the relevant connector, such as “inventory-connector1,” to view its details.

5. Explore the created Kafka topics and click on any of them to access the “Messages” section.
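You can also inspect the change events from a terminal. Debezium names topics as <topic.prefix>.<schema>.<table>, so with the configuration above the STOREINFO changes should land in a topic such as mysecondtopic.ecart.storeinfo (adjust the name to whatever Kafka UI actually shows):

# Read CDC events from inside one of the broker containers
docker exec -it kafka1 kafka-console-consumer \
  --bootstrap-server kafka1:29092 \
  --topic mysecondtopic.ecart.storeinfo \
  --from-beginning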

That’s it! You have successfully sent the generated data to a Kafka topic using Debezium Kafka Connect.

We have covered the step-by-step process of setting up a Change Data Capture (CDC) system with Postgres and Kafka on Google Cloud. By following these instructions, you can now capture real-time data changes from your Postgres database and stream them to Kafka topics for further processing. In the next tutorial, we will explore how to consume these messages. Stay tuned for more exciting content! If you have any questions or need assistance, please leave a comment below. Thank you for reading and don’t forget to clap and follow !!
