Unlocking Real-time Data Synchronisation: Oracle to Kafka using Change Data Capture (CDC)

Sami Alashabi
Plumbers Of Data Science
7 min read · Aug 19, 2023
Kafka Connect in Action

In today’s data-driven landscape, organisations thrive on real-time insights to drive critical business decisions. Oracle Database stands as a pillar of enterprise-grade data storage, yet the need often arises to unlock its potential beyond its original confines. Enter Change Data Capture (CDC), an ingenious solution that allows organizations to efficiently capture and disseminate data changes from Oracle Database to various destinations, powering analytics and real-time applications. This article delves into the world of Oracle CDC, highlighting its significance and unveiling Confluent’s Oracle CDC Source Connector as a game-changer in the integration landscape.

This article is part 2 of the real-time data synchronisation series. If you are curious about CDC using Postgres, see the companion article in the same series.

The Need for Oracle CDC

While Oracle Database excels in managing enterprise transaction workloads, businesses often yearn to leverage the data stored within it for broader applications, such as analytics and real-time processing. This is where Change Data Capture (CDC) comes to the rescue. CDC efficiently identifies and captures new, updated, or deleted data from Oracle Database tables, making this change data available for broader use within the organisation.
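To make the idea concrete, here is a minimal sketch of how a downstream consumer could replay change events to keep a copy of a table in sync. The event shape used here (`op`, `key`, `row`) is a simplified assumption for illustration only and is not the connector's actual record format.

```python
def apply_change(replica: dict, event: dict) -> None:
    """Apply one change event to an in-memory copy of a table, keyed by primary key."""
    op, key = event["op"], event["key"]
    if op in ("I", "U"):
        # Inserts and updates both carry the latest version of the row.
        replica[key] = event["row"]
    elif op == "D":
        # Deletes remove the row; keys that are already gone are ignored.
        replica.pop(key, None)
    else:
        raise ValueError(f"unknown operation: {op!r}")


# Hypothetical stream of change events, in commit order.
events = [
    {"op": "I", "key": 1, "row": {"department": "Accounting"}},
    {"op": "I", "key": 2, "row": {"department": "Marketing"}},
    {"op": "U", "key": 1, "row": {"department": "Finance"}},
    {"op": "D", "key": 2},
]

replica = {}
for event in events:
    apply_change(replica, event)

print(replica)
```

After replaying the four events, only the updated row with key 1 remains in the copy, which is the same end state the source table would have.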

Challenges Faced with Other Solutions

The demand for a robust CDC solution for Oracle Database has grown steadily. Interviews with enterprise customers from various industries revealed that existing solutions often pose challenges:

1. Redundancy: Other solutions introduce redundant components when downstream applications lack out-of-the-box integrations, leading to complexity.
2. Licensing Costs: Some solutions incur prohibitive licensing costs for capturing high-value Oracle DB change events.
3. Technical Debt: In-house solutions built years ago often accumulate costly technical debt for maintenance and updates.

Introducing Confluent’s Oracle CDC Source Connector

In response to these challenges, Confluent introduces the Oracle CDC Source Connector, available for general use. This connector empowers organizations to efficiently and cost-effectively synchronize real-time data from Oracle Database to Confluent. When coupled with Confluent’s ksqlDB or sink connectors for modern data systems, the Oracle CDC Connector unlocks key use cases such as data synchronization, real-time analytics, and data warehouse modernization.

Benefits Unveiled by the Oracle CDC Connector

Customers who’ve experienced Confluent’s Oracle CDC Connector attest to its transformative benefits:

- Cost Savings: The connector potentially saves millions of dollars compared to existing solutions.
- Licensing Flexibility: It offers a licensing model that eliminates the need for burdensome additional licenses.
- Simplified Pipelines: The Oracle CDC Source Connector streamlines data pipelines, reducing redundancy.

Oracle Version Compatibility

Confluent’s Oracle CDC Source Connector (v1.0.0) utilizes Oracle LogMiner to read redo logs from the database. It supports Oracle Database versions 11g, 12c, 18c, and 19c. The connector can initiate synchronization either from a snapshot of the tables or from a specific Oracle system change number (SCN) or timestamp.

Demonstration: Oracle CDC Connector in Action

A practical demo illustrates how to establish a Change Data Capture pipeline between Oracle Database and Kafka using Confluent’s Oracle CDC Source Connector.

  • Clone the repo:
git clone https://github.com/sami12rom/kafka-connect-oracle-cdc
cd kafka-connect-oracle-cdc
  • Run docker-compose up --build in the terminal. Initialisation might take a few minutes.

Note: a Docker image for Oracle on Apple M1 was created, since it's not compatible out of the box.

  • Connect to your Oracle instance by executing: docker exec -it oracle bash
  • Log in to the database with SQL*Plus: sqlplus '/ as sysdba'
  • Check if the Oracle database is in archive log mode:
SQL> select log_mode from v$database;

LOG_MODE
------------
ARCHIVELOG
  • Optional: if the log_mode is not ARCHIVELOG, enable it with the statements below:
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

select log_mode from v$database;
  • Enable supplemental logging, then create a role with the required privileges:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

CREATE ROLE C##CDC_PRIVS;
GRANT EXECUTE_CATALOG_ROLE TO C##CDC_PRIVS;
GRANT ALTER SESSION TO C##CDC_PRIVS;
GRANT SET CONTAINER TO C##CDC_PRIVS;
GRANT SELECT ANY TRANSACTION TO C##CDC_PRIVS;
GRANT SELECT ANY DICTIONARY TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO C##CDC_PRIVS;
GRANT CREATE SESSION TO C##CDC_PRIVS;
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO C##CDC_PRIVS;
GRANT LOGMINING TO C##CDC_PRIVS;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##CDC_PRIVS;
GRANT SELECT ON V_$DATABASE TO C##CDC_PRIVS;
GRANT SELECT ON V_$THREAD TO C##CDC_PRIVS;
GRANT SELECT ON V_$PARAMETER TO C##CDC_PRIVS;
GRANT SELECT ON V_$NLS_PARAMETERS TO C##CDC_PRIVS;
GRANT SELECT ON V_$TIMEZONE_NAMES TO C##CDC_PRIVS;
GRANT SELECT ON ALL_INDEXES TO C##CDC_PRIVS;
GRANT SELECT ON ALL_OBJECTS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_USERS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_CATALOG TO C##CDC_PRIVS;
GRANT SELECT ON ALL_CONSTRAINTS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_CONS_COLUMNS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_TAB_COLS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_IND_COLUMNS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_ENCRYPTED_COLUMNS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_LOG_GROUPS TO C##CDC_PRIVS;
GRANT SELECT ON ALL_TAB_PARTITIONS TO C##CDC_PRIVS;
GRANT SELECT ON SYS.DBA_REGISTRY TO C##CDC_PRIVS;
GRANT SELECT ON SYS.OBJ$ TO C##CDC_PRIVS;
GRANT SELECT ON DBA_TABLESPACES TO C##CDC_PRIVS;
GRANT SELECT ON DBA_OBJECTS TO C##CDC_PRIVS;
GRANT SELECT ON SYS.ENC$ TO C##CDC_PRIVS;
GRANT SELECT ON V_$ARCHIVED_LOG TO C##CDC_PRIVS;
GRANT SELECT ON V_$LOG TO C##CDC_PRIVS;
GRANT SELECT ON V_$LOGFILE TO C##CDC_PRIVS;
GRANT SELECT ON V_$INSTANCE to C##CDC_PRIVS;
GRANT SELECT ANY TABLE TO C##CDC_PRIVS;
  • Create user & grant it access:
CREATE USER C##MYUSER IDENTIFIED BY password DEFAULT TABLESPACE USERS CONTAINER=ALL;
ALTER USER C##MYUSER QUOTA UNLIMITED ON USERS;
GRANT C##CDC_PRIVS to C##MYUSER CONTAINER=ALL;
GRANT CONNECT TO C##MYUSER CONTAINER=ALL;
GRANT CREATE TABLE TO C##MYUSER CONTAINER=ALL;
GRANT CREATE SEQUENCE TO C##MYUSER CONTAINER=ALL;
GRANT CREATE TRIGGER TO C##MYUSER CONTAINER=ALL;
GRANT SELECT ON GV_$ARCHIVED_LOG TO C##MYUSER CONTAINER=ALL;
GRANT SELECT ON GV_$DATABASE TO C##MYUSER CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO C##MYUSER CONTAINER=ALL;
  • Create BPM_DEPARTMENTS and BPM_EMPLOYEES Tables:
-- CREATE TABLE 1
CREATE TABLE C##MYUSER.BPM_DEPARTMENTS
(
i INTEGER GENERATED BY DEFAULT AS IDENTITY,
DEPARTMENT VARCHAR2(100),
PRIMARY KEY (i)
);

GRANT SELECT ON C##MYUSER.BPM_DEPARTMENTS TO C##CDC_PRIVS;
insert into C##MYUSER.BPM_DEPARTMENTS (DEPARTMENT) values ('Accounting');
insert into C##MYUSER.BPM_DEPARTMENTS (DEPARTMENT) values ('Marketing');
insert into C##MYUSER.BPM_DEPARTMENTS (DEPARTMENT) values ('Selling');
insert into C##MYUSER.BPM_DEPARTMENTS (DEPARTMENT) values ('HR');
COMMIT;


-- CREATE TABLE 2
CREATE TABLE C##MYUSER.BPM_EMPLOYEES
(
ID NUMBER GENERATED BY DEFAULT AS IDENTITY,
name VARCHAR2(100),
CONSTRAINT PK_ID PRIMARY KEY (ID)
);
GRANT SELECT ON C##MYUSER.BPM_EMPLOYEES TO C##CDC_PRIVS;
insert into C##MYUSER.BPM_EMPLOYEES (name) values ('Jessica');
insert into C##MYUSER.BPM_EMPLOYEES (name) values ('John');
insert into C##MYUSER.BPM_EMPLOYEES (name) values ('Alex');
COMMIT;

Connector Configuration

  • Check the OracleCdcSourceConnector source plug-in is available
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connector-plugins | jq '.'
  • And look for "class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector"
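Instead of scanning the jq output by eye, the same check can be scripted. The sketch below assumes the Kafka Connect REST API is reachable at http://localhost:8083, as in the curl command above; the helper names `plugin_installed` and `fetch_plugins` are made up for this example, and the sample list is hand-written in the shape the endpoint returns.

```python
import json
from urllib.request import urlopen

ORACLE_CDC_CLASS = "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector"


def plugin_installed(plugins: list, connector_class: str = ORACLE_CDC_CLASS) -> bool:
    """Return True if the plug-in list contains the given connector class."""
    return any(p.get("class") == connector_class for p in plugins)


def fetch_plugins(base_url: str = "http://localhost:8083") -> list:
    """Fetch the installed plug-ins from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connector-plugins") as resp:
        return json.load(resp)


# Offline example; the version strings are placeholders, not real versions.
sample = [
    {"class": ORACLE_CDC_CLASS, "type": "source", "version": "x.y.z"},
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "x.y.z",
    },
]
print(plugin_installed(sample))
```

Against the running demo stack, `plugin_installed(fetch_plugins())` would perform the same check on the live response.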
  • Let's review the connector configuration oracle_cdc_connector.json before starting it:
{
  "name": "oracle_cdc",
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "oracle_cdc",
    "tasks.max": 1,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.template": "${primaryKeyStructOrValue}",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "confluent.topic.bootstrap.servers": "broker:29092",
    "oracle.server": "oracle",
    "oracle.port": 1521,
    "oracle.sid": "ORCLCDB",
    "oracle.username": "C##MYUSER",
    "oracle.password": "password",
    "start.from": "snapshot",
    "table.inclusion.regex": "ORCLCDB\\.C##MYUSER\\.BPM_(.*)",
    "table.exclusion.regex": "",
    "table.topic.name.template": "${fullyQualifiedTableName}",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 1,
    "redo.log.consumer.bootstrap.servers": "broker:29092",
    "redo.log.corruption.topic": "redo-corruption-topic",
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "redo-log-topic",
    "topic.creation.redo.replication.factor": 1,
    "topic.creation.redo.partitions": 1,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "topic.creation.default.cleanup.policy": "delete"
  }
}
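Two values in this configuration are easy to misread: the table inclusion pattern and the redo log topic retention. The sketch below sanity-checks both. It uses Python's `re` module, whereas the connector evaluates the pattern itself, so treat it as an approximation that should behave the same for a pattern this simple. It also assumes the pattern has to match the whole fully-qualified table name. `AUDIT_LOG` is a hypothetical table that is not part of the demo.

```python
import re

# Values copied from oracle_cdc_connector.json (the JSON string "\\." unescapes to "\.").
inclusion_regex = r"ORCLCDB\.C##MYUSER\.BPM_(.*)"
retention_ms = 1209600000


def is_captured(fully_qualified_table: str) -> bool:
    """Assumption: the pattern must match the entire fully-qualified table name."""
    return re.fullmatch(inclusion_regex, fully_qualified_table) is not None


for table in (
    "ORCLCDB.C##MYUSER.BPM_DEPARTMENTS",
    "ORCLCDB.C##MYUSER.BPM_EMPLOYEES",
    "ORCLCDB.C##MYUSER.AUDIT_LOG",  # hypothetical table, not created in this demo
):
    print(table, is_captured(table))

# Convert the redo log topic retention from milliseconds to days.
retention_days = retention_ms / (1000 * 60 * 60 * 24)
print(retention_days)
```

Both BPM_ tables match while the hypothetical table does not, and the retention works out to 14 days.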
  • It's now time to start the connector based on this configuration:
curl -s -X POST -H 'Content-Type: application/json' --data @oracle_cdc_connector.json http://localhost:8083/connectors | jq
  • Check the status of the connector:
curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/oracle_cdc/status | jq
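The status response reports a state for the connector itself and for each of its tasks, and a connector can be RUNNING while a task has FAILED. A minimal sketch of a health check over that response follows. The function name `connector_healthy` and the `worker_id` values are assumptions for illustration; the sample dictionaries are hand-written in the shape returned by the status endpoint.

```python
def connector_healthy(status: dict) -> bool:
    """Return True when the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector with no tasks has not started doing any work yet.
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


running = {
    "name": "oracle_cdc",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
    "type": "source",
}
failed = {
    "name": "oracle_cdc",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "connect:8083", "trace": "..."}
    ],
    "type": "source",
}

print(connector_healthy(running), connector_healthy(failed))
```

When a task is in the FAILED state, its `trace` field in the real response is the first place to look for the cause.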

The Oracle CDC Source Connector continuously monitors the original database, creating an event stream with both snapshot and subsequent changes.

The Oracle CDC Source Connector scales horizontally using the existing Kafka Connect framework.

Topic

Checking the topics in Confluent shows that they now contain the table data.

Insert, update and delete some data (Data Manipulation Language)

Let's insert, update and delete some data:

-- TABLE 1
INSERT into C##MYUSER.BPM_DEPARTMENTS (DEPARTMENT) values ('Management');
UPDATE C##MYUSER.BPM_DEPARTMENTS SET DEPARTMENT = 'Sales' WHERE DEPARTMENT = 'Selling';
DELETE C##MYUSER.BPM_DEPARTMENTS where DEPARTMENT = 'Marketing';

-- TABLE 2
INSERT into C##MYUSER.BPM_EMPLOYEES (name) values ('Sami');
UPDATE C##MYUSER.BPM_EMPLOYEES SET name = 'Axol' WHERE name = 'Alex';
DELETE C##MYUSER.BPM_EMPLOYEES where name = 'John';

COMMIT;
  • The event for the updated department row shows the new value, with the update confirmed by op_type = U:
{
  "I": "\u0003",
  "DEPARTMENT": {
    "string": "Sales"
  },
  "table": {
    "string": "ORCLCDB.C##MYUSER.BPM_DEPARTMENTS"
  },
  "scn": {
    "string": "1429497"
  },
  "op_type": {
    "string": "U"
  },
  "op_ts": {
    "string": "1692482621000"
  },
  "current_ts": {
    "string": "1692482626754"
  },
  "row_id": {
    "string": "AAASLwAAHAAAAGsAAC"
  },
  "username": {
    "string": "SYS"
  }
}
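Some fields in this event are not immediately readable: the identity column `I` appears as `"\u0003"` and the timestamps are epoch milliseconds stored as strings. The sketch below decodes them under a few assumptions: that `I` was serialised as a Kafka Connect decimal (big-endian two's complement bytes) with scale 0, that the JSON rendering maps each byte to one character, and that `current_ts` is the time the connector processed the change. The helper names are made up for this example.

```python
from datetime import datetime, timezone
from decimal import Decimal


def decode_decimal(avro_json_bytes: str, scale: int = 0) -> Decimal:
    """Decode a bytes value, rendered as a JSON string, into a decimal number."""
    raw = avro_json_bytes.encode("latin-1")  # one character per byte
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


def summarize(event: dict) -> dict:
    """Extract the readable parts of a change event shaped like the one above."""
    op_ts = int(event["op_ts"]["string"])
    current_ts = int(event["current_ts"]["string"])
    return {
        "id": decode_decimal(event["I"]),
        "op_type": event["op_type"]["string"],
        "changed_at": datetime.fromtimestamp(op_ts / 1000, tz=timezone.utc),
        "capture_lag_ms": current_ts - op_ts,
    }


# Relevant fields copied from the event shown above.
event = {
    "I": "\u0003",
    "DEPARTMENT": {"string": "Sales"},
    "op_type": {"string": "U"},
    "op_ts": {"string": "1692482621000"},
    "current_ts": {"string": "1692482626754"},
}

summary = summarize(event)
print(summary["id"], summary["op_type"])
print(summary["changed_at"].isoformat(), summary["capture_lag_ms"])
```

Under these assumptions the event refers to the row with `I = 3`, changed on 2023-08-19 at 22:03:41 UTC and picked up roughly 5.8 seconds later.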

Modify Table Columns (Data Definition Language)

ALTER TABLE C##MYUSER.BPM_EMPLOYEES ADD (SURNAME VARCHAR2(100));
insert into C##MYUSER.BPM_EMPLOYEES (name, surname) values ('Harry', 'Potter');
commit;

This adds a new column and, at the same time, registers a new version of the value schema in Schema Registry, evolving the schema.
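Adding a column is a safe kind of schema change when the new field carries a default, because readers using the new schema can still decode records written before the column existed. The sketch below checks only that one rule and is not a replacement for the compatibility check Schema Registry performs. The two schemas are hypothetical, simplified stand-ins for the BPM_EMPLOYEES value schema before and after the ALTER TABLE; the real generated schemas contain more fields.

```python
def added_fields_without_default(old_schema: dict, new_schema: dict) -> list:
    """List fields that exist only in new_schema and have no default value."""
    old_names = {f["name"] for f in old_schema["fields"]}
    return [
        f["name"]
        for f in new_schema["fields"]
        if f["name"] not in old_names and "default" not in f
    ]


# Hypothetical schema before the column was added.
before = {
    "type": "record",
    "name": "ConnectDefault",
    "fields": [
        {"name": "ID", "type": "bytes"},
        {"name": "NAME", "type": ["null", "string"], "default": None},
    ],
}

# Hypothetical schema after adding SURNAME as an optional field.
after = {
    "type": "record",
    "name": "ConnectDefault",
    "fields": before["fields"]
    + [{"name": "SURNAME", "type": ["null", "string"], "default": None}],
}

problems = added_fields_without_default(before, after)
print(problems)
```

An empty list means every added field has a default, so older records can still be read with the new schema.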

Destroy the containers

docker-compose down

Conclusion: Empowering Data Integration

As organizations strive for real-time insights, Confluent’s Oracle CDC Source Connector offers a practical avenue for data integration. By connecting Oracle Database to the Kafka ecosystem, businesses can reduce pipeline complexity and enable use cases that power growth, analytics, and innovation.
