Near Real Time Data Replication using Debezium

Part 2: A detailed guide to building a data replication pipeline from GCP CloudSQL Postgres to GCP BigQuery

Saurabh Gupta
Google Cloud - Community
8 min read · Feb 11, 2022


In Part 1 of this series we deployed Debezium in distributed mode. In Part 2 we will install and configure the source and sink connectors to replicate data from CloudSQL Postgres to BigQuery.

What is CloudSQL Postgres?

PostgreSQL is the industry’s leading open source relational database with an active and growing ecosystem of developers and tools. With Cloud SQL for PostgreSQL, you can now spend less time on your PostgreSQL database operations, and more time on your applications.

CloudSQL Postgres features:

Full compatibility for your PostgreSQL applications on major versions: 14, 13, 12, 11, 10, 9.6
Support for the most popular PostgreSQL extensions and over 100 database flags
Integrated database observability for DevOps with Cloud SQL Insights
Integration with key services like Google Kubernetes Engine (GKE), BigQuery, and Cloud Run
Simplified and secure migrations with Database Migration Service
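
One prerequisite worth calling out here: Debezium relies on logical decoding, which is off by default on CloudSQL Postgres. A minimal sketch of enabling it with gcloud (the instance name is a placeholder, and changing a database flag may restart the instance):

gcloud sql instances patch <INSTANCE-NAME> --database-flags=cloudsql.logical_decoding=on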

What is BigQuery?

BigQuery is a serverless, cost-effective and multicloud data warehouse designed to help you turn big data into valuable business insights.

BigQuery features:

Democratize insights with a secure and scalable platform with built-in machine learning
Power business decisions from data across clouds with a flexible, multicloud analytics solution
Run analytics at scale with 26%–34% lower three-year TCO than cloud data warehouse alternatives
Adapt to your data at any scale, from bytes to petabytes, with zero operational overhead

Why replicate data from an OLTP database like Postgres to a DWH like BigQuery?

The operational data needs of an organization are addressed by online transaction processing (OLTP) systems, which are essential to the day-to-day running of the business. However, they are not well suited to the decision-support queries and business questions that managers typically need to answer. Such questions involve analytics such as aggregation, drilldown, and slicing/dicing of data, which are best supported by online analytical processing (OLAP) systems. Data warehouses like BigQuery support OLAP applications by storing and maintaining data in a multidimensional format. Data in an OLAP warehouse is extracted and loaded from multiple OLTP data sources (including Postgres, MySQL, DB2, Oracle, SQL Server, and flat files).
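
For instance, a decision-support question like "monthly revenue per region" becomes a single aggregation query once the data sits in BigQuery. The dataset, table, and column names below are purely illustrative:

bq query --use_legacy_sql=false 'SELECT region, DATE_TRUNC(order_date, MONTH) AS month, SUM(amount) AS revenue FROM sales.orders GROUP BY region, month ORDER BY month, region'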

Architecture:

Requirements:

CloudSQL Postgres Instance

BigQuery Instance

Debezium Service [as detailed in Part 1]
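
Before configuring the connectors, it helps to sanity-check these prerequisites. A quick sketch, assuming the Kafka Connect REST interface from Part 1 listens on port 8083 and that the target dataset is the debezium dataset referenced later in the sink configuration:

Verify the Debezium (Kafka Connect) workers are up:

curl -s http://localhost:8083/

Create the target BigQuery dataset for the sink connector:

bq mk --dataset <GCP-PROJECT-ID>:debezium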

Connector Installation

Execute the below installation steps on all 3 Debezium nodes:

Install Source Connector: Debezium Postgres to Kafka Connector

confluent-hub install debezium/debezium-connector-postgres:latest

Install Sink Connector: Kafka to BigQuery Connector

confluent-hub install wepay/kafka-connect-bigquery:latest
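
confluent-hub places the plugins under the worker's plugin path, so restart the Kafka Connect workers after installation. You can then confirm both plugins are visible through the Connect REST API (assuming the worker listens on port 8083):

curl -s http://localhost:8083/connector-plugins | grep -iE 'postgres|bigquery'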

Connector Configuration

Source connector configuration — CloudSQL Postgres

cat ~/source.json

{
  "name": "sourcecloudsqlpg",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "XX.XX.XX.XX",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "****",
    "database.dbname": "dvdrental",
    "database.server.name": "my-server",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "plugin.name": "wal2json",
    "transforms.unwrap.add.source.fields": "ts_ms",
    "tombstones.on.delete": "false",
    "include.schema.changes": "true",
    "inconsistent.schema.handling.mode": "warn",
    "database.history.skip.unparseable.ddl": "true",
    "table.whitelist": "public.accounts",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none",
    "time.precision.mode": "connect",
    "decimal.handling.mode": "double"
  }
}
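
The route transform above rewrites the default Debezium topic name <server>.<schema>.<table> (for example my-server.public.accounts) to just the table name, which is why the sink connector later reads from a topic named accounts. Before registering the connector it is also worth confirming that logical decoding is really active on the instance; a quick check, assuming the instance is reachable with psql:

psql -h XX.XX.XX.XX -U postgres -d dvdrental -c "SHOW wal_level;"

This should return logical once the cloudsql.logical_decoding flag is enabled.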

Target connector configuration — BigQuery

cat ~/target.json

{
  "name": "targetbigquery",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "topics": "accounts",
    "sanitizeTopics": "true",
    "autoCreateTables": "true",
    "autoUpdateSchemas": "true",
    "bufferSize": "1000",
    "maxWriteSize": "100",
    "tableWriteWait": "10",
    "project": "<GCP-PROJECT-ID>",
    "defaultDataset": "debezium",
    "threadPoolSize": 10,
    "keyfile": "<absolute path of service account key file>",
    "upsertEnabled": true,
    "bigQueryPartitionDecorator": false,
    "kafkaKeyFieldName": "kafkakey",
    "transforms": "HoistFieldKey",
    "transforms.HoistFieldKey.type": "org.apache.kafka.connect.transforms.HoistField$Key",
    "transforms.HoistFieldKey.field": "user_id",
    "allowNewBigQueryFields": "true",
    "allowBigQueryRequiredFieldRelaxation": "true",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "allBQFieldsNullable": true
  }
}

BQ Service Account and key creation steps:

Download the key and place it at the path defined by the keyfile property in the BigQuery sink connector configuration.
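
The same service account and key can also be created from the command line. A sketch, where the account name debezium-bq-sink, the key path, and the exact roles are assumptions (the sink needs permission to create tables, write data, and run jobs in the target project):

gcloud iam service-accounts create debezium-bq-sink --display-name="Debezium BigQuery sink"

gcloud projects add-iam-policy-binding <GCP-PROJECT-ID> --member="serviceAccount:debezium-bq-sink@<GCP-PROJECT-ID>.iam.gserviceaccount.com" --role="roles/bigquery.dataEditor"

gcloud projects add-iam-policy-binding <GCP-PROJECT-ID> --member="serviceAccount:debezium-bq-sink@<GCP-PROJECT-ID>.iam.gserviceaccount.com" --role="roles/bigquery.jobUser"

gcloud iam service-accounts keys create ~/bq-key.json --iam-account=debezium-bq-sink@<GCP-PROJECT-ID>.iam.gserviceaccount.com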

Connector Registration:

Register and start Source CloudSQL PG connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @source.json

The above command registers and starts the source CloudSQL PG connector. Once the connector starts, it connects to the source (CloudSQL PG), takes a snapshot of the whitelisted table defined in the connector config file, and pushes all the records to a Kafka topic with the same name as the database table. With the config file above, a snapshot of the accounts table is taken and all 58 records are pushed to a Kafka topic named accounts. Streaming of changed records starts automatically once the snapshot is complete.
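
If the snapshot does not appear to run, the connector status endpoint is the first place to look; both the connector and its task should report RUNNING:

curl -s http://localhost:8083/connectors/sourcecloudsqlpg/status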

Snapshot of debezium logs

List the Kafka topics and make sure a topic with the same name as the Postgres table being replicated has been created.

Kafka topic with the same name as the Postgres table is created

List the contents of the Kafka topic and make sure the records have been streamed to it. The number of records in the Kafka topic should equal the number of records in the Postgres table.

Register and start BigQuery sink connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @target.json

The above command registers and starts the BigQuery sink connector. Once the connector starts, it connects to the Kafka schema registry, fetches the schema of the source table (accounts), and creates the table in the BQ dataset. The BQ sink connector also creates an accounts_tmp* table: all records read from the Kafka topic are first pushed to the accounts_tmp* table and then merged into the parent accounts table.
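
As with the source connector, the sink's health can be checked through the status endpoint, and the tables it created can be listed with bq (using the debezium dataset from the config above):

curl -s http://localhost:8083/connectors/targetbigquery/status

bq ls <GCP-PROJECT-ID>:debezium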

Accounts table created automatically in BQ
Accounts and Accounts_tmp* table created in Debezium dataset in BQ

Data validation

Initial snapshot validation
Changed data replication validation

Case 1: Insert a record

Case 2: Delete 2 records

Case 3: Update a record

Case 4: Alter table to add a column and insert a record with new structure

Case 5: Alter table to drop a column and insert a record with new structure

Initial snapshot validation

Source: CloudSQL Postgres table — Accounts [58 records]:

Kafka Topic — Accounts [58 records]

BigQuery table — Accounts [58 records]
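
The counts behind the screenshots above can be reproduced from the command line; a sketch, reusing the connection details from the earlier commands:

psql -h XX.XX.XX.XX -U postgres -d dvdrental -c "SELECT COUNT(*) FROM public.accounts;"

bq query --use_legacy_sql=false 'SELECT COUNT(*) FROM debezium.accounts'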

Changed data replication validation

Case 1: Insert a record

Insert a record in the source CloudSQL PG accounts table. [59 records]

Kafka Topic — Accounts [59 records]. The last row shows the same values as inserted.

BigQuery table — Accounts [59 records]. The last row shows the same values as inserted.

Case 2: Delete 2 records

Delete 2 records from the source CloudSQL PG accounts table. [57 records]

Kafka topic [61 records with the latest entries as delete operations]

BigQuery table — Accounts [57 records]. Deletes are treated as soft deletes in BQ, so use op <> 'd' in the WHERE clause of the SQL statement when fetching the total count.
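
A sketch of that count query, assuming the change-event op field is surfaced as a column in the BigQuery table as described above:

bq query --use_legacy_sql=false "SELECT COUNT(*) FROM debezium.accounts WHERE op <> 'd'"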

Case 3: Update a record

Update a record in the source CloudSQL Postgres accounts table. [57 records]

Kafka topic [62 records, with the latest entry as an update operation]

BigQuery table [62 records with the updated data]

Case 4: Alter table to add a column and insert a record with new structure

Alter the table structure by adding a column. Add a column city to the CloudSQL PG accounts table and insert a record, as sketched below.
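
For reference, a sketch of the DDL used on the source (the column type is an assumption):

psql -h XX.XX.XX.XX -U postgres -d dvdrental -c "ALTER TABLE public.accounts ADD COLUMN city VARCHAR(50);"

The follow-up INSERT is a normal insert that simply includes a value for the new city column.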

Kafka topic [New record with New column]

Debezium logs indicating the auto schema change

BigQuery table [new column with the latest record]:

Case 5: Alter table to drop a column and insert a record with new structure

Alter the table structure by dropping a column. Drop the city column from the CloudSQL PG accounts table and insert a record.

Debezium logs indicating the auto schema change

Kafka topic [New record with dropped column]:

BigQuery table [city column is dropped and the latest record is added with the new structure]:

We have successfully installed and configured the source and sink connectors, and we have replicated the data with seamless schema evolution.

Useful Commands

Connector Registration Validation:

curl -k -H "Content-Type: application/json" http://localhost:8083/connectors/

["sourcecloudsqlpg","targetbigquery"]

List Kafka Topics

/usr/bin/kafka-topics --list --zookeeper 10.128.0.14:2181

Read a kafka topic

/usr/bin/kafka-console-consumer --topic accounts --from-beginning --bootstrap-server 10.128.0.14:9092

Delete a connector from Debezium

curl -X DELETE http://localhost:8083/connectors/<connectorname>
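
A few more standard Kafka Connect REST calls that are handy while operating the pipeline:

Check a connector's status

curl -s http://localhost:8083/connectors/<connectorname>/status

Restart a connector

curl -X POST http://localhost:8083/connectors/<connectorname>/restart

Pause / resume a connector

curl -X PUT http://localhost:8083/connectors/<connectorname>/pause

curl -X PUT http://localhost:8083/connectors/<connectorname>/resume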

We were able to successfully set up a data replication pipeline from CloudSQL PG to BigQuery. This pipeline also seamlessly pushed schema changes from the source database [CloudSQL PG] to the target database [BQ].

I see you when I see you, Happy Learning!

Saurabh!
