Data pipelines with BigQuery routines

Alex Feldman
8 min read · Dec 31, 2022



To create complex queries in BigQuery, we can use BigQuery routines such as table functions and stored procedures. Routines let us break the code into separate blocks and add extra functionality and features.

This article looks at how to use routines and procedural language to build data pipelines. We will create a small project based on the Google public IMDB dataset. The goal of the project is to build a ranking of the most popular new TV series.

The data pipeline will consist of three steps and three blocks of code.

  1. In the first step, we will build the list of the most popular TV series that started in 2022, include Comedy among their genres, are rated 8 or higher, and have more than 10,000 votes. The code for this step will live in the proc_top_comedy_series stored procedure, and the result will be written to the staging table stg_top_comedy_series.
  2. In the second step, we will define the top 5 cast members for every selected series. The code for this step will live in the tfunc_top_cast table function. The result of executing the function will be returned directly to the calling place (the main stored procedure).
  3. The third step is the main stored procedure, where we will join the results into one destination table and add a test execution mode. The result table will be stored in the imdb_series dataset in production mode and in the imdb_series_dpl dataset in testing mode.

Finally, we will deploy the main stored procedure using the scheduled queries service.

Step one

So, let’s start writing the code for the first query:

SELECT DISTINCT a.*, b.primary_title, b.genres
FROM `bigquery-public-data.imdb.title_ratings` a
LEFT JOIN `bigquery-public-data.imdb.title_basics` b ON a.tconst = b.tconst
WHERE b.title_type IN ('tvSeries', 'tvMiniSeries') AND b.start_year = 2022
AND b.genres LIKE '%Comedy%'
AND a.average_rating >= 8 AND a.num_votes > 10000;
The first 5 rows of the query result.

Wednesday, you are the best!

The query's result should be saved to a staging table in the imdb_series_dpl dataset.

Let’s add the CREATE TABLE statement:

-- Adding the create table statement
CREATE OR REPLACE TABLE `my-project.imdb_series_dpl.stg_top_comedy_series` AS
SELECT DISTINCT a.*, b.primary_title, b.genres
FROM `bigquery-public-data.imdb.title_ratings` a
LEFT JOIN `bigquery-public-data.imdb.title_basics` b ON a.tconst = b.tconst
WHERE b.title_type IN ('tvSeries', 'tvMiniSeries') AND b.start_year = 2022
AND b.genres LIKE '%Comedy%'
AND a.average_rating >= 8 AND a.num_votes > 10000;

And then, we wrap it up into a stored procedure:

-- Creating stored procedure
CREATE OR REPLACE PROCEDURE `my-project.imdb_series_dpl.proc_top_comedy_series` ()
BEGIN
-- The beginning of the procedure body.


CREATE OR REPLACE TABLE `my-project.imdb_series_dpl.stg_top_comedy_series` AS
SELECT DISTINCT a.*, b.primary_title, b.genres
FROM `bigquery-public-data.imdb.title_ratings` a
LEFT JOIN `bigquery-public-data.imdb.title_basics` b ON a.tconst = b.tconst
WHERE b.title_type IN ('tvSeries', 'tvMiniSeries') AND b.start_year = 2022
AND b.genres LIKE '%Comedy%'
AND a.average_rating >= 8 AND a.num_votes > 10000;

END; -- The end of the main procedure body.

When we run the query, it creates a stored procedure that we can see in the new routines folder in the working dataset.
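BigQuery also lets us list the routines of a dataset with a query against the INFORMATION_SCHEMA.ROUTINES view. A quick sketch, assuming the project and dataset names used throughout this article:

-- Listing the routines stored in the working dataset
SELECT routine_name, routine_type, created
FROM `my-project.imdb_series_dpl.INFORMATION_SCHEMA.ROUTINES`
ORDER BY created DESC;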

To execute the procedure code, we need to call it using the CALL statement:

CALL `my-project.imdb_series_dpl.proc_top_comedy_series`();

Ok, the first step of the data pipeline is ready.

Step two

The second step creates the list of the top cast members for each series. We will build it with a table function.

First, let’s write the query:

WITH
tab1 as (
SELECT a.tconst, ARRAY_AGG(b.primary_name ORDER BY a.ordering LIMIT 5) as cast_array
FROM `bigquery-public-data.imdb.title_principals` a
LEFT JOIN `bigquery-public-data.imdb.name_basics` b ON a.nconst=b.nconst
WHERE category IN ('actor', 'actress')
AND tconst IN (SELECT tconst FROM `my-project.imdb_series_dpl.stg_top_comedy_series`)
GROUP BY 1),
tab2 as (
SELECT tconst, ARRAY_TO_STRING(cast_array, ', ') as top_cast
FROM tab1
)
SELECT *
FROM tab2
The result of the query.

The query builds the list of the top 5 actresses and actors for each series.
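The key technique here is ARRAY_AGG with ORDER BY and LIMIT inside the aggregation, combined with ARRAY_TO_STRING. A minimal standalone sketch with made-up inline data shows the mechanics:

-- Aggregating at most two names per group, ordered by billing, into one string
WITH demo AS (
SELECT 'tt001' AS tconst, 'Actor A' AS primary_name, 1 AS ordering UNION ALL
SELECT 'tt001', 'Actor B', 2 UNION ALL
SELECT 'tt001', 'Actor C', 3)
SELECT tconst, ARRAY_TO_STRING(ARRAY_AGG(primary_name ORDER BY ordering LIMIT 2), ', ') AS top_cast
FROM demo
GROUP BY 1;
-- Expected output: tt001 | Actor A, Actor B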

Let’s wrap up the query into the table function:

-- Adding the create table function statement
CREATE OR REPLACE TABLE FUNCTION `my-project.imdb_series_dpl.tfunc_top_cast`() as

WITH
tab1 as (
SELECT a.tconst, ARRAY_AGG(b.primary_name ORDER BY a.ordering LIMIT 5) as cast_array
FROM `bigquery-public-data.imdb.title_principals` a
LEFT JOIN `bigquery-public-data.imdb.name_basics` b ON a.nconst=b.nconst
WHERE category IN ('actor', 'actress')
AND tconst IN (SELECT tconst FROM `my-project.imdb_series_dpl.stg_top_comedy_series`)
GROUP BY 1),
tab2 as (
SELECT tconst, ARRAY_TO_STRING(cast_array, ', ') as top_cast
FROM tab1
)
SELECT *
FROM tab2

After executing this code, the tfunc_top_cast table function will be stored in the routines folder.
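We can check the function by selecting from it directly, just as we would from a table (this assumes the staging table from step one already exists):

-- Previewing the output of the table function
SELECT *
FROM `my-project.imdb_series_dpl.tfunc_top_cast`()
LIMIT 10;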

The second step is done.

I recommend using a table function for pipeline steps whenever possible. A table function returns its result directly to the calling place and doesn't require any staging tables or views. But it has some limitations and doesn't suit every task; in that respect, stored procedures are the more versatile tool.
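One handy feature worth noting is that a table function can take parameters. The sketch below is not part of our pipeline; the function name tfunc_cast_for_title and the series_id parameter are hypothetical, just to show how a parameterized variant could look:

-- A hypothetical parameterized table function: top cast for a single title
CREATE OR REPLACE TABLE FUNCTION `my-project.imdb_series_dpl.tfunc_cast_for_title`(series_id STRING) AS
SELECT a.tconst, ARRAY_TO_STRING(ARRAY_AGG(b.primary_name ORDER BY a.ordering LIMIT 5), ', ') AS top_cast
FROM `bigquery-public-data.imdb.title_principals` a
LEFT JOIN `bigquery-public-data.imdb.name_basics` b ON a.nconst = b.nconst
WHERE a.category IN ('actor', 'actress') AND a.tconst = series_id
GROUP BY a.tconst;

-- Usage: substitute any tconst from the staging table
SELECT * FROM `my-project.imdb_series_dpl.tfunc_cast_for_title`('tt0000000');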

Step three

Let's build the last pipeline step: the main procedure.

The main procedure should launch the first and second pipeline stages and join their results into one destination table.

Main stored procedure code:

-- Creating the main stored procedure
CREATE OR REPLACE PROCEDURE `my-project.imdb_series_dpl.main`()
BEGIN -- The beginning of the main procedure body.

-- The launch of the first step. The first step procedure stores the staging table as a result.
CALL `my-project.imdb_series_dpl.proc_top_comedy_series`();

-- Creating the destination table.
CREATE OR REPLACE TABLE `my-project.imdb_series.top_new_comedy_series_2022` AS
SELECT primary_title AS title, genres, top_cast, average_rating, num_votes

-- Reading of the staging table.
FROM `my-project.imdb_series_dpl.stg_top_comedy_series`
-- The launch of the second step. Execution of the table function and join the result.
LEFT JOIN `my-project.imdb_series_dpl.tfunc_top_cast`() USING(tconst)
ORDER BY average_rating DESC, num_votes DESC;


END; -- The end of the main procedure body.

Our data pipeline is now implemented and can already run. After running the main script and invoking the main procedure with the CALL statement, the navigation panel will look like this:

But if we need to develop a new version, it will be helpful to have a testing mode for the pipeline.

Let's add a testing mode to the main script. In testing mode, the result table is written to the working dataset (imdb_series_dpl) instead of the production dataset (imdb_series).

To turn on testing mode, we would have to change the dataset name in this code fragment from imdb_series to imdb_series_dpl:

-- Creating the main stored procedure
CREATE OR REPLACE PROCEDURE `my-project.imdb_series_dpl.main`()
BEGIN -- The beginning of the main procedure body.

-- The launch of the first step. The first step procedure stores the staging table as a result.
CALL `my-project.imdb_series_dpl.proc_top_comedy_series`();

-- Creating the destination table.
CREATE OR REPLACE TABLE `my-project.imdb_series.top_new_comedy_series_2022` AS
SELECT primary_title AS title, genres, top_cast, average_rating, num_votes
-- Reading of the staging table.
FROM `my-project.imdb_series_dpl.stg_top_comedy_series`
-- The launch of the second step. Execution of the table function and join the result.
LEFT JOIN `my-project.imdb_series_dpl.tfunc_top_cast`() USING(tconst)
ORDER BY average_rating DESC, num_votes DESC;

END; -- The end of the main procedure body.

First, let’s declare variables, set the current mode status, and define the working logic of the modes:

-- Define variables
DECLARE _testMode BOOL;
DECLARE _dataset STRING;

-- Set the current mode status. If it's True, the testing mode is on.
SET _testMode = TRUE;
-- Set logic conditions
IF _testMode = TRUE THEN
SET _dataset = 'imdb_series_dpl';
ELSE SET _dataset = 'imdb_series';
END IF;

Now, to apply this logic, we need the EXECUTE IMMEDIATE statement. It runs SQL code passed as a text string, which lets us build that code with string functions such as CONCAT or the || operator.
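As a minimal standalone illustration (independent of our pipeline), EXECUTE IMMEDIATE takes a string and runs it as SQL, so the dataset name can be assembled at run time:

-- A minimal dynamic SQL example: the dataset name is built into the query string
DECLARE _dataset STRING DEFAULT 'imdb_series_dpl';
EXECUTE IMMEDIATE
'SELECT COUNT(*) AS cnt FROM `my-project.' || _dataset || '.stg_top_comedy_series`';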

Let's add the new code blocks to the main procedure:

CREATE OR REPLACE PROCEDURE `my-project.imdb_series_dpl.main`()
BEGIN
-- Define variables
DECLARE _testMode BOOL;
DECLARE _dataset STRING;

-- Set the current mode status. If it's True, the testing mode is on.
SET _testMode = TRUE;
-- Set logic conditions
IF _testMode = TRUE THEN
SET _dataset = 'imdb_series_dpl';
ELSE SET _dataset = 'imdb_series';
END IF;


CALL `my-project.imdb_series_dpl.proc_top_comedy_series`();

EXECUTE IMMEDIATE """
CREATE OR REPLACE TABLE `my-project."""||_dataset||""".top_new_comedy_series_2022` AS
SELECT primary_title AS title, genres, top_cast, average_rating, num_votes
FROM `my-project.imdb_series_dpl.stg_top_comedy_series`
LEFT JOIN `my-project.imdb_series_dpl.tfunc_top_cast`() USING(tconst)
ORDER BY average_rating DESC, num_votes DESC;
""";
END;

We can see that the _dataset variable is spliced into the SQL string in place of the dataset name, using a pair of || operators and triple quotes. The same triple quotes mark the beginning and end of the SQL string that follows the EXECUTE IMMEDIATE statement.
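As an alternative to || concatenation, the same string can be built with the FORMAT function and a %s placeholder, which some may find easier to read. A sketch of the equivalent call, assuming the same _dataset variable:

-- The same dynamic statement built with FORMAT instead of ||
EXECUTE IMMEDIATE FORMAT("""
CREATE OR REPLACE TABLE `my-project.%s.top_new_comedy_series_2022` AS
SELECT primary_title AS title, genres, top_cast, average_rating, num_votes
FROM `my-project.imdb_series_dpl.stg_top_comedy_series`
LEFT JOIN `my-project.imdb_series_dpl.tfunc_top_cast`() USING(tconst)
ORDER BY average_rating DESC, num_votes DESC
""", _dataset);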

So, we have provided a test mode for the main procedure. If we wanted to provide it for the first step procedure as well, we could pass the _dataset variable as a parameter of the called procedure and add an EXECUTE IMMEDIATE statement to that routine. In that case, the procedure call would look like this:

CALL `my-project.imdb_series_dpl.proc_top_comedy_series`(_dataset);
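One possible sketch of that parameterized first step is shown below; it simply wraps the step one query in EXECUTE IMMEDIATE so that the staging table lands in the dataset passed by the caller. Note that the main procedure would then also have to read the staging table from the same dataset.

-- A sketch of the first step procedure accepting the target dataset as a parameter
CREATE OR REPLACE PROCEDURE `my-project.imdb_series_dpl.proc_top_comedy_series`(_dataset STRING)
BEGIN
EXECUTE IMMEDIATE """
CREATE OR REPLACE TABLE `my-project."""||_dataset||""".stg_top_comedy_series` AS
SELECT DISTINCT a.*, b.primary_title, b.genres
FROM `bigquery-public-data.imdb.title_ratings` a
LEFT JOIN `bigquery-public-data.imdb.title_basics` b ON a.tconst = b.tconst
WHERE b.title_type IN ('tvSeries', 'tvMiniSeries') AND b.start_year = 2022
AND b.genres LIKE '%Comedy%'
AND a.average_rating >= 8 AND a.num_votes > 10000
""";
END;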

Deploying

Finally, let's deploy the data pipeline using the scheduled queries service. We need a new query with just one row:

-- Invoking the main procedure
CALL `my-project.imdb_series_dpl.main`();

We can also choose the execution mode, test or production, when calling the main procedure. To do this, we move the _testMode variable here:

DECLARE _testMode BOOL DEFAULT FALSE;
-- Invoking the main procedure
CALL `my-project.imdb_series_dpl.main`(_testMode);

In this case, we should remove the DECLARE _testMode BOOL; and SET _testMode = TRUE; rows from the main procedure code and define the procedure parameter in the first row as CREATE OR REPLACE PROCEDURE `my-project.imdb_series_dpl.main`(_testMode BOOL).
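Put together, the changed fragment of the main procedure would look roughly like this (a sketch; the rest of the body stays as before):

-- The main procedure with _testMode as an input parameter
CREATE OR REPLACE PROCEDURE `my-project.imdb_series_dpl.main`(_testMode BOOL)
BEGIN
DECLARE _dataset STRING;
-- _testMode now comes from the caller, so its DECLARE and SET rows are removed
IF _testMode = TRUE THEN
SET _dataset = 'imdb_series_dpl';
ELSE SET _dataset = 'imdb_series';
END IF;
-- ... the rest of the procedure body stays the same ...
END;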

Let's create a scheduled query with the SCHEDULE menu and the Create a new scheduled query option.

The destination table for query results is not required.

Deploying this way, with a single row that invokes the main procedure, keeps the whole query sequence easy to access without leaving the SQL workspace.

The result of running the data pipeline:

So, we created a three-step data pipeline consisting of two stored procedures and a table function. The pipeline can run in two modes, testing and production, and it is deployed with the scheduled queries service.
