Every database event is important, don’t let them rot away in an old batch, forgotten to the ravages of time and irrelevance. Let’s capture all that data.
Since we are out of the office and working remote, I need our relational database records to follow us and be sent offsite. Our physical tables may be empty, but our database ones are not. Let’s get that data streaming and useful.
CDC (Change Data Capture, not Center for Disease Control and not Cat Data Capture) is well defined in wikipedia and in this article:
- https://www.confluent.io/learn/change-data-capture/
- https://en.wikipedia.org/wiki/Change_data_capture
Sometimes you don’t need pure change data capture, sometimes you can just get data when an ID or date increments. You can do that really easily at scale (including grabbing every table in a database) with Apache NiFi:
- https://community.cloudera.com/t5/Community-Articles/Incrementally-Streaming-RDBMS-Data-to-Your-Hadoop-DataLake/ta-p/247927
- https://community.cloudera.com/t5/Community-Articles/Ingesting-RDBMS-Data-As-New-Tables-Arrive-Automagically-into/ta-p/246214
- https://community.cloudera.com/t5/Community-Articles/Incremental-Fetch-in-NiFi-with-QueryDatabaseTable/ta-p/247073
If you have MySQL/MariaDB simple CDC, then you can do it with Apache NiFi:
- https://community.cloudera.com/t5/Community-Articles/Simple-Change-Data-Capture-CDC-with-SQL-Selects-via-Apache/ta-p/308390
- https://community.cloudera.com/t5/Community-Articles/Change-Data-Capture-CDC-with-Apache-NiFi-Part-1-of-3/ta-p/246623
- https://community.cloudera.com/t5/Community-Articles/Change-Data-Capture-CDC-with-Apache-NiFi-Part-2-of-3/ta-p/246519
- https://community.cloudera.com/t5/Community-Articles/Change-Data-Capture-CDC-with-Apache-NiFi-Part-3-of-3/ta-p/246482
For real CDC, the best open source option is using Debezium along with Kafka and Kafka Connect. CDC Debezium KConnectors for PostgreSQL, MySQL, SQL Server, DB2, and Oracle.
Some other options are:
- https://github.com/rueian/pgcapture
- https://maxwells-daemon.io/
- https://docs.airbyte.com/understanding-airbyte/cdc/
- https://airbyte.com/blog/change-data-capture-definition-methods-and-benefits
There has been much discussion on this topic as you can see if my Twitter thread.
Source Code
Step Through Video
Change Data Capture with Apache Flink
The piece that is not going anywhere is Debezium, it is the easiest and most solid of the open source options for handlings CDC from a number of difererent databases including some closed source. There are also other options for MySQL like Maxwell and Canal. For most use cases in the open source, Debezium is the way to go. It is supported by a number of projects, companies and frameworks.
My first example is capturing all inserts that occur to my PostgreSQL database. In my use case, NiFi happens to be the app that is inserting data into my database. For those of you trying to build your own apps with minimal cost or as a developer, using the Cloudera CSP-CE includes a dockerized version of Apache Flink, Apache Kafka and PostgreSQL. This makes for a great way to build these apps with no cloud expense.
Data Flow
Apache NiFi insert data into PostgreSQL via PutDatabaseRecord
Our Source Table in PostgreSQL:
CREATE TABLE newjerseybus
(
title VARCHAR(255),
description VARCHAR(255),
link VARCHAR(255),
guid VARCHAR(255),
advisoryAlert VARCHAR(255),
pubDate VARCHAR(255),
ts VARCHAR(255),
companyname VARCHAR(255),
uuid VARCHAR(255),
servicename VARCHAR(255)
)
Create Flink Postgresql-CDC virtual table via SQL DDL.
CREATE TABLE `postgres_cdc_newjerseybus` (
`title` STRING,
`description` STRING,
`link` STRING,
`guid` STRING,
`advisoryAlert` STRING,
`pubDate` STRING,
`ts` STRING,
`companyname` STRING,
`uuid` STRING,
`servicename` STRING
) WITH (
'connector' = 'postgres-cdc',
'database-name' = '<Database Name>',
'hostname' = '<Host Name>',
'password' = '<Password>',
'decoding.plugin.name' = 'pgoutput',
'schema-name' = '<Schema Name>',
'table-name' = 'newjerseybus',
'username' = '<User Name>',
'port' = '5432'
);
Apache Flink uses Debezium connect to read data in Debezium JSON format.
Let’s then Query that CDC Table.
select * from postgres_cdc_newjerseybus
Another thing we can do is create a JDBC Sink as a virtual table in SQL Stream Builder and have Flink SQL populate it for us. I am thinking of joining all the transit sources together and having a sink populate it.
insert into postgres_jdbc_newjerseytransit
select * from postgres_cdc_newjerseybus
Our JDBC PostgreSQL Sink Table DDL in SSB.
CREATE TABLE postgres_jdbc_newjerseytransit (
`title` VARCHAR(2147483647),
`description` VARCHAR(2147483647),
`link` VARCHAR(2147483647),
`guid` VARCHAR(2147483647),
`advisoryalert` VARCHAR(2147483647),
`pubdate` VARCHAR(2147483647),
`ts` VARCHAR(2147483647),
`companyname` VARCHAR(2147483647),
`uuid` VARCHAR(2147483647) ,
`servicename` VARCHAR(2147483647),
primary key (uuid) not enforced
) WITH (
'connector' = 'jdbc',
'table-name' = 'newjerseytransit',
'url' = 'jdbc:postgresql://<HOSTNAME>:5432/<DATABASENAME>',
'username' = '<username>',
'password' = '<password>'
)
We can check out PostgreSQL table with DBeaver to make sure our sink table is getting records.
We can also upsert records into Kafka from our Flink CDC table. First we create another virtual table as upsert-kafka and make sure we have it mapped to an existing Kafka topic.
CREATE TABLE `upsert_kafka_newjerseybus` (
`title` String,
`description` String,
`link` String,
`guid` String,
`advisoryAlert` String,
`pubDate` String,
`ts` String,
`companyname` String,
`uuid` String,
`servicename` String,
`eventTimestamp` TIMESTAMP(3),
WATERMARK FOR `eventTimestamp` AS `eventTimestamp` - INTERVAL '5' SECOND,
PRIMARY KEY (uuid) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'kafka_newjerseybus',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
insert into upsert_kafka_newjerseybus
select `title`, `description`, `link`, guid, advisoryalert as advisoryAlert,
pubdate as pubDate, ts, companyname, uuid, servicename, LOCALTIMESTAMP as eventTimeStamp
from postgres_cdc_newjerseybus
insert into upsert_kafka_newjerseybus
select `title`, `description`, `link`, guid, advisoryalert as advisoryAlert,
pubdate as pubDate, ts, companyname, uuid, servicename, LOCALTIMESTAMP as eventTimeStamp
from postgres_cdc_newjerseybus
In our insert, it’s important to match up field names and types, so I rename them in the SQL and for the field we don’t have I let Flink generate a timestamp.
FLaNK-CDC with Debezium, Apache Kafka and Kafka Connect
The other way to use Debezium via Kafka Connect is to use Apache Kafka from within Cloudera Streams Messaging Manager (SMM). This is the default way and is pretty easy to setup with some simple configuration.
Data Flow for Flink CDC
Use Cloudera Streams Messaging Manager (SMM) to easily configure.
Kafka Connect Source -> CLASS NAME: io.debezium.connector.postgresql.PostgresConnector
Uses pgoutput to consume from Postgresql database via Debezium.
Data is produced to Kafka Topic: tspann.public.newjerseybus
CDC is in Stream!
As shown below using REST, we can export the Kafka Connect configuration as JSON.
Check out Swagger/REST API!
Let’s run some API calls.
Now let’s use those CDC events. Debezium CDC Kafka Messages have arrived.
CDC/Debezium/Kafka Consumer
We will now read those Debezium CDC events with Apache NiFi and send those changed PostgreSQL table rows to Oracle.
- NiFi consumes from Kafka Topic: tspann.public.newjerseybus
- Debezium JSON events are parsed by Apache NiFi.
- NiFi sends after record to ForkEnrichment.
- NiFi sends plain after record as inserts to Oracle 23 database/schema/table: FREEPDB1.TSPANN.NEWJERSEYBUS
- Debezium Meta Data attributes are joined with after records to build annotated JSON record.
- NiFi sends this enhanced JSON event to the Kafka Topic: ${sourcetable}-cdc ie. newjerseybus-cdc.
Consume from the Apache Kafka Topic
Set the schema name for schema registry lookups.
Let’s split it out to one record at a time.
Let’s pull out the Debezium JSON Event fields including the after record which is the full current record for the table. The other fields describe the source attributes.
We pull out the after record (which will be the full current table record as JSON).
Let convert and filter via QueryRecord. We need to add a filter, this is a placeholder for now. It’s just SELECT *.
Let’s filter out empty records with routing (RouteOnAtribute).
In the remainder of the NiFi flow you can see which fork the record into two pieces. One piece goes raw to Oracle and the other is enhanced with extra fields.
Insert records from CDC into Oracle, automagically.
Our table design to receive Postgresql records into Oracle 23. When I started work, my first Oracle version was 5.
Rows have landed in our table.
Build New JSON Record: After the Fork Enrichment, Add Debezium Fields
After new JSON enhancement, let’s join those two records together automagically
Let’s stream this new joined record to our Kafka cluster.
The Final Kafka Message Produced From our New Fields
For development, use the free dockerized Oracle: https://hub.docker.com/r/gvenzl/oracle-free
Wrap-Up
As you can see Debezium is a very powerful way to stream change events from databases like PostgreSQL and use them for whatever purpose you stream fit. You can choose Kafka Connect and Kafka to receive your Debezium events or have Flink do it directly. Kafka Connect as support for more connectors, so that may be the go to choice for other connectors.
I will next start streaming events from Oracle, DB2, MariaDB/MySQL, SQL Server, MongoDB and others.
Tips
When you are searching for CDC, it’s probably best to search for “Change Data Capture”, as CDC will bring up a lot of disease data. That’s a subject for another article and example.
Don’t get lost on the Internet, bring a cat.
References
- https://docs.cloudera.com/csa/1.10.0/how-to-ssb/topics/csa-ssb-cdc-connectors.html
- https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/debezium/
- https://debezium.io/documentation/reference/1.7/
- https://www.datainmotion.dev/2019/10/monitoring-cloudera-edge-flow-manager.html
Change Data Capture Lab
NiFi CDC and Pseudo-CDC / ETL / ELT
- https://community.cloudera.com/t5/Community-Articles/Incrementally-Streaming-RDBMS-Data-to-Your-Hadoop-DataLake/ta-p/247927
- https://community.cloudera.com/t5/Community-Articles/Ingesting-RDBMS-Data-As-New-Tables-Arrive-Automagically-into/ta-p/246214
- https://community.cloudera.com/t5/Community-Articles/Incremental-Fetch-in-NiFi-with-QueryDatabaseTable/ta-p/247073
- https://community.cloudera.com/t5/Community-Articles/Simple-Change-Data-Capture-CDC-with-SQL-Selects-via-Apache/ta-p/308390
- https://nikhil-suthar-bigdata.medium.com/cdc-with-apache-nifi-65020c748ff5
- https://www.cdc.gov/nbs/modernization/stories/202304.html
- https://github.com/apache/nifi/pull/6053/files