Sync MySQL to PostgreSQL Using Debezium and KafkaConnect

Sumant Rana
Published in The Startup
7 min read · Oct 2, 2020

Some time back, I wrote an article on Configuring a Kafka Connect pipeline on Kubernetes and how to configure the connectors to move data from a MySQL database (using JDBCSourceConnector) to a text file (using FileSinkConnector).

In this article, I will take that setup a step forward, introduce a CDC tool, Debezium, and walk through the steps required to enable data transfer from MySQL to PostgreSQL using Debezium and KafkaConnect. The files are available in the repo https://github.com/sumantrana/kafka-connect-pipeline.

Why CDC?

The first question that comes to mind is “Why do we need CDC?”. We already have a pipeline configured, so why bother introducing something new to the setup?

The answer lies in the subtle differences between how JDBCSourceConnector and the Debezium MySqlConnector fetch their data. The advantages of the Debezium connector over the JDBC source connector are documented in many articles; in short, the two that affect us most are:

  1. Reading directly from the transaction logs: The Debezium MySQL connector reads directly from the MySQL binary log. The binary log is a special file in which MySQL records all changes to the database (DDL, DML) sequentially. Since every transaction is recorded, there is no chance of missing one. Compare this to the JDBCSourceConnector, which polls the database table(s) at configured intervals and picks up a static snapshot at each poll. It can miss intermediate changes, for example a row that was inserted and then deleted within one interval, and depending on the use case this may not be acceptable.
  2. Reading and storing schema changes: Whenever the schema of the source table changes, those changes are also written to the binary log file. The Debezium MySQL connector captures these schema changes, stores them in a separate Kafka topic, and uses them to reconstruct the table from scratch in cases where it is restarted or recovering from a crash.
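
The difference between the two approaches can be illustrated with a small toy model. This is only a sketch: `ToyDatabase`, `poll_snapshot`, and `read_log` are hypothetical names invented for the illustration, and the in-memory `change_log` list merely plays the role of the binary log. It does not use MySQL, Debezium, or Kafka Connect.

```python
from dataclasses import dataclass, field


@dataclass
class ToyDatabase:
    """In-memory stand-in for a table plus an append-only change log."""
    rows: dict = field(default_factory=dict)
    change_log: list = field(default_factory=list)  # plays the role of the binlog

    def insert(self, row_id, name):
        self.rows[row_id] = name
        self.change_log.append(("insert", row_id, name))

    def delete(self, row_id):
        name = self.rows.pop(row_id)
        self.change_log.append(("delete", row_id, name))


def poll_snapshot(db):
    """What a polling reader sees: only the rows present right now."""
    return dict(db.rows)


def read_log(db, offset):
    """What a log reader sees: every change recorded since `offset`."""
    return db.change_log[offset:], len(db.change_log)


db = ToyDatabase()
db.insert(1, "Aaren")
first_poll = poll_snapshot(db)
_, offset = read_log(db, 0)

# Between two polls, a row is inserted and then deleted again.
db.insert(2, "Aarika")
db.delete(2)

second_poll = poll_snapshot(db)
changes, offset = read_log(db, offset)

print(first_poll == second_poll)  # both snapshots look the same to the poller
print(changes)                    # the log reader still sees both changes
```

The two snapshots are identical, so a polling reader has no way to know that row 2 ever existed, while the log reader receives both the insert and the delete.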

In this article, we will configure the Debezium MySQL connector to read from the bin-log file and configure the separate Kafka topic where it can store the schema changes.

Prerequisites

  • Kubernetes cluster (GKE or equivalent).

For this article, I used a 5 node GKE cluster consisting of “n1-standard-1” nodes.

  • Helm, to be able to install Kafka Connect, MySQL, and PostgreSQL using helm charts.

  • MySQL client
  • PostgreSQL client

Install Kafka Connect

Please follow the instructions in Configuring a Kafka Connect pipeline on Kubernetes to install Kafka Connect on the Kubernetes cluster.

Install and configure MySQL

The default MySQL setup requires some changes so that it generates the binary log file and the Debezium MySQL connector is able to connect to it.

Add helm repository (google repo for MySQL)

helm repo add stable https://kubernetes-charts.storage.googleapis.com/

Customize default MySQL configuration

Download the default values.yaml file from here and update the custom MySQL configuration file section

# Custom mysql configuration files used to override default mysql settings
configurationFiles:
  connect.cnf: |-
    [mysqld]
    skip-host-cache
    skip-name-resolve
    server-id = 223344
    log_bin = mysql-bin
    expire_logs_days = 1
    binlog_row_image = full
    binlog_format = row

This configuration creates an additional .cnf file during installation and enables MySQL to generate the binary log file (named mysql-bin). It also assigns a server-id to the MySQL server which comes in handy when configuring the Debezium MySQL connector.
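
Before installing the chart, it can help to sanity-check the settings that matter for change data capture. The following is a minimal sketch, assuming the [mysqld] section is available as plain text; `check_binlog_settings` is a hypothetical helper written for this article, not part of MySQL or Debezium. It checks that a server id and a binary log name are set, and that the row-based format with full row images is selected, which is what the Debezium MySQL connector expects.

```python
import configparser

CNF_TEXT = """
[mysqld]
skip-host-cache
skip-name-resolve
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_row_image = full
binlog_format = row
"""


def check_binlog_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section (empty if none)."""
    # allow_no_value lets flag-style options such as skip-host-cache parse.
    parser = configparser.ConfigParser(allow_no_value=True)
    parser.read_string(cnf_text)
    mysqld = parser["mysqld"]

    problems = []
    if not mysqld.get("server-id"):
        problems.append("server-id is not set")
    if not mysqld.get("log_bin"):
        problems.append("log_bin is not set, so no binary log is written")
    if (mysqld.get("binlog_format") or "").lower() != "row":
        problems.append("binlog_format should be row")
    if (mysqld.get("binlog_row_image") or "").lower() != "full":
        problems.append("binlog_row_image should be full")
    return problems


print(check_binlog_settings(CNF_TEXT))
```

An empty list means that none of the checked settings is missing or set to an unexpected value.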

Install MySQL chart

helm install <mysql-release-name> -f values.yaml stable/mysql

<mysql-release-name> can be any string to uniquely identify this installation. For this installation, the <mysql-release-name> is mysql.

Connect to MySQL

  • Create a port forward rule for MySQL service listening on port 3306
kubectl port-forward svc/<mysql-release-name> 3306 &
  • Export required environment variables
export MYSQL_HOST=127.0.0.1
export MYSQL_PORT=3306
MYSQL_ROOT_PASSWORD=$(kubectl get secret --namespace default mysql -o jsonpath="{.data.mysql-root-password}" | base64 --decode; echo)
  • Connect to MySQL
mysql -h ${MYSQL_HOST} -P${MYSQL_PORT} -u root -p${MYSQL_ROOT_PASSWORD}
  • Using the MySQL command prompt, create a database and a table
create database test;
use test;
create table `students` (
`name` varchar(50) DEFAULT NULL,
`id` int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`id`)
);
  • Insert some default data into the table
insert into students (name) values ('Aaren');
insert into students (name) values ('Aarika');
insert into students (name) values ('Abagael');

Install and configure PostgreSQL

We can install PostgreSQL directly from the helm chart, as the default setup suffices and no configuration changes are required.

Add helm repository (bitnami repo for PostgreSQL)

helm repo add bitnami https://charts.bitnami.com/bitnami

Install PostgreSQL chart

helm install <postgresql-installation-name> --set postgresqlPassword=<password>,postgresqlDatabase=test bitnami/postgresql

The instance password and database can be passed as command-line arguments to override the defaults. <postgresql-installation-name> can be any string to uniquely identify this installation. For this installation, the <postgresql-installation-name> is postgres.

Connect to PostgreSQL

  • Create a port forward rule for Postgres service listening on port 5432
kubectl port-forward --namespace default svc/<postgresql-installation-name>-postgresql 5432:5432 &
  • Export required environment variables
export POSTGRES_PASSWORD=$(kubectl get secret --namespace default <postgresql-installation-name>-postgresql -o jsonpath="{.data.postgresql-password}" | base64 --decode)
  • Connect to PostgreSQL
PGPASSWORD="$POSTGRES_PASSWORD" psql --host 127.0.0.1 -U postgres -d test -p 5432

Configure connect-standalone

  • Connect to the Kafka connect server
kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash
  • Install the vi or nano editor and unzip (they are not present by default).
apt-get update
apt install nano
apt install unzip
  • Download and install mysql driver
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.48.zip
unzip mysql-connector-java-5.1.48.zip
cp mysql-connector-java-5.1.48/mysql-connector-java-5.1.48-bin.jar /usr/share/java/kafka-connect-jdbc/
  • Download and install Debezium MySQL Connector
confluent-hub install debezium/debezium-connector-mysql:1.2.2

By default, the connector is installed in /usr/share/confluent-hub-components. The destination is configurable. However, if the destination is changed, we need to update the plugin.path property in /etc/schema-registry/connect-avro-standalone.properties file and append the new path to it.

  • Edit the standalone properties file
vi /etc/schema-registry/connect-avro-standalone.properties
  • Make changes according to the environment
bootstrap.servers=<connect-release-name>-cp-kafka:9092
key.converter.schema.registry.url=http://<connect-release-name>-cp-schema-registry:8081
value.converter.schema.registry.url=http://<connect-release-name>-cp-schema-registry:8081
rest.host.name=localhost
# The original port 8083 is already in use, as the default process is still running
rest.port=9083
plugin.path=/usr/share/java,/usr/share/confluent-hub-components

Update plugin.path and append the new path if the Debezium connector was not installed in the default location.

Configure Debezium MySQL Source Connector

  • Connect to the Kafka connect server (if not already connected)
kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash
  • Create Debezium MySQL Source Connector properties file:
vi debezium-mysql-source-connector.properties
  • Add the following properties
name=debezium-mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=mysql
database.port=3306
database.user=root
database.password=<mysql-root-password>
database.server.id=223344
database.server.name=dbserver1
database.history.kafka.topic=students-schema
database.whitelist=test
table.whitelist=test.students
message.key.columns=test.students:id
database.history.kafka.bootstrap.servers=kafkaconnect-cp-kafka:9092
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3

connector.class: This is the main class of MySQL connector provided by Debezium.

database.server.id: This is the id given to the MySQL server during installation. Please refer to the section “Customize default MySQL configuration”.

database.server.name: This is any String that can be used to uniquely identify the MySQL instance. This is especially useful when we are using a MySQL cluster.

database.history.kafka.topic: This is the topic Debezium connector will use to store the schema changes.

database.whitelist: The database(s) in the instance that need to be monitored and synced.

table.whitelist: The tables within the whitelisted database(s) that need to be monitored and synced.

transforms*: This set of properties is used to configure a route SMT (Single Message Transformation). By default, when the Debezium connector receives an update, it sends it to a topic named <database.server.name>.<database.name>.<table.name>. In our example, that would be “dbserver1.test.students”. Since we only want to store the information in the “students” topic, we strip off the first two parts.
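
The effect of the routing can be tried out without a running Connect worker. The snippet below is a rough Python analogue, not the RegexRouter implementation itself: it assumes a pattern made of three dot-separated parts, uses `re.fullmatch` to mimic the fact that the router only rewrites a topic when the whole name matches, and returns the third capture group in place of the `$3` replacement. The `route` function is a hypothetical name used only here.

```python
import re

# Three dot-separated parts: <server name>.<database name>.<table name>
TOPIC_PATTERN = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")


def route(topic):
    """Return the table name for a three-part topic, else the topic unchanged."""
    match = TOPIC_PATTERN.fullmatch(topic)
    return match.group(3) if match else topic


print(route("dbserver1.test.students"))  # students
print(route("dbserver1"))                # dbserver1
```

A topic name without three parts is left untouched, so only the per-table topics are renamed.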

Configure the PostgreSQL Sink Connector

  • Connect to the Kafka connect server (if not already connected)
kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash
  • Create PostgreSQL Sink Connector properties file:
vi debezium-postgres-sink-connector.properties
  • Add the following properties
name=test-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.url=jdbc:postgresql://postgres-postgresql:5432/test?user=postgres&password=<postgresql-password>
topics=students
dialect.name=PostgreSqlDatabaseDialect
auto.create=true
insert.mode=upsert
pk.fields=id
pk.mode=record_value
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

connector.class: Generic JdbcSinkConnector is used for PostgreSQL

topics: Topics to subscribe for updates. In our case, we are modifying the default topic name generated by Debezium and using just the table name.

auto.create: Auto-create the schema objects if they do not exist e.g. the table will be auto-created during the initial sync.

insert.mode: With upsert, a record whose id already exists updates the existing row instead of being inserted again.
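
To see why upsert matters when the same record arrives more than once, here is a small sketch using Python's built-in sqlite3 module as a stand-in for PostgreSQL. It assumes a SQLite version that supports `ON CONFLICT ... DO UPDATE` (3.24 or later); the statement that the sink connector actually generates for PostgreSQL may look different, and `write_record` is a hypothetical helper for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT)")

# Insert the row, or update it in place when the primary key already exists.
UPSERT = """
INSERT INTO students (id, name) VALUES (?, ?)
ON CONFLICT (id) DO UPDATE SET name = excluded.name
"""


def write_record(record):
    """Apply one record to the table, keyed on its id."""
    conn.execute(UPSERT, (record["id"], record["name"]))


write_record({"id": 1, "name": "Aaren"})
write_record({"id": 1, "name": "Aaren"})  # the same record delivered again
write_record({"id": 1, "name": "Aaron"})  # a later update to the same row

rows = conn.execute("SELECT id, name FROM students").fetchall()
print(rows)  # [(1, 'Aaron')]
```

With a plain insert, the second call would fail on the primary key; with upsert, the table ends up holding exactly one row with the latest value.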

transforms*: This is another SMT provided by Debezium that we are going to use. By default, a Debezium event has a complex structure consisting of multiple levels of information, including the event key schema, event key payload, event value schema, and event value payload (for details, refer to the connector documentation). Even the event value payload contains multiple structures, holding the values before and after the change. Of these, we are only interested in the final state of the row, and that is what this SMT provides: it unwraps the original message and passes on only the relevant section. Note that we apply this SMT after the data is saved in Kafka and before it is inserted into PostgreSQL, so that Kafka remains the source of truth and retains all the information (if and when required).
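
As a rough picture of what the unwrapping does, consider the value payload of an update event as a Python dictionary. This is a simplified sketch: the field values are made up for illustration, the real event carries more metadata, and `unwrap` is a hypothetical function that only mirrors the idea of keeping the "after" state.

```python
def unwrap(envelope):
    """Return the row state after the change, or None if there is none (a delete)."""
    return envelope.get("after")


# Simplified value payload of an update event (illustrative values only).
change_event = {
    "before": {"id": 4, "name": "LiveTest"},
    "after": {"id": 4, "name": "LiveTest2"},
    "source": {"db": "test", "table": "students"},
    "op": "u",
    "ts_ms": 1601640000000,
}

print(unwrap(change_event))
```

Only the flat row is passed on to the sink connector, which can then map its fields directly to table columns.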

Run and Verify the data

  • Connect to the Kafka connect server (if not already connected)
kubectl exec -c cp-kafka-connect-server -it <kafka connect pod> -- /bin/bash
  • Execute the standalone connector to fetch data from MySQL and update it in the PostgreSQL tables.
connect-standalone /etc/schema-registry/connect-avro-standalone.properties debezium-mysql-source-connector.properties debezium-postgres-sink-connector.properties

If the process complains about the REST port “Address already in use”, update the “/etc/schema-registry/connect-avro-standalone.properties” file, change rest.port to 9084 and rerun.

  • Check the PostgreSQL table to verify that the data has been inserted correctly:
select * from students;

The output should show the records present in the database table and they should exactly match the ones in the MySQL students table.
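
For larger tables, comparing the two result sets by eye becomes impractical. The following is a minimal sketch of a comparison, assuming the rows from both databases have already been fetched into lists of dictionaries by whichever client is at hand; `diff_tables` is a hypothetical helper, and the sample rows are taken from the inserts earlier in this article.

```python
def diff_tables(source_rows, sink_rows, key="id"):
    """Compare two row lists by primary key and report the differences."""
    source = {row[key]: row for row in source_rows}
    sink = {row[key]: row for row in sink_rows}
    return {
        "missing": sorted(source.keys() - sink.keys()),     # in MySQL only
        "unexpected": sorted(sink.keys() - source.keys()),  # in PostgreSQL only
        "changed": sorted(
            k for k in source.keys() & sink.keys() if source[k] != sink[k]
        ),
    }


mysql_rows = [
    {"id": 1, "name": "Aaren"},
    {"id": 2, "name": "Aarika"},
    {"id": 3, "name": "Abagael"},
]
postgres_rows = [
    {"id": 1, "name": "Aaren"},
    {"id": 2, "name": "Aarika"},
]

print(diff_tables(mysql_rows, postgres_rows))
```

Three empty lists indicate that both tables hold the same rows; in the sample above, row 3 is reported as missing from the sink.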

Verify Live Updates

Add new data

  • Open a terminal and connect to MySQL
mysql -h ${MYSQL_HOST} -P${MYSQL_PORT} -u root -p${MYSQL_ROOT_PASSWORD}
  • Insert a new row in the MySQL database table.
insert into students (name) values ('LiveTest');

Verify results

  • Check the PostgreSQL table to verify that the data has been inserted correctly.
  • The newly inserted data should be visible.
