BigQuery Looping Optimization

Anas Aslam
Google Cloud - Community
4 min read · Feb 5, 2023


Looping is a sequence of actions that is repeated until a certain condition is reached. In BigQuery, a loop runs sequentially: a new iteration is not triggered until the previous iteration completes. Because the iterations happen one after another, the overall execution time grows with the number of iterations.
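As a quick illustration, this is what a sequential loop looks like in BigQuery's procedural language (a minimal sketch; the LOOP/LEAVE construct below is just one of the looping forms BigQuery supports):

```sql
-- A minimal sequential loop in BigQuery procedural SQL:
-- each iteration runs only after the previous one finishes.
DECLARE i INT64 DEFAULT 1;
LOOP
  IF i > 3 THEN
    LEAVE;
  END IF;
  SELECT CONCAT('iteration ', CAST(i AS STRING));
  SET i = i + 1;
END LOOP;
```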


In this article, I will cover the steps to parallelize the iterations, provided there is no dependency between them.

Example:

Let's take the example of an aggregate table (poc_aa_cs.tbl_aggregate in the code below) which has 5 columns:
dataset_id
table_id
filter_condition
aggregate_function
aggregate_value

(The table also carries an insert_time column, which is used later to order the rows.)

The requirement is to calculate the aggregate_value field based on the aggregate_function and update the aggregate_value field in the aggregate_table.

Sample Data


For example,

In row 1: aggregate_value = select MIN(product_price) from the poc_aa_cs.product_tgt table where launch_date = '1900-01-03'
In row 2: aggregate_value = select MAX(product_price) from the poc_aa_cs.product_tgt table where launch_date = '1900-01-05'
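For reference, the sample table could be set up as below (a sketch: the column types and sample values are assumptions based on the rows described above):

```sql
CREATE TABLE IF NOT EXISTS poc_aa_cs.tbl_aggregate (
  dataset_id STRING,
  table_id STRING,
  filter_condition STRING,
  aggregate_function STRING,
  aggregate_value FLOAT64,
  insert_time TIMESTAMP  -- used later to order the rows for partitioning
);

INSERT INTO poc_aa_cs.tbl_aggregate VALUES
  ('poc_aa_cs', 'product_tgt', "launch_date = '1900-01-03'",
   'MIN(product_price)', NULL, CURRENT_TIMESTAMP()),
  ('poc_aa_cs', 'product_tgt', "launch_date = '1900-01-05'",
   'MAX(product_price)', NULL, CURRENT_TIMESTAMP());
```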

Let's use a FOR loop to iterate over the rows.

Original Procedure:

The `aggregate_proc` procedure definition with sequential execution is as below:

CREATE OR REPLACE PROCEDURE poc_aa_cs.aggregate_proc()
BEGIN
  DECLARE aggregate_statement STRING;  -- the aggregation statement
  DECLARE update_statement STRING;     -- the update table statement
  DECLARE var_agg_value FLOAT64;       -- the output of the aggregate function

  FOR iterator IN (SELECT * FROM poc_aa_cs.tbl_aggregate) DO
    SET aggregate_statement = "select " || iterator.aggregate_function
      || " from " || iterator.dataset_id || "." || iterator.table_id
      || " where " || iterator.filter_condition;
    -- storing the aggregate output
    EXECUTE IMMEDIATE (aggregate_statement) INTO var_agg_value;
    SET update_statement = "update poc_aa_cs.tbl_aggregate set aggregate_value=("
      || CAST(var_agg_value AS STRING)
      || ") where dataset_id='" || iterator.dataset_id
      || "' and table_id='" || iterator.table_id
      || "' and aggregate_function='" || iterator.aggregate_function || "'";
    -- updating the table with the aggregate value
    EXECUTE IMMEDIATE (update_statement);
  END FOR;
END;

CALL poc_aa_cs.aggregate_proc();
Approach 1: Performance Results

The variable aggregate_statement contains the SQL statement that calculates the aggregate, and the variable update_statement contains the SQL statement that updates the aggregate_value field.

Here, the procedure iterates over all the rows in the source table (tbl_aggregate) one by one; hence the overall execution time is 5 min 12 sec.

Optimized Procedure with Parallelism:

Now, let's see how this procedure can be changed so that it is compatible with parallel execution.

In order to do this, the procedure takes two input parameters and declares three new variables:

total_partition_count: the total number of parallel executions needed
partition_id: which partition this invocation should process (1 to total_partition_count)
v_record_count: the total record count of the source table
v_start_rownum: the starting row number for this partition
v_end_rownum: the ending row number for this partition

Here, we are going to split the execution based on the row number of each record in the table.
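The boundary arithmetic can be sanity-checked with a plain query (a sketch for 250 rows split across 2 partitions, matching the worked example later in the article; DIV is used here as an equivalent of the CAST(TRUNC(...)) integer division in the procedure):

```sql
WITH params AS (
  SELECT 250 AS v_record_count, 2 AS total_partition_count
)
SELECT
  partition_id,
  (partition_id - 1) * DIV(v_record_count, total_partition_count) + 1
    AS v_start_rownum,
  IF(partition_id <> total_partition_count,
     partition_id * DIV(v_record_count, total_partition_count),
     partition_id * DIV(v_record_count, total_partition_count)
       + (total_partition_count - 1)) AS v_end_rownum
FROM params, UNNEST(GENERATE_ARRAY(1, total_partition_count)) AS partition_id;
-- partition 1 covers rows 1..125; partition 2 gets an upper bound of 251,
-- which the data caps at 250 -- the overshoot is harmless.
```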

CREATE OR REPLACE PROCEDURE poc_aa_cs.aggregate_proc_parallel(
  IN total_partition_count INT64,
  IN partition_id INT64)
BEGIN
  DECLARE aggregate_statement STRING;
  DECLARE update_statement STRING;
  DECLARE var_agg_value FLOAT64;
  DECLARE v_record_count INT64;  -- total record count from the source table
  DECLARE v_start_rownum INT64;  -- starting row number
  DECLARE v_end_rownum INT64;    -- ending row number

  SET v_record_count = (SELECT COUNT(1) FROM poc_aa_cs.tbl_aggregate);
  SET v_start_rownum = ((partition_id - 1)
    * CAST(TRUNC(v_record_count / total_partition_count, 0) AS INT64)) + 1;

  IF (partition_id <> total_partition_count) THEN
    SET v_end_rownum = partition_id
      * CAST(TRUNC(v_record_count / total_partition_count, 0) AS INT64);
  END IF;

  IF (partition_id = total_partition_count) THEN
    -- the last partition also covers the remainder rows (at most
    -- total_partition_count - 1 of them), so extend its upper bound
    SET v_end_rownum = (partition_id
      * CAST(TRUNC(v_record_count / total_partition_count, 0) AS INT64))
      + (total_partition_count - 1);
  END IF;

  CREATE TEMP TABLE aggregate_temp AS (
    SELECT
      dataset_id,
      table_id,
      aggregate_function,
      filter_condition,
      ROW_NUMBER() OVER (ORDER BY insert_time ASC) AS row_num
    FROM poc_aa_cs.tbl_aggregate
    QUALIFY row_num BETWEEN v_start_rownum AND v_end_rownum);

  FOR iterator IN (SELECT * FROM aggregate_temp) DO
    SET aggregate_statement = "select " || iterator.aggregate_function
      || " from " || iterator.dataset_id || "." || iterator.table_id
      || " where " || iterator.filter_condition;
    EXECUTE IMMEDIATE (aggregate_statement) INTO var_agg_value;
    SET update_statement = "update poc_aa_cs.tbl_aggregate set aggregate_value=("
      || CAST(var_agg_value AS STRING)
      || ") where dataset_id='" || iterator.dataset_id
      || "' and table_id='" || iterator.table_id
      || "' and aggregate_function='" || iterator.aggregate_function || "'";
    EXECUTE IMMEDIATE (update_statement);
  END FOR;
END;

CALL poc_aa_cs.aggregate_proc_parallel(2, 1);

Performance Results:

For example, suppose you decide to run the procedure as 2 parallel executions.

You can then invoke the procedure as below. Note that each CALL must be submitted as a separate job (for example, from two query tabs or two clients), since statements within a single script still run sequentially:

call poc_aa_cs.aggregate_proc_parallel (total_partition_count, partition_id);
call poc_aa_cs.aggregate_proc_parallel (2, 1);
call poc_aa_cs.aggregate_proc_parallel (2, 2);

Assume that the table contains 250 records in total; then:
aggregate_proc_parallel (2,1) will process row_number 1 to 125
aggregate_proc_parallel (2,2) will process row_number 126 to 250

Approach 2: Performance Results

Conclusion:

By executing aggregate_proc_parallel (2,1) and aggregate_proc_parallel (2,2) in parallel, the total execution time to complete all the rows went down from 5 min 12 sec to 2 min 42 sec.

Thus the overall processing time is reduced thanks to parallelization.

Remarks:

Here, I wanted to show a generic approach to splitting the workload based on row number. However, depending on your requirements, the splitting logic can be anything; for example, you could split the workload by dataset or by table, and change the row-splitting logic accordingly.
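For instance, a hash-based split by table could replace the row_number logic inside the procedure (a sketch; FARM_FINGERPRINT is used here as one possible stable hash, and the column list matches the temp table above):

```sql
-- Replace the row_number-based temp table with a hash-based split:
-- every (dataset_id, table_id) pair is deterministically assigned
-- to exactly one partition, so no row is processed twice.
CREATE TEMP TABLE aggregate_temp AS (
  SELECT dataset_id, table_id, aggregate_function, filter_condition
  FROM poc_aa_cs.tbl_aggregate
  WHERE MOD(ABS(FARM_FINGERPRINT(dataset_id || '.' || table_id)),
            total_partition_count) = partition_id - 1);
```

A hash-based split avoids the COUNT and boundary arithmetic entirely, at the cost of less even partition sizes when a few tables dominate the workload.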

I frequently write about Google Cloud Technologies and Optimization Techniques. Feel free to follow me for future articles.

Appendix:

My other BigQuery articles for your reference:
