Data streaming from MySQL HeatWave to Kafka

Soma Dey
Published in Oracle Developers
Jul 15, 2024

Organizations often need to propagate data from their databases to downstream systems for use cases such as real-time analytics, reporting, data auditing, and compliance.

If you are using MySQL HeatWave on Oracle Cloud and are looking for an open-source change data capture (CDC) tool, this blog is for you.

We will use the Debezium MySQL connector to set up CDC from the MySQL HeatWave service to Kafka.

Key components:

  1. Oracle Cloud account
  2. MySQL HeatWave
  3. ZooKeeper
  4. Kafka
  5. Kafka Connect
  6. Debezium MySQL connector

Technical guide:

1. Deploy MySQL HeatWave in Oracle Cloud

MySQL HeatWave is a cloud-native database service developed and managed by Oracle’s MySQL team that combines OLTP, analytics, and machine learning capabilities in a single service. It is available on Oracle Cloud Infrastructure (OCI), AWS, and Azure, and delivers integrated, high-performance data processing for these workloads.

If you need guidance on provisioning it, refer to the documentation below:

https://docs.public.oneportal.content.oci.oraclecloud.com/en-us/iaas/mysql-database/index.html

2. Test the database connectivity using MySQL Shell

Note: the database version deployed here is 8.0.34.

[opc@soma-app mysql]$ mysqlsh admin@172.0.0.51
MySQL Shell 8.0.32

Copyright (c) 2016, 2023, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its affiliates.
Other names may be trademarks of their respective owners.

Type '\help' or '\?' for help; '\quit' to exit.
Creating a session to 'admin@172.0.0.51'
Fetching schema names for auto-completion... Press ^C to stop.
Your MySQL connection id is 5120
Server version: 8.0.34-u6-cloud MySQL Enterprise - Cloud
No default schema selected; type \use <schema> to set one.
MySQL 172.0.0.51:3306 ssl JS > \sql
Switching to SQL mode... Commands end with ;
MySQL 172.0.0.51:3306 ssl SQL > show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| mysql_audit        |
| performance_schema |
| sys                |
+--------------------+

3. For simplicity, we deployed ZooKeeper, Kafka, and Kafka Connect on a single Oracle Cloud VM using a docker-compose-mysql.yaml file. They can also be installed on dedicated VMs.

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.6
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.6
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  connect:
    image: quay.io/debezium/connect:2.6
    ports:
      - 8083:8083
    links:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses

Start the containers:

docker-compose -f docker-compose-mysql.yaml up

Check that all containers are up:

[opc@soma-app mysql]$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
5ac4a33b0616 quay.io/debezium/connect:2.6 "/docker-entrypoint.…" 16 seconds ago Up 15 seconds 8778/tcp, 0.0.0.0:8083->8083/tcp, 9092/tcp mysql_connect_1
b666f0a59cf6 quay.io/debezium/kafka:2.6 "/docker-entrypoint.…" 17 seconds ago Up 16 seconds 0.0.0.0:9092->9092/tcp mysql_kafka_1
434fdba06a05 quay.io/debezium/zookeeper:2.6 "/docker-entrypoint.…" 18 seconds ago Up 17 seconds 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp mysql_zookeeper_1
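`docker ps` only shows that the containers have started; Kafka Connect can need a little longer before its REST API on port 8083 accepts requests. A minimal sketch for waiting on it, assuming Python 3 on the VM and the port mapping from the compose file above: it polls the Kafka Connect REST root, which answers with a small JSON document that includes the worker's `version`. The helper names `connect_is_ready` and `wait_until` are hypothetical.

```python
import json
import time
import urllib.request
from typing import Callable


def connect_is_ready(base_url: str = "http://localhost:8083", timeout: float = 2.0) -> bool:
    """True if the Kafka Connect REST root answers with JSON containing a "version" field."""
    try:
        with urllib.request.urlopen(base_url.rstrip("/") + "/", timeout=timeout) as resp:
            body = json.loads(resp.read().decode("utf-8"))
            return isinstance(body, dict) and "version" in body
    except (OSError, ValueError):
        # URLError, connection refused and timeouts are OSErrors; malformed JSON is a ValueError.
        return False


def wait_until(
    probe: Callable[[], bool],
    attempts: int = 30,
    delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call probe() up to `attempts` times, sleeping `delay` seconds between failed tries."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            sleep(delay)
    return False
```

For example, `wait_until(connect_is_ready)` could be run before registering the connector. The `sleep` parameter is injectable only so that the retry loop can be exercised without real delays.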

4. Prepare MySQL HeatWave for CDC

  • Create a replication user
CREATE USER rpluser001@'%' IDENTIFIED BY '********' default role 'administrator';
  • Create a database and table to test the CDC
create database testcdc;
use testcdc;
CREATE TABLE employee (
  ID int NOT NULL,
  LastName varchar(255) NOT NULL,
  FirstName varchar(255),
  Age int,
  PRIMARY KEY (ID)
);

insert into employee values(1,'Roy','Samanta',30);

MySQL 172.0.0.51:3306 ssl SQL > show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| mysql_audit        |
| performance_schema |
| sys                |
| testcdc            |
+--------------------+
MySQL 172.0.0.51:3306 ssl testcdc SQL > select * from employee;
+----+----------+-----------+-----+
| ID | LastName | FirstName | Age |
+----+----------+-----------+-----+
|  1 | Roy      | Samanta   |  30 |
+----+----------+-----------+-----+
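Debezium reads row changes from the MySQL binary log, so its documentation expects binary logging to be enabled with row-based format and full row images, and lists SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT as the privileges the connector user needs. The server settings can be listed in MySQL Shell with `SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image');`. The sketch below, with hypothetical helper names, only compares values you have already read against those expectations; it does not connect to the database, and it does not expand roles such as 'administrator' or `ALL PRIVILEGES`.

```python
from typing import Dict, Iterable, List, Set

# Binary log settings the Debezium MySQL connector relies on.
REQUIRED_VARIABLES = {"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}

# Privileges the Debezium MySQL connector documentation lists for the connector user.
REQUIRED_PRIVILEGES = {"SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT"}


def cdc_problems(variables: Dict[str, str]) -> List[str]:
    """Compare SHOW VARIABLES results (name -> value) with the settings Debezium expects."""
    problems = []
    for name, expected in REQUIRED_VARIABLES.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} not reported")
        elif actual.upper() != expected:
            problems.append(f"{name} is {actual}, expected {expected}")
    return problems


def missing_privileges(granted: Iterable[str]) -> Set[str]:
    """Required privileges absent from `granted` (roles and ALL PRIVILEGES are not expanded)."""
    return REQUIRED_PRIVILEGES - {p.strip().upper() for p in granted}
```

An empty list from `cdc_problems` and an empty set from `missing_privileges` suggest the server and user are ready for the connector.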

5. Create a configuration file (register-mysql.json) to deploy the Debezium MySQL connector

{
  "name": "employee-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "172.0.0.51",
    "database.port": "3306",
    "database.user": "rpluser001",
    "database.password": "*******",
    "database.ssl.mode": "preferred",
    "database.server.id": "1234",
    "topic.prefix": "dbserver2",
    "database.include.list": "testcdc",
    "table.include.list": "testcdc.employee",
    "incremental.snapshot.allow.schema.changes": "true",
    "snapshot.mode": "when_needed",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.testcdc",
    "max.batch.size": "512",
    "max.queue.size": "1024",
    "include.query": "false"
  }
}
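If the connector settings are generated rather than hand-edited, a small script can write register-mysql.json and reject an obviously incomplete configuration before it reaches Kafka Connect. A sketch, assuming Python 3: the dictionary mirrors the file above, the password is read from a hypothetical `MYSQL_CDC_PASSWORD` environment variable (falling back to the masked placeholder), and the list of required keys is this sketch's own choice, not a list published by Debezium. The check on `database.server.id` reflects the Debezium requirement that it be a numeric ID unique among the clients replicating from the server.

```python
import json
import os
from pathlib import Path
from typing import Dict

# Settings copied from register-mysql.json; MYSQL_CDC_PASSWORD is a hypothetical variable name.
EMPLOYEE_CONFIG: Dict[str, str] = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "172.0.0.51",
    "database.port": "3306",
    "database.user": "rpluser001",
    "database.password": os.environ.get("MYSQL_CDC_PASSWORD", "********"),
    "database.ssl.mode": "preferred",
    "database.server.id": "1234",
    "topic.prefix": "dbserver2",
    "database.include.list": "testcdc",
    "table.include.list": "testcdc.employee",
    "incremental.snapshot.allow.schema.changes": "true",
    "snapshot.mode": "when_needed",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.testcdc",
    "max.batch.size": "512",
    "max.queue.size": "1024",
    "include.query": "false",
}

# Keys this sketch insists on; chosen for illustration, not an official Debezium list.
REQUIRED_KEYS = (
    "connector.class", "database.hostname", "database.user", "database.password",
    "database.server.id", "topic.prefix",
    "schema.history.internal.kafka.bootstrap.servers", "schema.history.internal.kafka.topic",
)


def build_registration(name: str, config: Dict[str, object]) -> Dict[str, object]:
    """Wrap settings in the {"name": ..., "config": {...}} body that POST /connectors accepts."""
    missing = [key for key in REQUIRED_KEYS if not str(config.get(key) or "").strip()]
    if missing:
        raise ValueError("missing connector settings: " + ", ".join(missing))
    server_id = str(config["database.server.id"])
    if not server_id.isdigit() or int(server_id) == 0:
        raise ValueError("database.server.id must be a positive integer")
    # Quote every value, as the hand-written file does.
    return {"name": name, "config": {key: str(value) for key, value in config.items()}}


def write_registration(path: Path, body: Dict[str, object]) -> None:
    """Write the registration body as indented JSON."""
    path.write_text(json.dumps(body, indent=2) + "\n", encoding="utf-8")
```

For example, `write_registration(Path("register-mysql.json"), build_registration("employee-connector", EMPLOYEE_CONFIG))` produces a file with the same settings as the one above, with the real password substituted when the environment variable is set.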

6. Register the Debezium MySQL connector to monitor the employee table in the testcdc database.

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
[opc@soma-app mysql]$ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
HTTP/1.1 201 Created
Date: Thu, 27 Jun 2024 04:49:49 GMT
Location: http://localhost:8083/connectors/employee-connector
Content-Type: application/json
Content-Length: 719
Server: Jetty(9.4.53.v20231009)

{"name":"employee-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"172.0.0.51","database.port":"3306","database.user":"rpluser001","database.password":"********","database.ssl.mode":"preferred","database.server.id":"1234","topic.prefix":"dbserver2","database.include.list":"testcdc","table.include.list":"testcdc.employee","incremental.snapshot.allow.schema.changes":"true","snapshot.mode":"when_needed","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.testcdc","max.batch.size":"512","max.queue.size":"1024","include.query":"false","name":"employee-connector"},"tasks":[],"type":"source"}
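The 201 Created response confirms that Kafka Connect accepted the configuration; the `"tasks":[]` list is typically still empty at that point because the task is started asynchronously. The Kafka Connect REST API also exposes `GET /connectors/<name>/status`, which reports the state of the connector and of each task. A sketch of both calls using only the Python standard library, assuming Kafka Connect is reachable at http://localhost:8083; the function names are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict


def registration_request(base_url: str, body: Dict[str, Any]) -> urllib.request.Request:
    """Build the same POST /connectors request that the curl command above sends."""
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=json.dumps(body).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def fetch_status(base_url: str, name: str, timeout: float = 5.0) -> Dict[str, Any]:
    """GET /connectors/<name>/status and decode the JSON response."""
    url = f"{base_url.rstrip('/')}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def connector_running(status: Dict[str, Any]) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
```

`urllib.request.urlopen(registration_request("http://localhost:8083", body))` sends the same method, URL, headers, and body as the curl command, and `connector_running(fetch_status("http://localhost:8083", "employee-connector"))` should return True once the task has started. When a task reports FAILED, its entry in the status document usually includes a `trace` field with the stack trace.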

7. Check that the connector has created the Kafka topics

[opc@soma-app mysql]$  docker exec mysql_kafka_1 /kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092
__consumer_offsets
dbserver2
dbserver2.testcdc.employee
my_connect_configs
my_connect_offsets
my_connect_statuses
schema-changes.testcdc
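The topic list follows Debezium's naming scheme for the MySQL connector: change events for a table go to `<topic.prefix>.<database>.<table>`, schema change events go to a topic named after `topic.prefix` itself (while `include.schema.changes` keeps its default of true), and the schema history topic is the one set in `schema.history.internal.kafka.topic`. The three `my_connect_*` topics are Kafka Connect's internal storage topics from the compose file, and `__consumer_offsets` is Kafka's own. A sketch that derives the expected names from the connector settings, with hypothetical function names, assuming `table.include.list` holds literal `database.table` names; Debezium actually treats those entries as regular expressions, which this simplification ignores.

```python
from typing import Dict, Iterable, List


def expected_topics(config: Dict[str, str], connect_topics: Iterable[str] = ()) -> List[str]:
    """Topic names the connector setup above should produce (simplified)."""
    prefix = config["topic.prefix"]
    topics = {prefix}  # schema change events use a topic named after the prefix
    for entry in config.get("table.include.list", "").split(","):
        entry = entry.strip()
        if entry:
            topics.add(f"{prefix}.{entry}")  # <prefix>.<database>.<table>, entry taken literally
    topics.add(config["schema.history.internal.kafka.topic"])
    topics.update(connect_topics)  # Kafka Connect's config/offset/status topics
    return sorted(topics)


def missing_topics(expected: Iterable[str], listed: Iterable[str]) -> List[str]:
    """Expected topics that do not appear in the kafka-topics.sh --list output."""
    return sorted(set(expected) - set(listed))
```

Feeding the lines printed by the `kafka-topics.sh --list` command above into `missing_topics` should yield an empty list for this setup.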

8. Test the setup by consuming messages

docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver2.testcdc.employee

[opc@soma-app mysql]$ docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
> --bootstrap-server kafka:9092 \
> --from-beginning \
> --property print.key=true \
> --topic dbserver2.testcdc.employee
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"}],"optional":false,"name":"dbserver2.testcdc.employee.Key"},"payload":{"ID":1}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"LastName"},{"type":"string","optional":true,"field":"FirstName"},{"type":"int32","optional":true,"field":"Age"}],"optional":true,"name":"dbserver2.testcdc.employee.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"LastName"},{"type":"string","optional":true,"field":"FirstName"},{"type":"int32","optional":true,"field":"Age"}],"optional":true,"name":"dbserver2.testcdc.employee.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver2.testcdc.employee.Envelope","version":2},"payload":{"before":null,"after":{"ID":1,"LastName":"Roy","FirstName":"Samanta","Age":30},"source":{"version":"2.6.2.Final","connector":"mysql","name":"dbserver2","ts_ms":1719463792000,"snapshot":"first","db":"testcdc","sequence":null,"ts_us":1719463792000000,"ts_ns":1719463792000000000,"table":"employee","server_id":0,"gtid":null,"file":"binary-log.000859","pos":197,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1719463792195,"ts_us":1719463792195304,"ts_ns":1719463792195304000,"transaction":null}}
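Each record printed above is a key (the primary key, `{"ID":1}`) followed by the value. Because the messages are serialized as JSON with an embedded schema, the change itself sits in the value's `payload` field: `before` and `after` hold the row images, `op` is the operation code (`c` create, `u` update, `d` delete, `r` read during a snapshot, `t` truncate), and `source` carries metadata such as the binlog file and position. Here `"op":"r"` with `"before":null` means the row was captured by the initial snapshot. A sketch for pulling those fields out of a console-consumer line, assuming the key and value are separated by whitespace as in the output above; the helper names are hypothetical.

```python
import json
from typing import Any, Dict, Optional, Tuple

# Debezium operation codes and their meaning.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)", "t": "truncate"}


def split_record(line: str) -> Tuple[Any, Any]:
    """Split a console-consumer line printed with print.key=true into (key, value)."""
    text = line.strip()
    key, end = json.JSONDecoder().raw_decode(text)  # parse only the first JSON document
    value = json.loads(text[end:].strip())          # the remainder of the line is the value
    return key, value


def describe_event(value: Any) -> Optional[Dict[str, Any]]:
    """Summarize a Debezium change event; returns None for tombstones (null values)."""
    if value is None:
        return None
    payload = value.get("payload", value)  # without an embedded schema the envelope is top-level
    source = payload["source"]
    return {
        "op": OPS.get(payload["op"], payload["op"]),
        "table": f'{source["db"]}.{source["table"]}',
        "before": payload.get("before"),
        "after": payload.get("after"),
        "binlog": (source.get("file"), source.get("pos")),
    }
```

Applied to the snapshot record above, `describe_event` would report a `read (snapshot)` of `testcdc.employee` at `("binary-log.000859", 197)` with the Roy/Samanta row as `after`.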

Insert a new record into MySQL HeatWave and watch for the corresponding change event:

 MySQL  172.0.0.51:3306 ssl  testcdc  SQL > insert into employee values(4,'Sen','Samaresh',32);
Query OK, 1 row affected (0.0028 sec)

Consumer output:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"}],"optional":false,"name":"dbserver2.testcdc.employee.Key"},"payload":{"ID":4}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"LastName"},{"type":"string","optional":true,"field":"FirstName"},{"type":"int32","optional":true,"field":"Age"}],"optional":true,"name":"dbserver2.testcdc.employee.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"LastName"},{"type":"string","optional":true,"field":"FirstName"},{"type":"int32","optional":true,"field":"Age"}],"optional":true,"name":"dbserver2.testcdc.employee.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver2.testcdc.employee.Envelope","version":2},"payload":{"before":null,"after":{"ID":4,"LastName":"Sen","FirstName":"Samaresh","Age":32},"source":{"version":"2.6.2.Final","connector":"mysql","name":"dbserver2","ts_ms":1719464628270,"snapshot":"false","db":"testcdc","sequence":null,"ts_us":1719464628270779,"ts_ns":1719464628270779000,"table":"employee","server_id":3005674236,"gtid":"6224f187-31ea-11ef-97ed-02001703e16a:7","file":"binary-log.000862","pos":497,"row":0,"thread":5145,"query":null},"op":"c","ts_ms":1719464628279,"ts_us":1719464628279211,"ts_ns":1719464628279211000,"transaction":null}}
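Compared with the snapshot record, the insert arrives with `"op":"c"`, `"snapshot":"false"`, and a GTID and thread id in `source`. The envelope also carries two clocks: according to the Debezium documentation, `source.ts_ms` is when the change was made in the database, while the top-level `ts_ms` is when the connector processed the event, so their difference is a rough capture lag (only approximate if the database host and the Kafka Connect VM clocks are not in sync). For the insert above that is 1719464628279 - 1719464628270 = 9 ms, or 1719464628279211 - 1719464628270779 = 8432 µs using the microsecond fields. A sketch of that calculation, with a hypothetical function name, that skips snapshot reads because their source timestamp describes the snapshot rather than an original change.

```python
from typing import Any, Dict, Optional


def capture_lag(payload: Dict[str, Any], unit: str = "ms") -> Optional[int]:
    """Connector processing time minus database change time; None for snapshot reads."""
    if payload.get("op") == "r":
        return None  # snapshot read: the source timestamp is not the original change time
    field = {"ms": "ts_ms", "us": "ts_us"}[unit]
    return payload[field] - payload["source"][field]
```

Passing the `payload` object of the insert event above gives 9 with the default unit and 8432 with `unit="us"`.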

9. Clean up

  • Delete the MySQL HeatWave DB system
  • Shut down ZooKeeper, Kafka, and Kafka Connect:
docker-compose -f docker-compose-mysql.yaml down

Conclusion

This setup provides the building blocks for a real-time data pipeline that propagates changes from MySQL HeatWave to downstream systems as they are committed. In a production environment, ZooKeeper and Kafka can be deployed on OKE (Oracle Kubernetes Engine) or installed on dedicated OCI VM instances.


/* Opinions expressed here are my own & do not express the views or opinions of my employer */