Datastream and BigQuery — Append mode experiments
In this article we illustrate some recipes for processing Datastream append mode change tables.
An experiment was devised relating to Datastream APPEND mode. To recap this concept: we have a source database (Oracle in our test) on which changes are occurring. Datastream retrieves the log records from the database and, for each change (insert, update or delete), creates a change record describing the change, which is then appended to a BigQuery table.
We wrote a JMeter test plan that creates an arbitrary number of table changes in the Oracle database. The script has a few parameters:
- Number of changes (how many INSERTS, UPDATES and DELETES in total)
- Mix of INSERTS, UPDATES and DELETES expressed as percentages
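For illustration, each JMeter iteration issues one of three kinds of DML against the source table, chosen according to the configured percentages. A hypothetical sketch of the statements (the key and value literals are placeholders; the real test plan parameterizes them):
-- Hypothetical per-iteration DML against the Oracle source (values are placeholders)
INSERT INTO TABLE1 (COL1, COL2) VALUES ('KEY_0001', 'VALUE_A');
UPDATE TABLE1 SET COL2 = 'VALUE_B' WHERE COL1 = 'KEY_0001';
DELETE FROM TABLE1 WHERE COL1 = 'KEY_0001';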
Once run, we end up with an Oracle table of data (the source table) and an append table in BigQuery. We now have a test bed in which to test our reconciliation SQLs to validate that they end up with consistent data. This will include:
- Row counts
- Comparison (row by row)
In our tests, the Oracle table is called TABLE1 and has two columns:
- COL1 (primary key)
- COL2
We ran a test and performed 1000 operations. As hoped, we got exactly 1000 records in the change table.
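One of the reconciliation checks (row counts) can be run directly against the change table. A sketch of a query to see how the 1000 changes break down by change type:
SELECT datastream_metadata.change_type AS change_type, COUNT(*) AS num_changes
FROM `USER1.TABLE1`
GROUP BY change_type
ORDER BY change_type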
Based on the mix in our run (382 INSERTs and 194 DELETEs; UPDATEs do not change the row count), we would expect the Oracle table to have 382 - 194 = 188 rows. That is indeed exactly what Oracle SQL Developer shows.
When we contemplate our change table, we see that it contains 1000 records (as expected). In order for us to build a target table from the change data, it is essential that we know the order of updates to apply. For example, consider two updates:
Update 1: COL1=ABC, COL2=XYZ
Update 2: COL1=ABC, COL2=PQR
At the end of applying these updates, we would expect the final row to contain:
COL1=ABC, COL2=PQR
However, this would only be true if we applied the change table records in the order shown above. If we applied:
Update 1: COL1=ABC, COL2=PQR
Update 2: COL1=ABC, COL2=XYZ
Then the final value would be:
COL1=ABC, COL2=XYZ
It is thus vital that we apply the records in the correct order. When we look at the source_timestamp column, we see that it has a granularity of only one second, so comparing timestamps alone is not enough: multiple operations on the same row can complete within the same second and we would not be able to determine the correct order. This is where we need the change_sequence_number value. Our understanding is that, given two Datastream-written records, the one with the higher change_sequence_number value happened later. This gives us our discriminator for ordering.
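To see this in practice, we can inspect the change history of a single key (a sketch; 'ABC' is a placeholder key value, and we assume source_timestamp lives inside the datastream_metadata struct alongside change_sequence_number):
SELECT COL1, COL2,
       datastream_metadata.change_type,
       datastream_metadata.source_timestamp,
       datastream_metadata.change_sequence_number
FROM `USER1.TABLE1`
WHERE COL1 = 'ABC'  -- placeholder key value
ORDER BY datastream_metadata.change_sequence_number ASC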
Next, we apply our algorithm to create a table from a change/append table:
WITH data AS (
SELECT ROW_NUMBER() OVER ( PARTITION BY COL1
ORDER BY datastream_metadata.change_sequence_number DESC) as _row, *
FROM `USER1.TABLE1`
)
SELECT * EXCEPT(_row)
FROM data
WHERE _row = 1 AND datastream_metadata.change_type != 'DELETE'
We saved this query as a view and, when we query the view, we indeed get back the correct 188 rows. This is a good sign. Let us contemplate how this works. The following diagram will help:
The change table contains all the rows that were changed, including whether they were inserts, updates or deletes, and a sequence number identifies their relative order. What we do is bucket the rows into groups partitioned by the COL1 value (the primary key) and then order each group by sequence number, highest first. The row with the highest sequence number is, by definition, the last change made to that source row. Since each change record contains a complete copy of the row, it contains everything we need, and we keep only the last change made for each key. If that last change was a delete, then we do not include the row in our view output.
Now we need to validate that this matches the same 188 rows in the Oracle table. We exported the Oracle table to CSV and loaded it into BigQuery as a table. Next, we used the following SQL to validate that they were the same:
SELECT V1.COL1, V1.COL2, OTAB.COL1, OTAB.COL2
FROM `USER1_BQ.TABLE1` AS V1
LEFT OUTER JOIN `USER1_BQ.O_TABLE1` AS OTAB
ON V1.COL1 = OTAB.COL1
WHERE V1.COL2 != OTAB.COL2
and the result … no discrepancies.
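Note that the LEFT OUTER JOIN above only flags rows whose COL2 values differ; it will not catch a row that is present in one table but missing from the other. A sketch of a stricter check that also covers that case:
SELECT COUNT(*) AS mismatches   -- expect 0
FROM `USER1_BQ.TABLE1` AS V1
FULL OUTER JOIN `USER1_BQ.O_TABLE1` AS OTAB
ON V1.COL1 = OTAB.COL1
WHERE V1.COL1 IS NULL        -- row only present in the Oracle export
   OR OTAB.COL1 IS NULL      -- row only present in the BigQuery-derived table
   OR V1.COL2 != OTAB.COL2   -- same key, different value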
Our story so far assumes that we have the complete set of changes that were performed on the source table; this history is necessary in order to recreate the source. However, that may not always be available. What if we have only just started collecting change data, but have a copy of the source table (as a base) taken at the point where change collection began? Fortunately, there is a solution. If we have the base table, we can treat it as a change table in which every row has the lowest possible sequence number and corresponds to a CDC record of type INSERT.
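A sketch of this idea follows. It assumes a hypothetical table USER1.BASE_EXPORT holding the initial copy of the source (just COL1 and COL2) and that change_sequence_number is a STRING, so an empty string can stand in as the lowest possible value; the base rows then lose to any real change record when we order descending:
WITH unified AS (
  -- Treat every base row as an INSERT carrying the lowest possible sequence number.
  SELECT COL1, COL2,
         STRUCT('' AS change_sequence_number, 'INSERT' AS change_type) AS datastream_metadata
  FROM `USER1.BASE_EXPORT`
  UNION ALL
  -- The real change records collected by Datastream from that point onwards.
  SELECT COL1, COL2,
         STRUCT(datastream_metadata.change_sequence_number AS change_sequence_number,
                datastream_metadata.change_type AS change_type) AS datastream_metadata
  FROM `USER1.TABLE1`
)
SELECT * EXCEPT(_row)
FROM (
  SELECT ROW_NUMBER() OVER (PARTITION BY COL1
           ORDER BY datastream_metadata.change_sequence_number DESC) AS _row, *
  FROM unified
)
WHERE _row = 1 AND datastream_metadata.change_type != 'DELETE'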
The last thing we will contemplate as it pertains to the data content is maintaining a base table without constantly re-creating it from the full history. We have so far seen that if we have a change table, containing all the historical data, we can recreate the source table content. Now let us assume that we want to maintain our data in this single table model and process new change records in periodic batches to create a new and more up to date version of the source table.
We can use the following diagram to illustrate our story. We start with a base table that is in sync with the source table as of time T1. Next, we accumulate change records in a change table from time T1 to T2. At that point, we perform a reconciliation operation that creates a new base table synchronized with the source as of time T2. From here the story repeats: we have a new base table (as of T2), collect changes from T2 to T3, and apply the reconciliation again.
This then leaves the notion of how we apply the reconciliation. Let’s run an experiment. We will assume that we have a change table (CHANGES_ALL) that contains a lot of rows (changes). Let’s split this in two to create two tables (CHANGES_1 and CHANGES_2). We split the table midway such that CHANGES_1 contains change records from time T1 to T2 and CHANGES_2 contains change records from time T2 to T3.
We can do this by recognizing that a change table has a column called datastream_metadata.change_sequence_number that is always increasing. If we query the table and order by change_sequence_number, then the midway position is 50% of our data.
The following will create CHANGES_1:
CREATE OR REPLACE TABLE `USER1.CHANGES_1` AS
WITH DATA AS (
SELECT
ROW_NUMBER() OVER (ORDER BY datastream_metadata.change_sequence_number ASC) AS _row, *
FROM `USER1.CHANGES_ALL`
ORDER BY datastream_metadata.change_sequence_number ASC
)
SELECT * EXCEPT(_row) FROM DATA
WHERE _row <= DIV((SELECT COUNT(*) FROM data), 2 )
ORDER BY _row
and this will create CHANGES_2:
CREATE OR REPLACE TABLE `USER1.CHANGES_2` AS
WITH DATA AS (
SELECT
ROW_NUMBER() OVER (ORDER BY datastream_metadata.change_sequence_number ASC) AS _row, *
FROM `USER1.CHANGES_ALL`
ORDER BY datastream_metadata.change_sequence_number ASC
)
SELECT * EXCEPT(_row) FROM DATA
WHERE _row > DIV((SELECT COUNT(*) FROM data), 2 )
ORDER BY _row
Now we will create BASE_1, which is the rollup of all the items in CHANGES_1:
CREATE OR REPLACE TABLE `USER1.BASE_1` AS
WITH data AS (
SELECT ROW_NUMBER() OVER ( PARTITION BY COL1
ORDER BY datastream_metadata.change_sequence_number DESC) AS _row, *
FROM `USER1.CHANGES_1`
)
SELECT * EXCEPT(_row)
FROM data
WHERE _row = 1 AND datastream_metadata.change_type != 'DELETE'
and now we get to contemplate the new area … how will we update BASE_1 so that it contains the changes found in CHANGES_2? One solution is shown below:
MERGE `USER1.BASE_1` AS base
USING (
SELECT * EXCEPT(_row) FROM (
SELECT ROW_NUMBER() OVER (PARTITION BY COL1
ORDER BY datastream_metadata.change_sequence_number DESC
) AS _row, *
FROM `USER1.CHANGES_2`
) WHERE _row = 1
) AS delta
ON base.COL1 = delta.COL1
WHEN NOT MATCHED AND delta.datastream_metadata.change_type IN ("INSERT", "UPDATE-INSERT") THEN
INSERT(COL1, COL2, datastream_metadata) VALUES (delta.COL1, delta.COL2, delta.datastream_metadata)
WHEN MATCHED AND delta.datastream_metadata.change_type = "DELETE" THEN
DELETE
WHEN MATCHED AND delta.datastream_metadata.change_type IN ("UPDATE-INSERT", "INSERT") AND
base.datastream_metadata.change_sequence_number < delta.datastream_metadata.change_sequence_number THEN
UPDATE SET COL2 = delta.COL2, datastream_metadata = delta.datastream_metadata
After execution, we will find that the base table now contains its original state merged with the changes from the change table. We should also note that the merge is idempotent. This means that if we run the merge twice and the change table hasn’t had new changes added to it, there will be no change to the base table. Only when new appends are made to the change table are updates made to the base table.
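As a final sanity check (a sketch), we can confirm that BASE_1 after the merge matches the table we would get by rolling up the complete history in CHANGES_ALL in a single pass:
WITH expected AS (
  -- One-shot rollup of the full history, using the same algorithm as before.
  SELECT * EXCEPT(_row)
  FROM (
    SELECT ROW_NUMBER() OVER (PARTITION BY COL1
             ORDER BY datastream_metadata.change_sequence_number DESC) AS _row, *
    FROM `USER1.CHANGES_ALL`
  )
  WHERE _row = 1 AND datastream_metadata.change_type != 'DELETE'
)
SELECT COUNT(*) AS mismatches   -- expect 0
FROM `USER1.BASE_1` AS b
FULL OUTER JOIN expected AS e
ON b.COL1 = e.COL1
WHERE b.COL1 IS NULL
   OR e.COL1 IS NULL
   OR b.COL2 != e.COL2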