Configuring Oracle CDC with WSO2 Streaming Integrator

Lasantha Samarakoon
Mar 4, 2020

This article is part of a series on change data capture (CDC) with WSO2 Streaming Integrator. The series covers topics ranging from an introduction to WSO2 Streaming Integrator and CDC to configuring major databases for CDC with WSO2 Streaming Integrator.

This article walks you through the steps to configure CDC for Oracle 12cR2 with WSO2 Streaming Integrator.

If you arrived at this article directly and are still looking for an overview of change data capture and how WSO2 Streaming Integrator can be part of your solution, see the first part of this series, Change Data Capturing with WSO2 Streaming Integrator.

The guide below assumes that you have a good understanding of CDC and its applications. In addition, the following prerequisites are expected.

Prerequisites

  • A working Oracle 12cR2 (12.2.0.1) database environment. (For an easier deployment, you can use Oracle's official Docker images.)
  • A working WSO2 Streaming Integrator instance (either the worker or the tooling runtime works). You can download WSO2 Streaming Integrator from here. For more details on WSO2 Streaming Integrator, refer to the official documentation.

In addition to the above, the following database configurations are assumed in the steps below.

Host       : localhost
Port       : 1521
SID        : ORCLCDB
Schema     : inventory
Table name : products
Columns    :
    ID            INTEGER
    NAME          VARCHAR2(20)
    DESCRIPTION   VARCHAR2(50)
    QUANTITY      INTEGER

[Figure: Oracle CDC with WSO2 Streaming Integrator]

Create a schema to be monitored

1. Log on to the Oracle database as SYSDBA and execute the following commands to create the required user.

CREATE USER inventory IDENTIFIED BY dbz;
GRANT CONNECT TO inventory;
GRANT CREATE SESSION TO inventory;
GRANT CREATE TABLE TO inventory;
GRANT CREATE SEQUENCE TO inventory;
ALTER USER inventory QUOTA 100M ON users;

2. Now log on to the Oracle database as inventory and execute the following command to create the products table.

CREATE TABLE products (
ID INTEGER GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 101) NOT NULL PRIMARY KEY,
NAME VARCHAR2(20) NOT NULL,
DESCRIPTION VARCHAR2(50),
QUANTITY INTEGER
);

Enable CDC on database/tables

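1. Log on to the Oracle database as SYSDBA and execute the following commands to enable archive log mode and GoldenGate replication. The directory set as db_recovery_file_dest must already exist on the database host. Run these commands from SQL*Plus, since SHUTDOWN, STARTUP and ARCHIVE LOG LIST are SQL*Plus commands rather than SQL statements.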

ALTER SYSTEM SET db_recovery_file_dest_size = 5G;
ALTER SYSTEM SET db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope = spfile;
ALTER SYSTEM SET enable_goldengate_replication = true;
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
ARCHIVE LOG LIST;

2. Log on to the database as SYSDBA and create an XStream administrator user.

CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_adm_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER xstrmadmin IDENTIFIED BY xsa
DEFAULT TABLESPACE xstream_adm_tbs
QUOTA UNLIMITED ON xstream_adm_tbs;
GRANT CREATE SESSION TO xstrmadmin;
BEGIN
  DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
    grantee => 'xstrmadmin',
    privilege_type => 'CAPTURE',
    grant_select_privileges => TRUE);
END;
/

3. Create the XStream user, which is used to connect to the database. Log on to the database as SYSDBA to create this user.

CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER xstrm IDENTIFIED BY xs
DEFAULT TABLESPACE xstream_tbs
QUOTA UNLIMITED ON xstream_tbs;
GRANT CREATE SESSION TO xstrm;
GRANT SELECT ON V_$DATABASE to xstrm;
GRANT FLASHBACK ANY TABLE TO xstrm;

4. Once the XStream user is created, create an XStream outbound server. The outbound server is used to receive CDC events from the database. Only one Debezium connector can connect to an XStream outbound server (WSO2 Streaming Integrator uses the Debezium connector internally for CDC; for details on the internal implementation, refer to part 1 of this series). Therefore, you need to create a separate outbound server for each CUD operation (insert, update, and delete) on each table. To create an XStream outbound server, log on to the database as the XStream admin user and run the following command.

DECLARE
  tables DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  tables(1) := NULL;
  schemas(1) := 'inventory';
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name => 'dbzxout',
    table_names => tables,
    schema_names => schemas);
END;
/

5. Once the outbound server is created, grant the XStream user permission to connect to it. The following command configures the outbound server to accept connections from the XStream user. Run it as SYSDBA.

BEGIN
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name => 'dbzxout',
    connect_user => 'xstrm');
END;
/
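Because one outbound server is needed per operation and table, retyping the blocks from steps 4 and 5 quickly becomes error-prone. The following POSIX shell sketch prints both blocks for a given server name. The function names gen_create_outbound and gen_grant_outbound are hypothetical, and the schema-wide scope (tables(1) := NULL) is copied unchanged from step 4.

```shell
# Hypothetical helpers that print the PL/SQL from steps 4 and 5 for one
# outbound server, so that a separate server can be created per operation.

# Step 4 (run as the XStream admin user): create the outbound server.
# Usage: gen_create_outbound <server_name> <schema>
gen_create_outbound() {
  [ $# -eq 2 ] || { echo "usage: gen_create_outbound <server_name> <schema>" >&2; return 2; }
  cat <<EOF
DECLARE
  tables DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  tables(1) := NULL;
  schemas(1) := '$2';
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name => '$1',
    table_names => tables,
    schema_names => schemas);
END;
/
EOF
}

# Step 5 (run as SYSDBA): allow the XStream user to connect to the server.
# Usage: gen_grant_outbound <server_name> <connect_user>
gen_grant_outbound() {
  [ $# -eq 2 ] || { echo "usage: gen_grant_outbound <server_name> <connect_user>" >&2; return 2; }
  cat <<EOF
BEGIN
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name => '$1',
    connect_user => '$2');
END;
/
EOF
}
```

For example, assuming sqlplus is on the PATH and the connect strings suit your environment, `gen_create_outbound dbzxout_upd inventory | sqlplus -s xstrmadmin/xsa` followed by `gen_grant_outbound dbzxout_upd xstrm | sqlplus -s / as sysdba` would create and authorize a hypothetical dbzxout_upd server for update events.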

Prepare schema to publish CDC

To enable publishing CDC events for a particular table in a schema, log on to the database as the schema owner (i.e. inventory) and execute the following commands.

GRANT SELECT ON products TO xstrm;
ALTER TABLE products ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
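To confirm that the ALTER TABLE statement took effect, one option is to query the USER_LOG_GROUPS data dictionary view as the table owner; for a table with all-column supplemental logging, its LOG_GROUP_TYPE column should read ALL COLUMN LOGGING. The POSIX shell sketch below prints such a query and checks its output. The function names are hypothetical, and the view details are worth double-checking against the Oracle Database Reference for your release.

```shell
# Hypothetical helper: prints a SQL*Plus script that lists the supplemental
# log group types of one table. Unquoted table names are stored in upper case.
log_groups_query() {
  tbl=$(printf '%s' "$1" | tr '[:lower:]' '[:upper:]')
  printf "SET HEADING OFF FEEDBACK OFF\nSELECT log_group_type FROM user_log_groups WHERE table_name = '%s';\n" "$tbl"
}

# Reads the query output on stdin and succeeds only if one of the
# returned lines is exactly ALL COLUMN LOGGING (surrounding spaces allowed).
has_all_column_logging() {
  grep -q '^[[:space:]]*ALL COLUMN LOGGING[[:space:]]*$'
}
```

For example, assuming sqlplus can reach the database with that connect string: `log_groups_query products | sqlplus -s inventory/dbz | has_all_column_logging && echo "supplemental logging is on"`.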

Note:

You can list the capture processes created for your outbound servers with the following query.

SELECT * FROM DBA_CAPTURE;

You can start a capture process with the following command.

CALL DBMS_CAPTURE_ADM.START_CAPTURE('CAP$_DBZXOUT_1');

And stop it with the following command.

CALL DBMS_CAPTURE_ADM.STOP_CAPTURE('CAP$_DBZXOUT_1');

In the above commands, CAP$_DBZXOUT_1 is the capture process name returned in the CAPTURE_NAME column by the SELECT query above. Use the name returned in your environment.
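Capture process names such as CAP$_DBZXOUT_1 contain a $, which a shell expands inside double quotes. The POSIX shell sketch below, a hypothetical capture_ctl function, prints the start or stop statement for a capture name so it can be piped into sqlplus; pass the name in single quotes.

```shell
# Hypothetical helper: prints the CALL statement that starts or stops a
# capture process. Usage: capture_ctl start|stop <capture_name>
capture_ctl() {
  case "$1" in
    start) proc=START_CAPTURE ;;
    stop)  proc=STOP_CAPTURE ;;
    *) echo "usage: capture_ctl start|stop <capture_name>" >&2; return 2 ;;
  esac
  [ -n "$2" ] || { echo "capture name is required" >&2; return 2; }
  # printf does not interpret its arguments, so a $ in the name is kept as-is.
  printf "CALL DBMS_CAPTURE_ADM.%s('%s');\n" "$proc" "$2"
}
```

For example, assuming OS authentication as SYSDBA on the database host: `capture_ctl stop 'CAP$_DBZXOUT_1' | sqlplus -s / as sysdba`.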

Configure Streaming Integrator

Once the Oracle database is configured for change data capture, configure WSO2 Streaming Integrator to receive events from the Oracle database capture process by following the steps below.

1. On Linux, download the relevant instant client ZIP archive from https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html. For Oracle 12cR2, the matching instant client is instantclient-basic-linux.x64-12.2.0.1.0.zip.

On macOS, download the instant client ZIP archive from https://www.oracle.com/database/technologies/instant-client/macos-intel-x86-downloads.html instead.

2. Extract the instant client zip archive to a preferred location. This location will be referred to as <INSTANT_CLIENT_DIR> hereafter.

3. On Linux, go to <INSTANT_CLIENT_DIR> and create a symbolic link named libclntsh.so that points to libclntsh.so.12.1, using the following command. On macOS, skip this step.

ln -s libclntsh.so.12.1 libclntsh.so

4. On Linux, export <INSTANT_CLIENT_DIR> as LD_LIBRARY_PATH in the shell from which you start WSO2 Streaming Integrator, as follows.

export LD_LIBRARY_PATH=<INSTANT_CLIENT_DIR>

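On macOS, open <SI_HOME>/wso2/server/bin/carbon.sh and add the -Djava.library.path line shown below to the JVM arguments. The lines before and after it are shown for context, to indicate where to place it.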

-Djava.util.logging.config.file="$RUNTIME_HOME/bin/bootstrap/logging.properties" \
-Djava.library.path="<INSTANT_CLIENT_DIR>" \
-Djava.security.egd=file:/dev/./urandom \

5. Go to <SI_HOME>/bin/tools directory.

6. Remove the org.wso2.carbon.tools.core-5.2.8.jar file.

7. Download org.wso2.carbon.tools.core-5.2.12.jar from https://mvnrepository.com/artifact/org.wso2.carbon/org.wso2.carbon.tools.core/5.2.12 and copy it to the <SI_HOME>/bin/tools directory.

8. Download the spi-provider.bat (Windows) or spi-provider.sh (Unix) file from https://github.com/wso2/carbon-kernel/tree/v5.2.12/features/org.wso2.carbon.server.feature/resources/bin and copy it to the <SI_HOME>/bin directory.

9. From the <SI_HOME>/bin directory, convert <INSTANT_CLIENT_DIR>/ojdbc8.jar into a bundle using the spi-provider tool as follows.

./spi-provider.sh oracle.jdbc.OracleDriver java.sql.Driver <INSTANT_CLIENT_DIR>/ojdbc8.jar .

10. The above command converts the Oracle JDBC driver and places it in the <SI_HOME>/bin/ojdbc8 directory. Move the driver JAR (ojdbc8_1.0.0.jar) to the <SI_HOME>/lib directory.

11. Download the jni-provider.bat (Windows) or jni-provider.sh (Unix) file from https://github.com/wso2/carbon-kernel/tree/v5.2.12/features/org.wso2.carbon.server.feature/resources/bin and copy it to the <SI_HOME>/bin directory.

12. From the <SI_HOME>/bin directory, convert <INSTANT_CLIENT_DIR>/xstreams.jar into a bundle using the jni-provider tool as follows.

./jni-provider.sh <INSTANT_CLIENT_DIR>/xstreams.jar . clntsh

13. Move the converted XStreams client (xstreams_1.0.0.jar) to <SI_HOME>/lib directory.
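The steps above leave several files in specific places, and a missing one typically surfaces only when the Siddhi app is deployed. The following POSIX shell sketch automates the Linux part of steps 3 and 4 and checks the JAR layout produced by steps 6 to 13. The names setup_instant_client and check_si_jars are hypothetical, and the file names are the ones listed in the steps above. Unlike step 4, it prepends <INSTANT_CLIENT_DIR> to any existing LD_LIBRARY_PATH instead of overwriting it.

```shell
# Linux only (steps 3 and 4). Source this file (". ./file.sh") so that the
# export persists in the shell that later starts the Streaming Integrator.
setup_instant_client() {
  ic_dir="$1"
  [ -f "$ic_dir/libclntsh.so.12.1" ] || { echo "libclntsh.so.12.1 not found in $ic_dir" >&2; return 1; }
  # Create the unversioned symlink only if it does not exist yet.
  [ -e "$ic_dir/libclntsh.so" ] || ln -s libclntsh.so.12.1 "$ic_dir/libclntsh.so"
  # Prepend rather than overwrite, keeping any existing entries.
  LD_LIBRARY_PATH="$ic_dir${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}"
  export LD_LIBRARY_PATH
}

# Steps 6 to 13: succeeds only if the expected JARs are in place under
# <SI_HOME> and the old tools JAR has been removed.
check_si_jars() {
  si_home="$1"; missing=0
  for f in bin/tools/org.wso2.carbon.tools.core-5.2.12.jar lib/ojdbc8_1.0.0.jar lib/xstreams_1.0.0.jar; do
    [ -f "$si_home/$f" ] || { echo "missing: $si_home/$f" >&2; missing=1; }
  done
  if [ -f "$si_home/bin/tools/org.wso2.carbon.tools.core-5.2.8.jar" ]; then
    echo "old tools JAR still present" >&2; missing=1
  fi
  return "$missing"
}
```

On macOS, skip setup_instant_client and rely on the carbon.sh change from step 4; check_si_jars applies to both platforms.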

Write the CDC Siddhi application

This section explains how to create a Siddhi app that receives CDC events and logs them to the console/log.

  1. Open a preferred text editor and create a new file.
  2. Add the following content to the file and save it as <SI_HOME>/wso2/server/deployment/siddhi-files/<SIDDHI_APP_NAME>.siddhi.
@App:name('ProductInsertCDCTestApp')
@App:description('Capture Oracle database inserts to inventory.products table via CDC on listening mode.')
@source(type='cdc',
url='jdbc:oracle:thin://localhost:1521/ORCLCDB',
username='xstrm',
password='xs',
table.name='inventory.products',
operation='insert',
database.server.name='PRODINS',
connector.properties="database.out.server.name=dbzxout,snapshot.mode=initial_schema_only",
@map(type = 'keyvalue', fail.on.missing.attribute='false'))
define stream ProductInsertInputStream (
ID int, NAME string, DESCRIPTION string, QUANTITY int);
@sink(type='log')
define stream ProductInsertOutputStream (
ID int, NAME string, DESCRIPTION string, QUANTITY int);
@info(name = 'show_in_logs_query')
from ProductInsertInputStream
select *
insert into ProductInsertOutputStream;

In the above Siddhi app, the ProductInsertInputStream stream receives an event whenever a record is inserted into the inventory.products table. The operation that the CDC listener captures is configured through the operation parameter.

Since the table has four columns, each column value maps to the stream attribute of the same name. The list of event attributes differs based on the configured operation, as follows.

  • Insert: The incoming event contains one attribute per column, named after the column.
  • Update: The incoming event contains one attribute per column, plus one attribute per column named after the column and prefixed with ‘before_’.
  • Delete: The incoming event contains one attribute per column, named after the column and prefixed with ‘before_’.
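The attribute rules above can be captured in a small generator. The POSIX shell sketch below prints the attribute list for an input stream on inventory.products for a given operation; products_stream_attrs is a hypothetical name, and the column types are copied from ProductInsertInputStream.

```shell
# Columns of inventory.products as name:siddhi_type pairs (from the app above).
PRODUCTS_COLS="ID:int NAME:string DESCRIPTION:string QUANTITY:int"

# Hypothetical helper: prints the stream attribute list for one CDC operation.
products_stream_attrs() {
  case "$1" in
    insert|update|delete) ;;
    *) echo "unknown operation: $1" >&2; return 1 ;;
  esac
  out=""
  # Plain column names: present for insert and update events.
  if [ "$1" != delete ]; then
    for c in $PRODUCTS_COLS; do out="$out, ${c%%:*} ${c#*:}"; done
  fi
  # before_ prefixed names: present for update and delete events.
  if [ "$1" != insert ]; then
    for c in $PRODUCTS_COLS; do out="$out, before_${c%%:*} ${c#*:}"; done
  fi
  printf '%s\n' "${out#, }"
}
```

For instance, products_stream_attrs update prints the eight attributes to place inside the define stream (...) of an app whose source is configured with operation='update'.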

Oracle CDC requires a few additional properties, which are passed through the connector.properties parameter as comma-separated key=value pairs. Because Oracle CDC relies on an XStream outbound server, set the name of that server with the database.out.server.name property. The example above also sets snapshot.mode=initial_schema_only, which makes the Debezium connector capture only the table structure on startup instead of snapshotting the existing rows.

In the above example, each event received by ProductInsertInputStream is passed to ProductInsertOutputStream, which logs the event payload to the console/log.
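To see the app log an event, insert a row into inventory.products as the inventory user. The POSIX shell sketch below prints such an INSERT, escaping single quotes for Oracle string literals and checking values against the column sizes; product_insert_sql and sql_quote are hypothetical helpers. The ID column is left out so that the identity column assigns it.

```shell
# Wraps a value in single quotes, doubling embedded quotes as Oracle requires.
sql_quote() {
  printf "'%s'" "$(printf '%s' "$1" | sed "s/'/''/g")"
}

# Hypothetical helper: prints an INSERT plus COMMIT for inventory.products.
# Usage: product_insert_sql <name> <description> <quantity>
product_insert_sql() {
  name="$1"; desc="$2"; qty="$3"
  # Mirrors VARCHAR2(20) and VARCHAR2(50). Lengths are counted by the shell,
  # which matches the database's byte limit only for single-byte text.
  [ "${#name}" -le 20 ] || { echo "NAME longer than 20" >&2; return 1; }
  [ "${#desc}" -le 50 ] || { echo "DESCRIPTION longer than 50" >&2; return 1; }
  case "$qty" in ''|*[!0-9]*) echo "QUANTITY must be a non-negative integer" >&2; return 1 ;; esac
  printf 'INSERT INTO products (NAME, DESCRIPTION, QUANTITY) VALUES (%s, %s, %s);\nCOMMIT;\n' \
    "$(sql_quote "$name")" "$(sql_quote "$desc")" "$qty"
}
```

For example, assuming sqlplus can reach the database with that connect string, `product_insert_sql scooter 'Small 2-wheel scooter' 10 | sqlplus -s inventory/dbz` should make the log sink print the new row once the transaction commits.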

Enroll a new table for CDC

The steps above configure a particular database table and WSO2 Streaming Integrator for CDC. After that, to add another table for CDC, follow the steps below.

  1. Create a new XStream outbound server as described in steps 4 and 5 of the “Enable CDC on database/tables” section.
  2. Follow the steps mentioned under “Prepare schema to publish CDC” to enable publishing CDC for the particular table.
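The statements from “Prepare schema to publish CDC” can be generated for any additional table. The POSIX shell sketch below, built around a hypothetical enroll_table_sql function, prints them for a table name and an XStream user (defaulting to xstrm). It rejects names containing anything other than letters, digits and underscores, which is stricter than Oracle's identifier rules.

```shell
# Hypothetical helper: prints the GRANT and supplemental logging statements
# for one table, to be run as the table owner (e.g. inventory).
# Usage: enroll_table_sql <table> [xstream_user]
enroll_table_sql() {
  table="$1"; xs_user="${2:-xstrm}"
  # Conservative name check to avoid emitting malformed SQL.
  case "$table" in
    ''|*[!A-Za-z0-9_]*) echo "invalid table name: $table" >&2; return 1 ;;
  esac
  printf 'GRANT SELECT ON %s TO %s;\n' "$table" "$xs_user"
  printf 'ALTER TABLE %s ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;\n' "$table"
}
```

For a hypothetical orders table, `enroll_table_sql orders | sqlplus -s inventory/dbz` would apply both statements, assuming sqlplus can reach the database with that connect string.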

What’s next?

The next part of this article will guide you through the steps to configure CDC with Microsoft SQL Server.

References

  1. WSO2 Streaming Integrator home page
    [https://wso2.com/integration/streaming-integrator/]
  2. WSO2 Streaming Integrator documentation
    [https://ei.docs.wso2.com/en/latest/streaming-integrator/overview/overview/]
  3. Debezium Connector for Oracle
    [https://debezium.io/documentation/reference/0.10/connectors/oracle.html]
  4. Debezium Vagrant box for Oracle
    [https://github.com/debezium/oracle-vagrant-box]
  5. Debezium Examples
    [https://github.com/debezium/debezium-examples]
  6. Siddhi-IO-CDC connector
    [https://github.com/siddhi-io/siddhi-io-cdc]
  7. Siddhi-IO-CDC documentation
    [https://siddhi-io.github.io/siddhi-io-cdc]
