Dynamically Generating MERGE Queries with BigQuery’s Procedural Language

Vortex · Published in Vortex Cloud · 4 min read · May 29, 2024

BigQuery, a highly scalable data warehouse, ships with a powerful yet understated scripting language. While many data professionals are familiar with BigQuery’s core capabilities, fewer have tapped into the procedural tools it provides for automating repetitive tasks, including the ability to dynamically build and execute queries on the fly, relieving you of manual query maintenance.

Backstory

We used BigQuery as our data warehouse, with around 200 tables ingested from various sources. Whenever new data arrived, it was first stored in the landing zone and then upserted into the target tables using MERGE queries stored and executed in Cloud Composer. These MERGE queries were static and hardcoded for each table in our target zone.

Each time a new table had to be synced, a new MERGE query had to be painstakingly crafted. Even a simple change such as adding or removing a column required altering the existing query, and each change consumed about two hours of preparation, testing, and deployment time.
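For illustration, a hardcoded query for a single table looked roughly like the sketch below (the orders table and its columns are made up for this example):

MERGE main_data.orders tgt
USING staging_data.orders src
ON src.order_id = tgt.order_id
WHEN MATCHED THEN UPDATE SET
  tgt.status = src.status
, tgt.amount = src.amount
, tgt.updated_at = src.updated_at
WHEN NOT MATCHED THEN INSERT (order_id, status, amount, updated_at)
VALUES (src.order_id, src.status, src.amount, src.updated_at);

Every new or removed column meant editing both the UPDATE SET list and the INSERT/VALUES lists by hand, in every affected query.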

With 200 tables to sync and more expected, this approach simply did not scale. Exploring BigQuery’s procedural language, however, convinced us that a better, more scalable approach was within reach.

Utilizing the FOR…IN loop to create queries dynamically

As our tables are several hundred GB in size and require refreshing every hour, we needed to keep the processing within BigQuery.

Realizing the limitations of our current approach, we turned to BigQuery’s procedural language for a solution. By populating every element of the MERGE query from BigQuery’s INFORMATION_SCHEMA, we could generate the synchronization queries on the fly. This covered all scenarios, from column changes to the addition of new tables, making our syncing process dynamic and adaptable.

Several loop constructs are available in BigQuery’s procedural language, but for our use case the FOR…IN loop was a perfect fit: it allowed us to loop over all tables present in the target dataset and sync the data from the staging dataset (landing zone).
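In its simplest form (a minimal sketch, reusing the main_data dataset from the script further below), a FOR…IN loop iterates over the rows returned by a query and exposes each row as a struct:

FOR record IN (
  SELECT table_name
  FROM main_data.INFORMATION_SCHEMA.TABLES
)
DO
  -- each iteration sees one row as a struct, e.g. record.table_name
  SELECT record.table_name;
END FOR;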

To fully automate the process, our script had to perform the following steps:

  1. Get a list of tables to sync and loop over each table
  2. Get a list of columns for the table
  3. Get the primary key (for joining) and partitioning key (for partition pruning)
  4. MERGE the staging and target table using all of the above

As already mentioned, all of the above information is available in the INFORMATION_SCHEMA views, and once it is retrieved, the queries can easily be assembled and executed using BigQuery’s built-in FORMAT function and the EXECUTE IMMEDIATE statement.
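The basic building block looks like this (a minimal sketch; the dataset and table names are placeholders): FORMAT fills the placeholders with values, EXECUTE IMMEDIATE runs the resulting SQL text, and INTO captures the result in a script variable.

DECLARE row_cnt INT64;

-- build the query text, run it, and capture the single result value
EXECUTE IMMEDIATE FORMAT(
  'SELECT COUNT(*) FROM %s.%s', 'main_data', 'orders'
) INTO row_cnt;

SELECT row_cnt;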

Here’s how we put it into action with a FOR…IN loop:



DECLARE insert_columns, update_columns, pk_column, part_column STRING;
DECLARE date_arr ARRAY <DATE>;

FOR record IN (
SELECT
table_schema || '.' || table_name AS table_id
, 'staging_data' || '.' || table_name AS staging_table_id
, table_schema
, table_name
FROM main_data.INFORMATION_SCHEMA.TABLES
)
DO

-- read columns from schema
EXECUTE IMMEDIATE FORMAT('''
SELECT STRING_AGG(column_name), STRING_AGG('tgt.' || column_name || '=src.' || column_name)
FROM %s.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = '%s'
''', record.table_schema, record.table_name) INTO insert_columns, update_columns;

-- get primary key from schema
EXECUTE IMMEDIATE FORMAT('''
SELECT column_name
FROM %s.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE
WHERE table_name = '%s'
AND constraint_name LIKE '%%pk$'
''', record.table_schema, record.table_name) INTO pk_column;

-- get partition key from schema
EXECUTE IMMEDIATE FORMAT('''
SELECT column_name
FROM %s.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = '%s'
AND is_partitioning_column = 'YES'
''', record.table_schema, record.table_name) INTO part_column;

-- get array of dates to sync for partition pruning
EXECUTE IMMEDIATE FORMAT('SELECT ARRAY_AGG(DISTINCT DATE(%s)) FROM %s'
, part_column, record.staging_table_id) INTO date_arr;

-- construct and execute final MERGE statement
-- the target table is aliased as tgt; the date array is passed in as a query parameter
EXECUTE IMMEDIATE FORMAT('''
MERGE %s tgt
USING %s src
ON src.%s = tgt.%s
AND DATE(tgt.%s) IN UNNEST(@date_arr)
WHEN MATCHED THEN UPDATE SET %s
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)
''', record.table_id, record.staging_table_id, pk_column, pk_column, part_column, update_columns,
insert_columns, insert_columns)
USING date_arr AS date_arr;


END FOR;

Debugging and costs

Debugging or optimizing the queries is straightforward with BigQuery’s procedural language. Each statement in each loop iteration is logged as a separate child job of the parent script job, allowing you to track billed bytes or potential errors for every part of the process. This transparency instils confidence in the reliability of the solution.
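For example, assuming the script runs in the US multi-region, the child jobs of one script execution can be listed along with their billed bytes and errors (the parent job ID below is hypothetical):

-- list the child jobs of a single script run
SELECT
  job_id
, statement_type
, total_bytes_billed
, error_result.message AS error_message
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE parent_job_id = 'script_job_1234'  -- hypothetical parent job ID
ORDER BY creation_time;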

There is a slight increase in cost with this approach, since BigQuery bills a minimum of 10 MB for any query against the INFORMATION_SCHEMA views (https://cloud.google.com/bigquery/docs/information-schema-intro#pricing). We determined, however, that this increase is negligible, especially compared to the engineering time it saves.

Conclusion

By leveraging BigQuery’s procedural language, we’ve significantly streamlined our data management, making it more efficient and less reliant on manual intervention. This has freed our engineers to focus on more important tasks without complicating the debugging of the queries. Moreover, the additional cost has remained barely noticeable.
