Using BigQuery SQL MERGE

Mark Scannell
Google Cloud - Community
4 min read · Dec 4, 2018

BigQuery can do some awesomely complex data processing, but the best features are often hidden deep down in the documentation.

In this blog post we will convert incoming staging data and merge it into a sorted analytical table. We will build the SQL step by step and use several of these features along the way. This should show you just what is possible!

Want to run it yourself? See “Testing it out” at the end!

The Challenge

We store our staging transaction table with this structure:

STRUCT<
id STRING, -- globally unique
version TIMESTAMP, -- transaction version, greatest is current
... other fields (eg, customer name, product name, order id)
>

The transactions are usually unique (only one version), but sometimes there is more than one version. For our analysis, we normally want to analyse the latest version of each transaction. We don't care about the other data fields on a transaction — you'll see that the SQL works regardless!
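For example (with hypothetical values), two staged rows may share an id but carry different versions; only the row with the greatest version is current:

SELECT 'tx-1' AS id, TIMESTAMP '2018-12-01' AS version UNION ALL
SELECT 'tx-1', TIMESTAMP '2018-12-02' UNION ALL -- latest version of tx-1
SELECT 'tx-2', TIMESTAMP '2018-12-01'          -- only version of tx-2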

We store our analytical transaction table with this structure:

STRUCT<
id STRING,
-- transaction with most recent version
latest STRUCT<version TIMESTAMP, ... other fields>,
-- older transactions (if any) sorted newest to oldest
history ARRAY<STRUCT<version TIMESTAMP, ... other fields>>
>

This gives us a convenient way to analyse the latest of each transaction, while still using the BigQuery array tools if we need the history.
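For example, an analysis over only the latest version of each transaction (using the field names from the generated test data below) becomes a simple flat query:

SELECT
latest.customer_name,
SUM(latest.amount) AS total_amount
FROM
transactions.data
GROUP BY
1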

The staging data is in the transactions.staging_data table and the analytical data is in the transactions.data table. We will construct a BigQuery MERGE statement to merge the staging_data table into the data table. This SQL can run multiple times without changing the result.

The SQL

The SQL will be built as a series of steps, each written as if it were a separate table; at the end they are brought together into a single statement using WITH.

GroupedStagingTransactions

Use ARRAY_AGG to aggregate the staging transactions by id. Note the use of the alias data to refer to the entire row as a STRUCT.

SELECT
id,
ARRAY_AGG(data) AS trans
FROM
transactions.staging_data data
GROUP BY
1

GroupedTransactions

Use the SELECT modifiers EXCEPT and AS STRUCT, the ARRAY function, and an expression subquery to remove the id column.

SELECT
id,
ARRAY(SELECT AS STRUCT
* EXCEPT (id)
FROM
d.trans
) AS trans
FROM
GroupedStagingTransactions d

JoinedTransactions

Use ARRAY_CONCAT and an array construction to join with existing data.

SELECT
staging.id,
IF(data.id IS NULL,
staging.trans,
ARRAY_CONCAT(staging.trans, [data.latest], data.history)
) AS trans
FROM
GroupedTransactions AS staging
LEFT JOIN transactions.data AS data
ON staging.id = data.id

SortedTransactions

Use more expression subqueries, SELECT DISTINCT, and SELECT AS STRUCT to extract an array of distinct history records.

Observe the subquery-in-a-subquery, and that the output has the same structure as the analytical table.

SELECT
t.id,
(SELECT
rec
FROM
t.trans AS rec
ORDER BY
version DESC
LIMIT 1
) AS latest,
ARRAY(SELECT DISTINCT AS STRUCT
*
FROM
t.trans
WHERE
version < (SELECT MAX(version) FROM t.trans)
ORDER BY
version DESC
) AS history
FROM
JoinedTransactions t

Merge the Data

Update or insert the data into the main table. Using the top-level STRUCT values makes updating and inserting the values straightforward.

MERGE transactions.data
USING SortedTransactions staging
ON
staging.id = data.id
WHEN MATCHED THEN
UPDATE SET
latest = staging.latest,
history = staging.history
WHEN NOT MATCHED BY TARGET THEN
INSERT (
id,
latest,
history)
VALUES (
staging.id,
staging.latest,
staging.history)

Single SQL Statement

Bring all of the SQL together into a single statement using WITH.

MERGE transactions.data
USING (
WITH
GroupedStagingTransactions AS (
SELECT
id,
ARRAY_AGG(data) AS trans
FROM
transactions.staging_data data
GROUP BY
1
),
GroupedTransactions AS (
SELECT
id,
ARRAY(SELECT AS STRUCT
* EXCEPT (id)
FROM
d.trans
) AS trans
FROM
GroupedStagingTransactions d
),
JoinedTransactions AS (
SELECT
staging.id,
IF(data.id IS NULL,
staging.trans,
ARRAY_CONCAT(staging.trans, [data.latest], data.history)
) AS trans
FROM
GroupedTransactions AS staging
LEFT JOIN transactions.data AS data
ON staging.id = data.id
)
SELECT
t.id,
(SELECT
rec
FROM
t.trans AS rec
ORDER BY
version DESC
LIMIT 1
) AS latest,
ARRAY(SELECT DISTINCT AS STRUCT
*
FROM
t.trans
WHERE
version < (SELECT MAX(version) FROM t.trans)
ORDER BY
version DESC
) AS history
FROM
JoinedTransactions t
) AS staging
ON
staging.id = data.id
WHEN MATCHED THEN
UPDATE SET
latest = staging.latest,
history = staging.history
WHEN NOT MATCHED BY TARGET THEN
INSERT (
id,
latest,
history)
VALUES (
staging.id,
staging.latest,
staging.history)

Testing it out

Create the transactions.staging_data table with some generated data. Use GENERATE_ARRAY, UNNEST, and RAND to help.

CREATE TEMP FUNCTION GetNewTransaction(id STRING)
AS (STRUCT(id, CURRENT_TIMESTAMP() AS version,
TRUE AS active,
CONCAT('customer-', CAST(FLOOR(RAND()*100) AS STRING))
AS customer_name,
CONCAT('product-', CAST(FLOOR(RAND()*100) AS STRING))
AS product_name,
CAST(FLOOR(RAND() * 10000)/100.0 AS NUMERIC)
AS amount
));
CREATE OR REPLACE TABLE transactions.staging_data AS
SELECT
GetNewTransaction(CAST(r AS STRING)).*
FROM
UNNEST(GENERATE_ARRAY(1, 100000)) AS r;

Create the transactions.data table (empty) from the staging_data schema. LIMIT 0 is a great way to do this!

CREATE OR REPLACE TABLE transactions.data AS
WITH TransactionWithoutKey AS (
SELECT
* EXCEPT (id)
FROM
transactions.staging_data
LIMIT 0
)
SELECT
staging_data.id,
t AS latest,
[t] AS history
FROM
transactions.staging_data
CROSS JOIN TransactionWithoutKey t
LIMIT
0;

Final Thoughts

There are several ways this could be extended —

  • Partitions. As the data size increases, performance will decrease. Partitioning (e.g. by date) and clustering can help.
  • Deletions. There is no mechanism to support deletions.
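One possible sketch for deletions, assuming a hypothetical transactions.deleted_ids table listing the transaction ids to remove, is a second MERGE:

MERGE transactions.data
USING transactions.deleted_ids staging
ON staging.id = data.id
WHEN MATCHED THEN
DELETE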

I hope this gives you an idea of some of the powerful techniques available in BigQuery.
