Change Data Capture using Snowflake Streams:

A Snowflake Stream object tracks the DML changes (inserts, updates, and deletes) made to rows in a source table and stores metadata about each change. This change metadata, recorded between two transactional points in time, is used later to retrieve the changed data.

A Stream captures an initial snapshot of all the rows present in the source table as the current version of the table at an initial point in time. From then on, the Stream records every insert, update, and delete applied to the source table. When you query a Stream, each changed row is returned with additional metadata columns describing the change. By reading these CDC events, you can merge just the changes from source to target with a single MERGE statement.

How to Set Up Snowflake Change Data Capture with Streams

Step 1: We need a source and a destination table to operate on. Run the following commands to create:
Database: SNOWFLAKE_SCD1
Schema: SCD1_DEMO
Source Table: TEST_SOURCE
Target Table: TEST_DESTINATION

CREATE OR REPLACE DATABASE SNOWFLAKE_SCD1;
CREATE OR REPLACE SCHEMA SCD1_DEMO;

-- SOURCE TABLE
CREATE OR REPLACE TABLE TEST_SOURCE(
ID INT,
FIRSTNAME VARCHAR(100),
LASTNAME VARCHAR(100));
-- DESTINATION TABLE
CREATE OR REPLACE TABLE TEST_DESTINATION(
ID INT,
FIRSTNAME VARCHAR(100),
LASTNAME VARCHAR(100));
Tables present in the database (SHOW TABLES)

Step 2: Run the following command to create a data stream (SOURCE_STREAM) on top of the source table (TEST_SOURCE).

CREATE OR REPLACE STREAM SOURCE_STREAM
ON TABLE TEST_SOURCE;

Running SHOW STREAMS displays all the streams present in the database.

SHOW STREAMS

Note: Until a DML statement consumes the stream, a row that is inserted and then updated in the source table appears in the stream as a single INSERT, not as an UPDATE. The stream records the net change between its current offset and the present, not every intermediate operation.
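To illustrate (the ID used here is hypothetical and the row is removed again so the walkthrough is unaffected), inserting a row and then updating it before anything consumes the stream produces a single INSERT record:

-- Insert a row, then update it, before the stream is consumed.
INSERT INTO TEST_SOURCE VALUES (99, 'JANE', 'DOE');
UPDATE TEST_SOURCE SET LASTNAME = 'SMITH' WHERE ID = 99;
-- The stream now shows one row for ID 99 (LASTNAME = 'SMITH') with
-- METADATA$ACTION = 'INSERT' and METADATA$ISUPDATE = FALSE,
-- not an INSERT plus an UPDATE pair.
-- Deleting the row before consumption removes it from the stream entirely:
DELETE FROM TEST_SOURCE WHERE ID = 99;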

For the purpose of this demonstration, let’s add a couple of records to the TEST_SOURCE table. Run the following command to do so.

INSERT INTO TEST_SOURCE VALUES (1, 'LEE', 'HARRITON');
INSERT INTO TEST_SOURCE VALUES (2, 'SARAVANAN', 'MURUGESAN');
INSERT INTO TEST_SOURCE VALUES (3, 'SHIVA SANKARI', 'RENUGOPAL');
SELECT * FROM TEST_SOURCE;

Step 3: Let’s view the change log in the stream before proceeding.
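A stream is queried like any other table, so viewing the change log is a plain SELECT:

SELECT * FROM SOURCE_STREAM;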

As you can see, the stream returns the inserted records along with metadata fields. Three additional metadata columns (METADATA$ROW_ID, METADATA$ISUPDATE, and METADATA$ACTION) make it easy to detect whether a row has been inserted, updated, or deleted. Since this is the initial load, every row is recorded as an INSERT.

METADATA$ACTION — Specifies the action (INSERT or DELETE).

METADATA$ISUPDATE — Specifies whether the action recorded (INSERT or DELETE) is part of an UPDATE applied to the rows in the source table or view.

Note that streams record the differences between two offsets. If a row is added and then updated in the current offset, the delta change is a new row. The METADATA$ISUPDATE column records a FALSE value.

METADATA$ROW_ID — Specifies the unique and immutable ID for the row, which can be used to track changes to specific rows over time.

Now, there are no records in the destination table as they’re still in the stream.
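You can confirm this by counting the rows in the destination table:

SELECT COUNT(*) FROM TEST_DESTINATION;
-- Returns 0: nothing has been merged yet.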

Step 4: Let’s move the records to the destination using the MERGE statement.

MERGE INTO TEST_DESTINATION AS T
USING (SELECT *
         FROM SOURCE_STREAM
        WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE = TRUE)) AS S
   ON T.ID = S.ID
 WHEN MATCHED
  AND S.METADATA$ACTION = 'INSERT'
  AND S.METADATA$ISUPDATE THEN
      UPDATE SET T.FIRSTNAME = S.FIRSTNAME,
                 T.LASTNAME  = S.LASTNAME
 WHEN MATCHED
  AND S.METADATA$ACTION = 'DELETE' THEN DELETE
 WHEN NOT MATCHED
  AND S.METADATA$ACTION = 'INSERT' THEN
      INSERT (ID, FIRSTNAME, LASTNAME)
      VALUES (S.ID, S.FIRSTNAME, S.LASTNAME);

Now the data has moved to the destination and the stream is empty: once a DML statement (here, the MERGE) consumes a stream, its offset advances and its change records are cleared.

The cycle continues, and the stream will continue to record all the changes that happen in the TEST_SOURCE table.
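A convenient way to check whether the stream has recorded new changes, without advancing its offset, is Snowflake's SYSTEM$STREAM_HAS_DATA function:

SELECT SYSTEM$STREAM_HAS_DATA('SOURCE_STREAM');
-- FALSE right after the MERGE has consumed the stream;
-- TRUE again once new DML lands in TEST_SOURCE.

This is the same check commonly used in the WHEN clause of a Snowflake Task to skip runs when there is nothing to merge.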

Step 5:

Let’s view the destination table now. Run the following command to do so.

SELECT * FROM TEST_DESTINATION;

As you can see, the destination table is updated with the records.

Step 6: Now, it’s time to demonstrate the Snowflake Change Data Capture. For that purpose, go ahead and make a change in the source table. Let’s update a record and observe the stream.

UPDATE TEST_SOURCE
SET LASTNAME = 'WASHINGTON'
WHERE ID = 1;

Here we are updating the LASTNAME to WASHINGTON for ID = 1.

While observing the stream, you’ll find 2 records, an INSERT and a DELETE. This is because a stream records an update as a pair: the original LASTNAME (HARRITON) for ID = 1 appears as a DELETE, and the new LASTNAME (WASHINGTON) appears as an INSERT, both with METADATA$ISUPDATE = TRUE.
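A focused query on the stream makes the pair easy to see (the column list here is illustrative):

SELECT ID, LASTNAME, METADATA$ACTION, METADATA$ISUPDATE
FROM SOURCE_STREAM;
-- Two rows for ID = 1:
--   HARRITON    | DELETE | TRUE
--   WASHINGTON  | INSERT | TRUE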

Re-run the same MERGE statement from Step 4 to apply the update to the destination table.

Step 7: Run the following command to take a look at the destination table.

SELECT * FROM TEST_DESTINATION;

Step 8: Go ahead and make a change in the source table. Let’s delete a record and observe the stream.

DELETE FROM TEST_SOURCE WHERE ID = 2;

Now we are deleting the record where ID = 2.

While observing the stream, you’ll find one record, a DELETE (this time with METADATA$ISUPDATE = FALSE, since it is a plain delete rather than part of an update).

Re-run the MERGE statement from Step 4 to apply the delete to the destination table.

Run the following command to take a look at the destination table.

SELECT * FROM TEST_DESTINATION;

Here you can clearly observe that the changes made to the source table are reflected in the destination table.
