More Ways to Create Incremental Tables in Dataform

Alex Feldman
Google Cloud - Community
9 min read · Dec 5, 2023

If you build complex queries in BigQuery and haven’t started using Dataform yet, it is worth doing so. In a nutshell, Dataform is similar to dbt, with the advantage that it is built into GCP. There are many valuable resources on getting started with Dataform, including the pretty good official documentation.

This article discusses Dataform’s built-in capabilities for creating incremental tables: what works well, what doesn’t work so well yet, and how we can extend the standard Dataform capabilities with a few tricks.

Photo by Sid Balachandran on Unsplash

When working in BigQuery to create incremental tables, we often develop procedures that include DML statements such as INSERT, DELETE, UPDATE, and MERGE.

Dataform uses a config block with type: "incremental" to build incremental tables, together with the ${when(incremental(), ...)} operator to define the SQL that applies only to incremental runs.
For example:

config {
type: "incremental"
}

SELECT *
FROM ${ref('source_table')}
${when(incremental(),
`WHERE updated_at > (SELECT MAX(updated_at) FROM ${self()})`)}

Using such constructs, Dataform generates the same CREATE / INSERT / MERGE statements that we would write in BigQuery, but with some restrictions (which we have to work around).
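To make the templating concrete, here is a minimal JavaScript sketch of how a when-style helper could expand into the two query variants. The when and renderQuery functions below are simplified stand-ins written for illustration, not Dataform's actual implementation, and the table names are the placeholder names used in this article.

```javascript
// Simplified stand-in for Dataform's when(): returns one of two SQL fragments.
function when(condition, trueCase, falseCase = "") {
  return condition ? trueCase : falseCase;
}

// Hypothetical renderer: builds the SELECT for either execution mode.
function renderQuery(isIncremental) {
  const source = "`my_project.source_dataset.source_table`";
  const self = "`my_project.destination_dataset.destination_table`";
  const filter = when(
    isIncremental,
    `WHERE updated_at > (SELECT MAX(updated_at) FROM ${self})`
  );
  return `SELECT * FROM ${source} ${filter}`.trim();
}

console.log(renderQuery(true)); // incremental run: SELECT with the WHERE filter
console.log(renderQuery(false)); // full refresh: plain SELECT, no filter
```

The third argument of when is the fragment used outside incremental mode; later examples in this article use it to supply a default timestamp.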

Let’s consider different options for configuring incremental tables in Dataform and look at their analogs in BigQuery.

Inserting new rows into the destination table

Let’s create a Dataform file with the path and name as definitions/explore_incremental/destination_table.sqlx:

config {
type: "incremental",
bigquery: {
partitionBy: "TIMESTAMP_TRUNC(updated_at, HOUR)"
},
schema: "destination_dataset",
tags: ["my_tag"]
}

WITH
upload_source_table AS (
SELECT *
FROM ${ref('source_table')}
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n
${when(incremental(),
`WHERE updated_at > (SELECT MAX(updated_at) FROM ${self()})`
) }

In this example, we have the source_table that contains the updated_at field of the timestamp type (we should declare this data source before using the source_table). The result is loaded into the destination_table of the destination_dataset.

The script loads all rows from the source table in the first subquery. The next subqueries contain some transformations. The final SELECT reads the MAX value of the updated_at field from the current version of the destination table and filters the data by this value. We use the ${self()} JS operator to refer to the destination table.

In addition, by creating table partitions as TIMESTAMP_TRUNC(updated_at, HOUR), we can minimize the amount of data that needs to be scanned in the destination table.

The script generates the following BigQuery code.

INSERT INTO `my_project.destination_dataset.destination_table` (
WITH
upload_source_table AS (
SELECT *
FROM `my_project.source_dataset.source_table`
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n
WHERE updated_at > (
SELECT MAX(updated_at)
FROM `my_project.destination_dataset.destination_table`
)
)

If we run it with the “Run with full refresh” execution option, the BigQuery code looks like this:

CREATE OR REPLACE TABLE `my_project.destination_dataset.destination_table` 
PARTITION BY TIMESTAMP_TRUNC(updated_at, HOUR)
AS (

WITH
upload_source_table AS (
SELECT *
FROM `my_project.source_dataset.source_table`
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n
)

In regular incremental execution mode, the script generates the INSERT statement with the WHERE clause in the final SELECT. As a result, only rows with a timestamp greater than the existing max timestamp are inserted into the destination table.

In the full refresh mode, the script overwrites the existing destination table with all rows from the source table.
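The difference between the two modes can be sketched with plain in-memory arrays. This is only a model of the behaviour described above, not generated code; the function names and sample rows are made up for illustration, and timestamps are ISO strings so that string comparison matches time order.

```javascript
// Largest updated_at in a table, or null when the table is empty (like MAX over no rows).
function maxUpdatedAt(rows) {
  return rows.reduce(
    (max, row) => (max === null || row.updated_at > max ? row.updated_at : max),
    null
  );
}

// Incremental run: append only source rows newer than the destination watermark.
function runIncremental(source, destination) {
  const watermark = maxUpdatedAt(destination);
  // In SQL, "updated_at > NULL" is never true, so an empty destination receives nothing.
  const fresh =
    watermark === null ? [] : source.filter((row) => row.updated_at > watermark);
  return [...destination, ...fresh];
}

// Full refresh: rebuild the destination from every source row.
function runFullRefresh(source) {
  return [...source];
}

const source = [
  { id: 1, updated_at: "2023-12-01T10:00:00Z" },
  { id: 2, updated_at: "2023-12-01T11:00:00Z" },
  { id: 3, updated_at: "2023-12-01T12:00:00Z" },
];
const destination = runFullRefresh(source.slice(0, 2)); // state after an earlier load
const afterIncrement = runIncremental(source, destination);
console.log(afterIncrement.map((row) => row.id)); // [ 1, 2, 3 ]
```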

Uploading selected rows from the source table

If the source table is partitioned, using the WHERE clause in the first subquery efficiently minimizes the table scan.

config {
type: "incremental",
bigquery: {
partitionBy: "TIMESTAMP_TRUNC(updated_at, HOUR)"
},
schema: "destination_dataset",
tags: ["my_tag"]
}


pre_operations {
DECLARE last_timestamp TIMESTAMP DEFAULT (
${when(incremental(),
`SELECT MAX(updated_at) FROM ${self()}`,
`SELECT TIMESTAMP('2023-01-01')`)}
);
}

WITH
upload_source_table AS (
SELECT *
FROM ${ref('source_table')}
WHERE updated_at > last_timestamp
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n

We can declare the last_timestamp variable in the pre_operations block and use it in the WHERE clause of the first subquery. We can also declare the value for the non-incremental (full refresh) mode (2023-01-01 in our example).

To run the action in the preview mode (with the Run button) in the Dataform editor, we should comment out two lines, and uncomment them again before we execute the action.

-- Comment rows for the preview RUN

-- pre_operations {
DECLARE last_timestamp TIMESTAMP DEFAULT (
${when(incremental(),
`SELECT MAX(updated_at) FROM ${self()}`,
`SELECT TIMESTAMP('2023-01-01')`)}
);
-- }

Dataform generates the following BigQuery code in the incremental execution.

DECLARE last_timestamp TIMESTAMP DEFAULT (
SELECT MAX(updated_at)
FROM `my_project.destination_dataset.destination_table`
);

INSERT INTO `my_project.destination_dataset.destination_table` (
WITH
upload_source_table AS (
SELECT *
FROM `my_project.source_dataset.source_table`
WHERE updated_at > last_timestamp
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n
)

And for the full refresh mode.

DECLARE last_timestamp TIMESTAMP DEFAULT TIMESTAMP('2023-01-01');

CREATE OR REPLACE TABLE `my_project.destination_dataset.destination_table`
PARTITION BY TIMESTAMP_TRUNC(updated_at, HOUR)
AS (
WITH
upload_source_table AS (
SELECT *
FROM `my_project.source_dataset.source_table`
WHERE updated_at > last_timestamp
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n
)

Truncating the destination table before inserting rows

Sometimes, we need to load and process only a portion of the data, replacing the contents of the destination table every time. At first glance, it is simple: switch the script type from “incremental” to “table” in the config block and take the row-selection condition from another place in the data pipeline. Something like:

config {
type: "table",
schema: "destination_dataset",
tags: ["my_tag"]
}

pre_operations {
DECLARE last_timestamp TIMESTAMP DEFAULT (
SELECT MAX(created_at) FROM ${resolve('table_downstream')} -- use 'resolve' instead of 'ref'
);
}

WITH
upload_source_table AS (
SELECT *
FROM ${ref('source_table')}
WHERE updated_at > last_timestamp
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n

Here, ‘table_downstream’ is a table further down the pipeline. Referencing it with ${ref(‘table_downstream’)} produces a circular dependency error because the destination_table is a transitive dependency of the table_downstream. We can make it work by replacing ${ref(‘table_downstream’)} with ${resolve(‘table_downstream’)}, which renders the table name without declaring a dependency.
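Why ref fails here while resolve does not can be sketched as a cycle check on a dependency graph. The graph objects and the hasCycle helper below are hypothetical and only model the idea; they are not how Dataform stores or validates its graph.

```javascript
// Depth-first search for a cycle in a graph of "action -> actions it depends on".
function hasCycle(graph) {
  const state = new Map(); // node -> "visiting" | "done"
  const visit = (node) => {
    if (state.get(node) === "done") return false;
    if (state.get(node) === "visiting") return true; // back edge: cycle found
    state.set(node, "visiting");
    for (const dep of graph[node] || []) {
      if (visit(dep)) return true;
    }
    state.set(node, "done");
    return false;
  };
  return Object.keys(graph).some((node) => visit(node));
}

// With ref(): destination_table depends on table_downstream, which depends back on it.
const withRef = {
  source_table: [],
  destination_table: ["source_table", "table_downstream"],
  table_downstream: ["destination_table"],
};

// With resolve(): the table name is still rendered, but no dependency edge is added.
const withResolve = {
  source_table: [],
  destination_table: ["source_table"],
  table_downstream: ["destination_table"],
};

console.log(hasCycle(withRef)); // true
console.log(hasCycle(withResolve)); // false
```

The trade-off is that without the dependency edge, Dataform no longer orders the two actions relative to each other through this reference, so the pre-operation reads whatever state table_downstream has at that moment.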

Another option is to use the incremental table with a truncating step.

config {
type: "incremental",
schema: "destination_dataset",
tags: ["my_tag"]
}

pre_operations {
DECLARE last_timestamp TIMESTAMP DEFAULT (
${when(incremental(),
`SELECT MAX(updated_at) FROM ${self()}`,
`SELECT TIMESTAMP('2023-01-01')`)}
);
---
${when(incremental(),
`TRUNCATE TABLE ${self()}`)};
}

WITH
upload_source_table AS (
SELECT *
FROM ${ref('source_table')}
WHERE updated_at >= last_timestamp -- use '>='
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n

We added a TRUNCATE statement to the pre_operations block. The script first reads the last timestamp value and then truncates the table. We use >= instead of > in the WHERE updated_at >= last_timestamp expression to avoid a problem when the source_table has no new rows: with >, the destination_table would remain empty after the truncation, and the next run would have no timestamp to read. The destination table should always keep at least one row with the last_timestamp value. If we can guarantee that the source table always has new rows, we can use > in the condition.
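The effect of the comparison operator can be seen in a small in-memory model of one truncate-and-load run. The truncateAndLoad function and the sample rows are illustrative assumptions; timestamps are ISO strings so that string comparison matches time order.

```javascript
// Largest updated_at in a table, or null when the table is empty.
function maxUpdatedAt(rows) {
  return rows.reduce(
    (max, row) => (max === null || row.updated_at > max ? row.updated_at : max),
    null
  );
}

// One incremental run of the truncate-then-insert pattern.
// "inclusive" switches the source filter between >= and >.
function truncateAndLoad(source, destination, inclusive) {
  const watermark = maxUpdatedAt(destination); // read before truncating
  if (watermark === null) return []; // a comparison with NULL keeps no rows
  return source.filter((row) =>
    inclusive ? row.updated_at >= watermark : row.updated_at > watermark
  );
}

const source = [
  { id: 1, updated_at: "2023-12-01T10:00:00Z" },
  { id: 2, updated_at: "2023-12-01T11:00:00Z" },
];
const destination = [{ id: 2, updated_at: "2023-12-01T11:00:00Z" }];

// No new source rows arrived between runs.
const strict = truncateAndLoad(source, destination, false);
const inclusive = truncateAndLoad(source, destination, true);
console.log(strict.length); // 0: the watermark is lost
console.log(inclusive.length); // 1: the boundary row is reloaded and keeps the watermark
console.log(truncateAndLoad(source, strict, false).length); // 0: later runs load nothing
```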

BigQuery code for the regular incremental mode.

DECLARE last_timestamp TIMESTAMP DEFAULT (
SELECT MAX(updated_at)
FROM `my_project.destination_dataset.destination_table`
);

TRUNCATE TABLE `my_project.destination_dataset.destination_table`;

INSERT INTO `my_project.destination_dataset.destination_table` (
WITH
upload_source_table AS (
SELECT *
FROM `my_project.source_dataset.source_table`
WHERE updated_at >= last_timestamp
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n
)

And for the full refresh mode.

DECLARE last_timestamp TIMESTAMP DEFAULT TIMESTAMP('2023-01-01');

CREATE OR REPLACE TABLE `my_project.destination_dataset.destination_table` AS (
WITH
upload_source_table AS (
SELECT *
FROM `my_project.source_dataset.source_table`
WHERE updated_at >= last_timestamp
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n
)

A disadvantage of this solution is that the boundary rows are loaded again in every run (if we use >=). We should handle such duplicated rows in the following steps.

If you want to avoid duplicated rows, the solution is to insert a row containing only the updated_at value after the truncation when there are no new rows in the source table.

config {
type: "incremental",
schema: "destination_dataset",
tags: ["my_tag"]
}

pre_operations {
DECLARE last_timestamp TIMESTAMP DEFAULT (
${when(incremental(),
`SELECT MAX(updated_at) FROM ${self()}`,
`SELECT TIMESTAMP('2023-01-01')`)}
);

CREATE TEMP TABLE stg_table AS
WITH
upload_source_table AS (
SELECT *
FROM ${ref('source_table')}
WHERE updated_at > last_timestamp -- use '>' here
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n;

-- Truncate the table and insert an empty row
-- with the updated_at value if there are no new records.

${when(incremental(),
`TRUNCATE TABLE ${self()};
IF NOT EXISTS (SELECT * FROM stg_table) THEN
INSERT INTO stg_table (updated_at)
VALUES (last_timestamp);
END IF;
`)
}

} -- end pre_operations

SELECT *
FROM stg_table

We create a temporary table for the main query with WHERE updated_at > last_timestamp. Then, we truncate the destination table and check whether the stg_table is empty (no new rows in the source table). If it is empty, we insert a new row with only the updated_at field populated with the last_timestamp value. We must remember to filter out this technical row in the next pipeline step.

An advantage of truncating the destination table before inserting rows is that the script works in both regular incremental and full refresh modes. This is useful if we need to load the source table frequently (every hour, for example) in the incremental mode and once a day in the full refresh mode to make some additional calculations and remove possible incremental loading errors. We can set the execution mode when we schedule the workflow.
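The marker-row idea can be modelled in memory as follows. The function names and sample rows are assumptions made for this sketch; in particular, it assumes that real rows always have a non-null id, which is what lets the downstream step recognise the technical row.

```javascript
// Largest updated_at in a table, or null when the table is empty.
function maxUpdatedAt(rows) {
  return rows.reduce(
    (max, row) => (max === null || row.updated_at > max ? row.updated_at : max),
    null
  );
}

// One incremental run: stage rows newer than the watermark, then replace the destination.
function loadWithMarker(source, destination) {
  const watermark = maxUpdatedAt(destination);
  const staged = source.filter(
    (row) => watermark !== null && row.updated_at > watermark
  );
  if (staged.length === 0) {
    // Technical row: only updated_at is populated, so the watermark survives.
    staged.push({ id: null, updated_at: watermark });
  }
  return staged; // the destination was truncated, so it now holds only staged rows
}

// Downstream step: drop the technical row before using the data.
function withoutMarker(rows) {
  return rows.filter((row) => row.id !== null);
}

const source = [{ id: 1, updated_at: "2023-12-01T10:00:00Z" }];
let destination = [{ id: 1, updated_at: "2023-12-01T10:00:00Z" }];

destination = loadWithMarker(source, destination); // no new rows: marker only
console.log(withoutMarker(destination).length); // 0

source.push({ id: 2, updated_at: "2023-12-01T11:00:00Z" });
destination = loadWithMarker(source, destination); // loads only the new row
console.log(withoutMarker(destination).map((row) => row.id)); // [ 2 ]
```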

Merging rows in an incremental table

Let's discuss merging rows, including updating existing rows and inserting new rows.

Dataform suggests using the following construct for that.

config {
type: "incremental",
uniqueKey: ["id"],
bigquery: {
partitionBy: "DATE(updated_at)",
updatePartitionFilter:
"updated_at >= timestamp_sub(current_timestamp(), interval 24 hour)"
},
schema: "destination_dataset",
tags: ["my_tag"]
}

WITH
upload_source_table AS (
SELECT *
FROM ${ref('source_table')}
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n

The uniqueKey in the config block tells Dataform to generate a MERGE statement when compiling the script.

The MERGE statement is an excellent feature but has some drawbacks. First, it is resource-consuming. Second, it is pretty hard to optimize for wide tables with many columns (I describe how to do it in BigQuery, including working with nested fields, in my article How to merge BigQuery tables with nested fields for incremental uploading). Third, Dataform is not very flexible when configuring the MERGE: it only lets us configure the ON clause by defining the list of unique columns, with an additional condition from the updatePartitionFilter.

It generates the BigQuery code:

MERGE `my_project.destination_dataset.destination_table` T
USING (

WITH
upload_source_table AS (
SELECT *
FROM `my_project.source_dataset.source_table`
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n

) S
ON T.id = S.id
AND T.updated_at >= timestamp_sub(current_timestamp(), interval 24 hour)
WHEN MATCHED THEN
UPDATE SET column1 = S.column1, column2 = S.column2, ...
WHEN NOT MATCHED THEN
INSERT ROW;

We can use updatePartitionFilter if we know the period during which rows can be changed. If we don’t know this, we can omit the updatePartitionFilter and use only uniqueKey, but this leads to scanning the whole table and updating all existing rows.
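Why the change window matters can be seen in a simplified in-memory model of the MERGE shown above, where the filter is part of the ON condition. The merge function, the fixed cutoff, and the sample rows are assumptions for illustration; the sketch also assumes that ids are unique within the source.

```javascript
// Simplified model of MERGE ... ON T.id = S.id AND <target-side filter>.
function merge(target, source, targetFilter = () => true) {
  const originals = target.map((row) => ({ ...row }));
  const inserted = [];
  for (const s of source) {
    const match = originals.find((t) => t.id === s.id && targetFilter(t));
    if (match) {
      Object.assign(match, s); // WHEN MATCHED THEN UPDATE
    } else {
      inserted.push({ ...s }); // WHEN NOT MATCHED THEN INSERT ROW
    }
  }
  return [...originals, ...inserted];
}

// Fixed stand-in for "current timestamp minus 24 hours".
const cutoff = "2023-12-04T00:00:00Z";
const recentOnly = (row) => row.updated_at >= cutoff;

const target = [
  { id: 1, value: "a", updated_at: "2023-12-04T12:00:00Z" }, // inside the window
  { id: 2, value: "b", updated_at: "2023-11-01T00:00:00Z" }, // older than the window
];
const source = [
  { id: 1, value: "a2", updated_at: "2023-12-04T23:00:00Z" },
  { id: 2, value: "b2", updated_at: "2023-12-04T23:00:00Z" },
  { id: 3, value: "c", updated_at: "2023-12-04T23:00:00Z" },
];

const filtered = merge(target, source, recentOnly);
const unfiltered = merge(target, source);
console.log(filtered.map((row) => row.id)); // [ 1, 2, 2, 3 ]
console.log(unfiltered.map((row) => row.id)); // [ 1, 2, 3 ]
```

In this model, a row that changes after it has left the filter window is not matched and is inserted a second time, which is why the window should cover the whole period in which rows can change.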

The BigQuery code for the full refresh mode will be a CREATE TABLE statement with the PARTITION BY DATE(updated_at) clause.

It is worth using the MERGE statement in Dataform if we have the updatePartitionFilter or if resource consumption does not matter. In the latter case, we can use a straightforward construct where all rows with new IDs are inserted and all rows with existing IDs are updated.

config {
type: "incremental",
uniqueKey: ["id"],
schema: "destination_dataset",
tags: ["my_tag"]
}

WITH
upload_source_table AS (
SELECT *
FROM ${ref('source_table')}
),

transformation_1 AS (
...
),
...
transformation_n AS (
...
)

SELECT *
FROM transformation_n

If resource consumption matters to us, and we strive to optimize our queries, let’s consider other ways to update rows.

Updating rows by deleting and inserting rows

We can combine INSERT and DELETE statements in our query with table partitions.

config {
type: "incremental",
bigquery: {
partitionBy: "RANGE_BUCKET(id, GENERATE_ARRAY(0, 100000, 100))"
},
schema: "destination_dataset",
tags: ["my_tag"]
}

pre_operations {
CREATE TEMP TABLE stg_table AS (

WITH
upload_source_table AS (
SELECT *
FROM ${ref('source_table')}
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n
);
---
${when(incremental(),
`DELETE ${self()} WHERE id IN (SELECT id FROM stg_table)`
)};
}

SELECT * FROM stg_table

To reduce table scanning, we create integer-range partitions on the id column. In the pre_operations block, we create the temporary staging table. Then, we delete the rows in the destination table whose IDs exist in the staging table. Finally, we insert all rows from the staging table into the destination table.
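The delete-then-insert sequence can be modelled in a few lines of JavaScript. The deleteAndInsert and partitionStart helpers and the sample rows are hypothetical; partitionStart only approximates the 100-wide id ranges configured above and assumes ids between 0 and 99999.

```javascript
// Model of the DELETE + INSERT pair: remove destination rows whose id is staged, then append.
function deleteAndInsert(destination, staged) {
  const stagedIds = new Set(staged.map((row) => row.id));
  const kept = destination.filter((row) => !stagedIds.has(row.id));
  return [...kept, ...staged];
}

// Start of the 100-wide id range a row falls into, assuming 0 <= id < 100000.
function partitionStart(id) {
  return Math.floor(id / 100) * 100;
}

const destination = [
  { id: 5, value: "old" },
  { id: 150, value: "old" },
  { id: 99999, value: "old" },
];
const staged = [
  { id: 150, value: "new" },
  { id: 160, value: "new" },
];

const result = deleteAndInsert(destination, staged);
console.log(result.map((row) => `${row.id}:${row.value}`));
// [ '5:old', '99999:old', '150:new', '160:new' ]

// Only one id range is touched by the staged rows.
console.log([...new Set(staged.map((row) => partitionStart(row.id)))]); // [ 100 ]
```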

The BigQuery code for the incremental mode.

CREATE TEMP TABLE stg_table AS (

WITH
upload_source_table AS (
SELECT *
FROM `my_project.source_dataset.source_table`
),
transformation_1 AS (
...
),
...
transformation_n AS (
...
)
SELECT *
FROM transformation_n
);

DELETE `my_project.destination_dataset.destination_table`
WHERE id IN (SELECT id FROM stg_table);

INSERT INTO `my_project.destination_dataset.destination_table` (
SELECT *
FROM stg_table
);

The BigQuery code for the full refresh mode.

CREATE TEMP TABLE stg_table AS 
WITH
upload_source_table AS (
SELECT *
FROM `my_project.source_dataset.source_table`
),

transformation_1 AS (
...
),
...
transformation_n AS (
...
)

SELECT *
FROM transformation_n;


CREATE OR REPLACE TABLE `my_project.destination_dataset.destination_table`
PARTITION BY RANGE_BUCKET(id, GENERATE_ARRAY(0, 100000, 100))
AS (
SELECT *
FROM stg_table
);

In this article, we considered Dataform's ability to configure incremental tables. Although not all incremental scenarios work natively in Dataform, over time, we will see improvements in this area because the platform is actively evolving.

When incorporating incremental tables into data pipelines, it is important to minimize table scanning, not just the amount of data being loaded. This is where partitioning and clustering tables come into play.

Thank you for your attention.
