Change Data Capturing with WSO2 Streaming Integrator

Lasantha Samarakoon
Feb 25, 2020

This is a series of blog posts on change data capture (CDC) with WSO2 Streaming Integrator. The series covers topics ranging from an introduction to WSO2 Streaming Integrator and CDC to configuring major databases for CDC with WSO2 Streaming Integrator.

NOTE: This article was originally published on DZone. It is republished here to maintain the continuity of the series.

Streaming integration is becoming one of the core components of the enterprise integration stack. Unlike traditional batch integration, streaming integration performs ETL operations on data in real time and delivers the results immediately. This empowers businesses to act on fresh data and make decisions as soon as the data is produced.

But where does this data come from? Most of the time, streaming data is produced by data sources such as applications and sensors. In some cases, well-known data sources such as RDBMSs can also participate in producing streaming data. This is where CDC, a.k.a. change data capture, comes into the picture.

What is CDC?

Data sources are where data is usually stored and managed. A number of common operations are carried out on a data source. For example,

  • Creating and dropping databases
  • Creating, altering, and dropping tables
  • Inserting, updating, and deleting records

Instead of just storing data, what if the data source could also emit notifications about schema and data changes? This is what CDC does. CDC maintains a history of changes to the schema and data, and allows external parties to be notified about a particular change in the database. CDC was initially introduced to support data replication systems in delivering real-time transactions to data warehouses, which inherently makes it a replication technology.

Most CDC implementations only read from the database or its logs to identify changes and do not write back to the database. This makes CDC an efficient, low-latency data transfer technique with a very low performance impact.

The following can be identified as some of the popular use cases for CDC.

  • Replicating data from a source to destination
  • Data warehousing
  • Business intelligence
  • Notifying external applications regarding data changes in real-time

For a business organization, real-time data processing has many advantages over batch processing. For example,

  • No off-peak time for processing as global organizations are operating 24/7
  • Real-time/pseudo-real-time access to analyzed data
  • Ability to achieve a competitive advantage by having timely data

Today, there are several approaches to implementing CDC. The following sections describe the main ones.

  1. Polling table to identify inserted records
  2. Use table triggers to manage a log which lets you identify the changes (trigger-based)
  3. Read log files and identify the changes
  4. Use database native support for CDC

Polling table to identify inserted records

As the name implies, this is a polling approach where the CDC client reads the database to identify whether any records have been inserted. The client remembers the last record it read (mostly by its primary key) and then checks whether any records have been inserted after that. If so, it reads the newly inserted records and emits them.

This approach is not real-time, as the client polls the database for changes at defined intervals.
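
The polling idea can be sketched in a few lines of Python. This is only an illustration under assumptions: it uses an in-memory SQLite database, and the `stock` table, its columns, and the `poll_inserts` helper are hypothetical names chosen for the example, not part of any CDC product.

```python
import sqlite3

def poll_inserts(conn, last_seen_id):
    """Return rows whose primary key is greater than last_seen_id,
    together with the new high-water mark to remember."""
    rows = conn.execute(
        "SELECT id, symbol, price FROM stock WHERE id > ? ORDER BY id",
        (last_seen_id,),
    ).fetchall()
    if rows:
        last_seen_id = rows[-1][0]  # remember the last record read
    return rows, last_seen_id

# Hypothetical table used only for this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stock (id INTEGER PRIMARY KEY, symbol TEXT, price REAL)")
conn.execute("INSERT INTO stock (symbol, price) VALUES ('WSO2', 55.6)")

# First poll: picks up the one existing row.
first_batch, last_id = poll_inserts(conn, 0)

# One new insert and one update happen between polls.
conn.execute("INSERT INTO stock (symbol, price) VALUES ('IBM', 75.0)")
conn.execute("UPDATE stock SET price = 60.0 WHERE symbol = 'WSO2'")

# Second poll: only the new insert is visible; the update is missed.
second_batch, last_id = poll_inserts(conn, last_id)
```

In a real client, the second call would run on a timer. Note that the update to the existing row goes unnoticed, because its primary key is not greater than the remembered value.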

Use table triggers to manage a log which lets you identify the changes (trigger-based)

This approach enhances the previous method. As the title suggests, it uses table triggers to keep an internal log (as an additional table) of all data changes. Whenever a change happens (an insert, update, or delete), the respective trigger fires and inserts a record describing the change into the log table. The CDC client then reads the log table from time to time to identify the changes and emits any that it finds.

Like the previous approach, this is not a real-time approach, as the CDC client needs to poll the database for changes at defined intervals.
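
A minimal sketch of the trigger-based approach, again using an in-memory SQLite database. The `stock` and `stock_log` tables, the trigger names, and the `poll_log` helper are all hypothetical and exist only for illustration; a production setup would differ per database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stock (id INTEGER PRIMARY KEY, symbol TEXT, price REAL);

-- Additional table that acts as the change log.
CREATE TABLE stock_log (
    log_id INTEGER PRIMARY KEY AUTOINCREMENT,
    op     TEXT,
    row_id INTEGER,
    symbol TEXT,
    price  REAL
);

-- One trigger per operation; each writes a record into the log table.
CREATE TRIGGER stock_ins AFTER INSERT ON stock BEGIN
    INSERT INTO stock_log (op, row_id, symbol, price)
    VALUES ('insert', NEW.id, NEW.symbol, NEW.price);
END;
CREATE TRIGGER stock_upd AFTER UPDATE ON stock BEGIN
    INSERT INTO stock_log (op, row_id, symbol, price)
    VALUES ('update', NEW.id, NEW.symbol, NEW.price);
END;
CREATE TRIGGER stock_del AFTER DELETE ON stock BEGIN
    INSERT INTO stock_log (op, row_id, symbol, price)
    VALUES ('delete', OLD.id, OLD.symbol, OLD.price);
END;
""")

def poll_log(conn, last_log_id):
    """Read log records written after last_log_id."""
    rows = conn.execute(
        "SELECT log_id, op, row_id, symbol, price FROM stock_log "
        "WHERE log_id > ? ORDER BY log_id",
        (last_log_id,),
    ).fetchall()
    if rows:
        last_log_id = rows[-1][0]
    return rows, last_log_id

conn.execute("INSERT INTO stock (symbol, price) VALUES ('WSO2', 55.6)")
conn.execute("UPDATE stock SET price = 60.0 WHERE symbol = 'WSO2'")
conn.execute("DELETE FROM stock WHERE symbol = 'WSO2'")

changes, last_log_id = poll_log(conn, 0)
operations = [row[1] for row in changes]
```

Unlike plain table polling, the client now sees updates and deletes too, but it still has to poll the log table, and every change costs an extra write inside the database.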

Read log files and identify the changes

In this approach, the CDC client does not read the database tables for changes; instead, it reads the transaction logs. Every transactional database uses transaction logs to ensure ACID properties in the event of database crashes or failures. That means the transaction logs contain traces of all the actions carried out on the database. This allows a CDC client to read the transaction log and emit the changes performed on that particular database.
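
The idea can be illustrated with a toy model. Real transaction logs are binary and engine-specific, so the sketch below makes a strong simplifying assumption: the log is a plain in-memory list, and `LogEntry` and `LogReader` are hypothetical names invented for this example.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class LogEntry:
    lsn: int                  # log sequence number
    op: str                   # 'insert', 'update', or 'delete'
    table: str
    before: Optional[dict]    # row image before the change, if any
    after: Optional[dict]     # row image after the change, if any

class LogReader:
    """Reads an append-only log and remembers how far it has read."""

    def __init__(self, log):
        self._log = log
        self._position = 0    # index of the next unread entry

    def read_changes(self, table):
        unread = self._log[self._position:]
        self._position = len(self._log)
        # Only the log is consulted; the table itself is never queried.
        return [entry for entry in unread if entry.table == table]

# Toy stand-in for a database transaction log.
transaction_log = [
    LogEntry(1, "insert", "stock", None, {"symbol": "WSO2", "price": 55.6}),
    LogEntry(2, "insert", "orders", None, {"id": 7}),
    LogEntry(3, "update", "stock",
             {"symbol": "WSO2", "price": 55.6},
             {"symbol": "WSO2", "price": 60.0}),
]

reader = LogReader(transaction_log)
first = reader.read_changes("stock")

# The database appends another entry; the next read sees only that one.
transaction_log.append(
    LogEntry(4, "delete", "stock", {"symbol": "WSO2", "price": 60.0}, None)
)
second = reader.read_changes("stock")
```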

Use database native support for CDC

Most modern databases (such as MySQL, Oracle, and MSSQL) provide native support for CDC. The actual implementation differs from database to database.

Unlike the above polling implementations, this is a real-time approach.

Using WSO2 Streaming Integrator for CDC

WSO2 Streaming Integrator is a streaming data processing runtime that allows you to integrate streaming data and take action on it. It contains 60+ prebuilt, well-tested connectors that can be used to connect to sources and destinations such as Kafka, files, databases, and endpoints such as HTTP.

Streaming Integrator is powered by Siddhi, a proven cloud-native stream processing engine. It’s container friendly by design and can be deployed in VM, Docker, and Kubernetes environments. It is also fully compatible with stream messaging systems such as Kafka and NATS.

Streaming Integrator comes with sophisticated tooling that provides a rich and agile developer experience. It is equipped with a graphical drag-and-drop editor, which enables faster development for non-technical users as well.

For CDC, WSO2 Streaming Integrator utilizes a dedicated connector called “siddhi-io-cdc”. This connector provides capabilities to listen to a particular database and emit events for data changes. The connector supports databases such as MySQL, PostgreSQL, Microsoft SQL Server, H2 and Oracle.

For more information on WSO2 Streaming Integrator, visit WSO2 Streaming Integration Documentation.

The architecture of the Streaming Integrator CDC

Streaming Integrator's CDC connector uses two different implementations to detect data changes.

  • Polling mode
  • Listening mode

Polling mode

The polling mode is the implementation of the “Polling table to identify inserted records” approach described above. As noted there, this method can be used to detect inserts only. The implementation uses the maximum value in a reference column (called the polling column) to identify recently inserted records. It polls the table from time to time, gets the records whose values are greater than the previously remembered maximum value, and emits an event for each of these records.

Implementation of polling mode in siddhi-io-cdc.

The main disadvantages of this approach are:

  • Not real-time
  • Can detect only inserts

The polling mode supports MySQL, PostgreSQL, Microsoft SQL Server, Oracle, and H2 databases.

Listening mode

The listening mode uses the native CDC implementation of each database to detect changes. As mentioned above, each database takes a different approach to native CDC support. To handle that, the connector internally uses the Debezium CDC connector as an abstraction layer. The Debezium connector delivers the changes from the database to the Siddhi CDC connector. Once a change caused by an insert, update, or delete operation is detected, the Siddhi CDC connector emits an event carrying the required payload. The payload can be received via a Siddhi stream.

Implementation of listening mode in siddhi-io-cdc

The number and types of attributes in the event depend on the database operation, as follows.

  • Insert: The incoming event will contain attributes with each column name. These attributes carry the newly inserted values for each column.
    Example: define stream StockStream (symbol string, price float);
  • Update: The incoming event will contain attributes with each column name, along with attributes with each column name prefixed with 'before_'. The attributes with the column name carry the updated values of each column, while the attributes with the 'before_' prefix carry the values before the update.
    Example: define stream StockStream (symbol string, price float, before_symbol string, before_price float);
  • Delete: The incoming event will contain attributes with each column name prefixed with 'before_'. These attributes carry the column values of the deleted record.
    Example: define stream StockStream (before_symbol string, before_price float);
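
The attribute naming described above can be mimicked with a small Python function. This is not the connector's code; `build_event` is a hypothetical helper written only to show which attributes appear for each operation, given the row values before and after the change.

```python
def build_event(operation, before=None, after=None):
    """Assemble an event dict following the naming rules described above.

    before / after are dicts of column name -> value for the row
    before and after the change (None when not applicable).
    """
    event = {}
    if operation in ("insert", "update"):
        # New values are carried under the plain column names.
        event.update(after)
    if operation in ("update", "delete"):
        # Old values are carried under 'before_'-prefixed names.
        event.update({f"before_{col}": val for col, val in before.items()})
    return event

insert_event = build_event("insert", after={"symbol": "WSO2", "price": 55.6})
update_event = build_event(
    "update",
    before={"symbol": "WSO2", "price": 55.6},
    after={"symbol": "WSO2", "price": 60.0},
)
delete_event = build_event("delete", before={"symbol": "WSO2", "price": 60.0})
```

The keys of each resulting dict line up with the attributes of the corresponding example stream definition: plain column names for an insert, both sets for an update, and only the prefixed names for a delete.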

The advantages of this mode over the polling mode are:

  • Real-time
  • Can detect insert, update, and delete operations

To implement CDC with WSO2 Streaming Integrator, a few configurations need to be done on both the database and the Streaming Integrator itself. The database configuration includes enabling CDC on the particular database, schema, and table. The Streaming Integrator needs to be configured with the relevant database drivers and other third-party libraries. Once the database and the Streaming Integrator are configured, a Siddhi application needs to be written to receive events from CDC.

What’s next?

The next part of this article will guide you through the steps to configure CDC with Oracle 12cR2 database.
