Configuring Microsoft SQL Server CDC with WSO2 Streaming Integrator

Lasantha Samarakoon
4 min read · Jun 11, 2020


This article is part of a series on change data capture (CDC) with WSO2 Streaming Integrator. The series starts with an introduction to WSO2 Streaming Integrator and CDC, then covers how to configure the major databases for CDC with WSO2 Streaming Integrator.

This article walks you through the steps to configure CDC for Microsoft SQL Server with WSO2 Streaming Integrator.

If you arrived at this article directly and are still looking for an overview of change data capture and how WSO2 Streaming Integrator can be part of your solution, start with the first article in this series, Change Data Capturing with WSO2 Streaming Integrator.

This guide assumes that you have a good understanding of CDC and its applications. In addition, the following prerequisites are expected.

Prerequisites

  • A working Microsoft SQL Server database environment. (For an easier deployment, you can use Microsoft’s official Docker images.)
  • A working WSO2 Streaming Integrator instance. (You can use either the worker or the tooling runtime.) You can download WSO2 Streaming Integrator from here. For more details on WSO2 Streaming Integrator, refer to the official documentation.

In addition to the above, the following database configurations are assumed in the steps below.

Host     : localhost
Port     : 1433
Database : sales
Table    : products
Columns  :
  id          int (pk)
  name        varchar(100)
  category_id int
  quantity    int

Create database/tables

  1. Log on to the Microsoft SQL Server database and run the following SQL query. This will create the sales database and the products table.
-- Create the sales database and switch to it
create database sales
go
use sales
go
-- Create the products table
create table products (
id int,
name varchar(100),
category_id int,
quantity int,
primary key (id)
)
go

Enable CDC on the database/tables

  1. Log on to the sales database and run the following command to enable CDC at the database level.
exec sys.sp_cdc_enable_db
go

If you need to disable CDC, you can run the following SQL command.

exec sys.sp_cdc_disable_db
go

  2. Run the following command to enable CDC at the table level.

exec sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'products',
@role_name = null,
@supports_net_changes = 0
go

If you need to disable CDC at the table level, you can run the following command.

exec sys.sp_cdc_disable_table
@source_schema = N'dbo',
@source_name = N'products',
@capture_instance = N'dbo_products'
go

Note:

If you need to see the change data capture configuration of each table that is enabled for CDC in the current database, you can run the following command.

exec sys.sp_cdc_help_change_data_capture
go

For more information on configuring CDC in Microsoft SQL Server, refer to the Microsoft documentation on “Enable and Disable Change Data Capture (SQL Server)”.
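
As a quick sanity check after the enable steps above, the catalog views can be queried to confirm that CDC is switched on. The sketch below assumes the sales database and the dbo.products table used throughout this article; is_cdc_enabled and is_tracked_by_cdc are columns of the sys.databases and sys.tables catalog views.

```sql
-- A value of 1 in is_cdc_enabled means CDC is enabled for the database
select name, is_cdc_enabled
from sys.databases
where name = 'sales'
go
use sales
go
-- A value of 1 in is_tracked_by_cdc means the table has a capture instance
select s.name as schema_name, t.name as table_name, t.is_tracked_by_cdc
from sys.tables t
join sys.schemas s on s.schema_id = t.schema_id
where t.name = 'products'
go
```

Both queries should report 1 in the flag column; a 0 suggests that the corresponding enable step has not taken effect. The capture job runs under SQL Server Agent, so it is also worth checking that the agent is running.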

Configure Streaming Integrator

Once the Microsoft SQL Server database is configured for CDC, WSO2 Streaming Integrator needs to be configured as well.

To configure WSO2 Streaming Integrator to receive events from the MSSQL change data capture process, follow the steps below.

  1. From the <SI_HOME>/lib directory, remove the existing siddhi-io-cdc-*.jar file.
  2. Download siddhi-io-cdc-2.0.5.jar from here and copy the JAR file to the <SI_HOME>/lib directory.
  3. Download the relevant JDBC driver for the MSSQL version in use. In this deployment, sqljdbc42.jar is used.
  4. Before deploying the JDBC driver in the Streaming Integrator, convert the downloaded JAR file into an OSGi bundle. To do so, open a command prompt, go to the <SI_HOME>/bin directory, and execute the following command. This creates another JAR file in the destination directory (e.g. sqljdbc42_1.0.0.jar).
./jartobundle.sh <PATH_TO_JAR> <DESTINATION_DIR>

Example:

./jartobundle.sh sqljdbc42.jar .

  5. Copy the generated JAR file to the <SI_HOME>/lib directory.

  6. Restart the server.

Write the CDC Siddhi application

This section explains how to create a Siddhi app that receives CDC events and logs them to the console/log.

  1. Open a preferred text editor and create a new file.
  2. Add the following content into the file and save it as <SI_HOME>/wso2/server/deployment/siddhi-files/ProductInsertCDCTestApp.siddhi.
@App:name('ProductInsertCDCTestApp')
@App:description('Capture inserts to the products table of the MSSQL sales database via CDC in listening mode.')
@source(type='cdc',
url='jdbc:sqlserver://localhost:1433;databaseName=sales;',
username='cdc_user',
password='cdc_passwd',
table.name='dbo.Products',
operation='insert',
database.server.name='localhost\default',
connector.properties='snapshot.mode=initial_schema_only',
@map(type='keyvalue', fail.on.missing.attribute='false'))
define stream ProductInsertInputStream (id int, name string, category_id int, quantity int);
@sink(type='log')
define stream ProductInsertOutputStream (id int, name string, category_id int, quantity int);
@info(name = 'show_in_logs_query')
from ProductInsertInputStream
select *
insert into ProductInsertOutputStream;

In the above Siddhi app, the ProductInsertInputStream stream receives an event whenever a record is inserted into the products table of the sales database. The operation that the CDC source listens for is configured through the operation parameter.

Since the table contains four columns, the stream defines four attributes, and the value of each column maps to the attribute of the same name. The list of event attributes differs based on the configured operation, as follows.

  • Insert: The incoming event contains one attribute per column, named after the column.
  • Update: The incoming event contains one attribute per column, named after the column, plus one attribute per column named after the column with the prefix ‘before_’.
  • Delete: The incoming event contains one attribute per column, named after the column with the prefix ‘before_’.
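
To relate these attributes to what SQL Server itself records, the change table can be inspected directly. The following is a minimal sketch, assuming the default capture instance dbo_products (the name used in the disable command earlier), whose change table is cdc.dbo_products_CT. It only shows the raw rows on the database side; how the connector turns them into events is as described in the list above.

```sql
use sales
go
-- __$operation: 1 = delete, 2 = insert,
-- 3 = update (values before the change), 4 = update (values after the change)
select
  case __$operation
    when 1 then 'delete'
    when 2 then 'insert'
    when 3 then 'update (before)'
    when 4 then 'update (after)'
  end as operation,
  id, name, quantity
from cdc.dbo_products_CT
order by __$start_lsn, __$seqval, __$operation
go
```

An update appears as a pair of rows, one holding the old values and one holding the new ones, which is the same before/after distinction that the ‘before_’ prefix expresses in the events.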

In the Siddhi app above, each event received on ProductInsertInputStream is sent to ProductInsertOutputStream, which logs the event payload to the console/log.
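
To see the app in action, a row can be inserted into the products table while the Streaming Integrator is running. The statement below is an illustrative sketch; the row values are made up for this example.

```sql
use sales
go
-- Insert a sample row; the values are arbitrary test data
insert into products (id, name, quantity)
values (1, 'Notebook', 25)
go
```

Once the insert has been captured, the log sink should print an event for ProductInsertOutputStream carrying these values.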

Enroll a new table for CDC

Once the database and the Streaming Integrator are configured for CDC, follow the steps below whenever a new table needs to be enrolled for CDC.

  1. Enable change data capture for the table as mentioned in the “Enable CDC on the database/tables” section.
  2. Create a Siddhi app for the particular table as mentioned under the “Write the CDC Siddhi application” section.

References

  1. WSO2 Streaming Integrator home page
    [https://wso2.com/integration/streaming-integrator/]
  2. WSO2 Streaming Integrator documentation
    [https://ei.docs.wso2.com/en/latest/streaming-integrator/overview/overview/]
  3. Debezium Connector for SQL Server
    [https://debezium.io/documentation/reference/0.10/connectors/sqlserver.html]
  4. Siddhi-IO-CDC connector
    [https://github.com/siddhi-io/siddhi-io-cdc]
  5. Siddhi-IO-CDC documentation
    [https://siddhi-io.github.io/siddhi-io-cdc]
