How to merge BigQuery tables with nested fields for incremental uploading

Alex Feldman
4 min read · May 3, 2023


BigQuery standard SQL has an amazing feature that lets us upload tables incrementally: the MERGE statement. It works fine on simple tables and conditions, but it is not obvious how to use it for tables with many columns (100, for example), including nested fields, or how to make uploading truly incremental, so that the query adds new rows and updates existing rows only when they have changed.

In this article, we will look at how to build a MERGE statement for incremental uploading, how to make the query check conditions that involve nested fields, and how to generate the condition string when there are many fields.

Base MERGE statement query for incremental uploading

First, let's see what the merge query looks like for a simpler example without nested fields and only four columns (id INT64, user_id INT64, item STRING, code INT64):

MERGE `my-project.my_dataset.users_items` T
USING (
  SELECT a.id, a.user_id, b.item, b.code
  FROM `my-project.my_dataset.users` a
  LEFT JOIN `my-project.my_dataset.items` b
    ON a.user_id = b.user_id  -- join key assumed for this example
) S
ON T.id = S.id
WHEN MATCHED
  AND (
    T.user_id != S.user_id
    OR T.item != S.item
    OR T.code != S.code
  )
THEN
  UPDATE SET T.id = S.id, T.user_id = S.user_id,
    T.item = S.item, T.code = S.code
WHEN NOT MATCHED THEN
  INSERT ROW;

We have one unique field (id) that cannot be updated. If a new id appears, the query adds a new row. If any column has changed in a row with an existing id, the query updates that row. If the row is unchanged, the query does nothing with it.
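
If you want to run the examples end to end, here is a minimal setup sketch. The project, dataset, and sample rows are hypothetical; only the table and column names match the queries above, and user_id as the join key between users and items is an assumption:

CREATE OR REPLACE TABLE `my-project.my_dataset.users` (
  id INT64,
  user_id INT64
);

CREATE OR REPLACE TABLE `my-project.my_dataset.items` (
  user_id INT64,  -- join key assumed for these examples
  item STRING,
  code INT64
);

CREATE OR REPLACE TABLE `my-project.my_dataset.users_items` (
  id INT64,
  user_id INT64,
  item STRING,
  code INT64
);

-- Made-up rows so the MERGE has something to insert on the first run.
INSERT INTO `my-project.my_dataset.users` VALUES (1, 101), (2, 102);
INSERT INTO `my-project.my_dataset.items` VALUES (101, 'book', 7), (102, 'pen', 9);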

What if the table has the following schema?

  • id INT64,
  • user_id INT64,
  • item ARRAY<STRUCT<item_id INT64, name STRING>>,
  • code INT64

As we can see, the item field is a repeated nested field. If we run our query against a table with this schema, we get the following error:

  • Query error: Inequality is not defined for arguments of type ARRAY<STRUCT <item_id INT64, name STRING>>

We could also try a condition like T.item.item_id != S.item.item_id, but it will not work either: the query cannot directly access fields inside the STRUCT without the UNNEST operator.
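
For reference, this is how the inner fields can be reached in an ordinary query against the nested schema above; the repeated field has to be flattened first, which is exactly what a MERGE search condition cannot do for us:

-- Flatten the repeated field to reach item_id and name.
SELECT t.id, i.item_id, i.name
FROM `my-project.my_dataset.users_items` t,
  UNNEST(t.item) AS i;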

So what can we do?

Handle nested fields in the MERGE statement

Since we don't need full access to the nested fields, only a check that none of the internal fields have changed, we can convert the entire nested field to a JSON string with the TO_JSON_STRING function and compare the strings to each other.

MERGE `my-project.my_dataset.users_items` T
USING (
  SELECT a.id, a.user_id, b.item, b.code
  FROM `my-project.my_dataset.users` a
  LEFT JOIN `my-project.my_dataset.items` b
    ON a.user_id = b.user_id  -- join key assumed for this example
) S
ON T.id = S.id
WHEN MATCHED
  AND (
    T.user_id != S.user_id
    OR TO_JSON_STRING(T.item) != TO_JSON_STRING(S.item)
    OR T.code != S.code
  )
THEN
  UPDATE SET T.id = S.id, T.user_id = S.user_id,
    T.item = S.item, T.code = S.code
WHEN NOT MATCHED THEN
  INSERT ROW;

Now it works.
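
To see what the condition actually compares, TO_JSON_STRING serializes the whole array, elements in order, into one deterministic string:

SELECT TO_JSON_STRING([
  STRUCT(1 AS item_id, 'book' AS name),
  STRUCT(2 AS item_id, 'pen' AS name)
]) AS item_json;
-- Result: [{"item_id":1,"name":"book"},{"item_id":2,"name":"pen"}]

One caveat: because the comparison is string-based, the same elements in a different array order produce a different string and therefore count as a change.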

Many columns in the MERGE statement

As a rule, we need incremental uploading for huge tables, whether in rows or columns. If the table has many columns, we have to list all the field names in the query's condition clauses, and the query can inflate to an incredible size. To avoid this, we can generate the field list from INFORMATION_SCHEMA and save it in variables.

DECLARE columns_array ARRAY<STRING>;
DECLARE columns_merge_when_condition STRING;
DECLARE columns_merge_update STRING;

-- Collect the top-level column names of the destination table.
SET columns_array = (
  SELECT ARRAY_AGG(DISTINCT column_name)
  FROM `my-project.my_dataset.INFORMATION_SCHEMA.COLUMNS`
  WHERE table_name = 'users_items'
);

-- Build the WHEN MATCHED condition:
-- "TO_JSON_STRING(T.col) != TO_JSON_STRING(S.col) OR ...".
SET columns_merge_when_condition = (
  SELECT STRING_AGG(
    'TO_JSON_STRING(T.' || column ||
    ') != TO_JSON_STRING(S.' || column || ')', ' OR '
  )
  FROM UNNEST(columns_array) AS column
);

-- Build the UPDATE SET list: "T.col = S.col, ...".
SET columns_merge_update = (
  SELECT STRING_AGG('T.' || column || ' = S.' || column)
  FROM UNNEST(columns_array) AS column
);

As we can see, we wrapped every field in the TO_JSON_STRING function, because it works with fields of any data type. Converting all the data to strings also solves another issue: NULL values. In standard SQL, a comparison involving NULL evaluates to NULL rather than TRUE, so a row where a value changed to or from NULL would never satisfy the WHEN MATCHED condition, and incremental uploading would not work correctly. TO_JSON_STRING transforms NULL values into the literal text 'null', which compares like any other string.
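
A quick illustration of the difference:

SELECT
  -- Evaluates to NULL, so a WHEN MATCHED AND (...) condition is never satisfied.
  CAST(NULL AS INT64) != 1 AS plain_comparison,
  -- Evaluates to TRUE: 'null' != '1'.
  TO_JSON_STRING(CAST(NULL AS INT64)) != TO_JSON_STRING(1) AS json_comparison;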

Let's compose the MERGE statement from these variables.


EXECUTE IMMEDIATE
"""
MERGE `my-project.my_dataset.users_items` T
USING (
  SELECT a.id, a.user_id, b.item, b.code
  FROM `my-project.my_dataset.users` a
  LEFT JOIN `my-project.my_dataset.items` b
    ON a.user_id = b.user_id
) S
ON T.id = S.id
WHEN MATCHED
AND (""" || columns_merge_when_condition || """)
THEN
UPDATE SET """ || columns_merge_update || """
WHEN NOT MATCHED THEN
INSERT ROW;
""";

Thus, using the MERGE statement, we created a query that incrementally updates the destination table: it adds rows with new ids and updates rows with existing ids whenever any field has changed. Thanks to generating the column list dynamically, the query handles nested fields and tables with many columns. The final code looks like this:

DECLARE columns_array ARRAY<STRING>;
DECLARE columns_merge_when_condition STRING;
DECLARE columns_merge_update STRING;

SET columns_array = (
  SELECT ARRAY_AGG(DISTINCT column_name)
  FROM `my-project.my_dataset.INFORMATION_SCHEMA.COLUMNS`
  WHERE table_name = 'users_items'
);

SET columns_merge_when_condition = (
  SELECT STRING_AGG(
    'TO_JSON_STRING(T.' || column ||
    ') != TO_JSON_STRING(S.' || column || ')', ' OR '
  )
  FROM UNNEST(columns_array) AS column
);

SET columns_merge_update = (
  SELECT STRING_AGG('T.' || column || ' = S.' || column)
  FROM UNNEST(columns_array) AS column
);

EXECUTE IMMEDIATE
"""
MERGE `my-project.my_dataset.users_items` T
USING (
  SELECT a.id, a.user_id, b.item, b.code
  FROM `my-project.my_dataset.users` a
  LEFT JOIN `my-project.my_dataset.items` b
    ON a.user_id = b.user_id
) S
ON T.id = S.id
WHEN MATCHED
AND (""" || columns_merge_when_condition || """)
THEN
UPDATE SET """ || columns_merge_update || """
WHEN NOT MATCHED THEN
INSERT ROW;
""";
