Using BigQuery (and BigQuery ML) from Kubeflow Pipelines

Use the Python function to component capability

Lak Lakshmanan
Google Cloud - Community


These days, when someone asks me the best way to set up a machine learning development to operationalization workflow, I point them to Kubeflow Pipelines (KFP). On Google Cloud, Cloud AI Platform Pipelines provides a managed experience for KFP, so that you don’t have to muck around with Kubernetes clusters.

Python code to invoke BigQuery

We start by writing a function that uses the BigQuery Python client to run a BigQuery query that creates a table and returns the table or model name:

from typing import NamedTupledef run_bigquery_ddl(project_id: str, query_string: str, 
location: str) -> NamedTuple(
'DDLOutput', [('created_table', str), ('query', str)]):
Runs BigQuery query and returns a table/model name

from import bigquery
from google.api_core.future import polling
from import bigquery
from import retry as bq_retry

bqclient = bigquery.Client(project=project_id, location=location)
job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
job._retry = polling.DEFAULT_RETRY

while job.running():
from time import sleep
print('Running ...')

tblname = job.ddl_target_table
tblname = '{}.{}'.format(tblname.dataset_id, tblname.table_id)
print('{} created in {}'.format(tblname, job.ended - job.started))

from collections import namedtuple
result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])
return result_tuple(tblname, query_string)

Creating a container using function to container

In Kubeflow Pipelines, each step (or “operation”) needs to be a container. Fortunately, taking the Python function above and making it a container is as easy as:

ddlop = comp.func_to_container_op(run_bigquery_ddl,

Notice that I am specifying the Python packages that the function depends on. Also look carefully at the Python function itself — any non-standard packages are imported inside the function definition.

Using the container

Given the container above (ddlop), we can use it to execute any table or model creation query we want. For example, here’s a query to train a model, invoked as part of a pipeline:

def train_classification_model(ddlop, project_id):
query = """
CREATE OR REPLACE MODEL mlpatterns.classify_trips
EXTRACT (HOUR FROM start_date) AS start_hour,
EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
input_label_cols=['trip_type']) AS
start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
IF(duration_sec > 3600*4, 'Long', 'Typical') AS trip_type
FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`
return ddlop(project_id, query, 'US')

Writing a pipeline

We can string together such queries into an ML pipeline:

name='Cascade pipeline on SF bikeshare',
description='Cascade pipeline on SF bikeshare'
def cascade_pipeline(
project_id = PROJECT_ID
ddlop = comp.func_to_container_op(run_bigquery_ddl, packages_to_install=['google-cloud-bigquery'])

c1 = train_classification_model(ddlop, PROJECT_ID)
c1_model_name = c1.outputs['created_table']

c2a_input = create_training_data(ddlop, PROJECT_ID, c1_model_name, 'Typical')
c2b_input = create_training_data(ddlop, PROJECT_ID, c1_model_name, 'Long')

c3a_model = train_distance_model(ddlop, PROJECT_ID, c2a_input.outputs['created_table'], 'Typical')
c3b_model = train_distance_model(ddlop, PROJECT_ID, c2b_input.outputs['created_table'], 'Long')

evalop = comp.func_to_container_op(evaluate, packages_to_install=['google-cloud-bigquery', 'pandas'])
error = evalop(PROJECT_ID, c1_model_name, c3a_model.outputs['created_table'], c3b_model.outputs['created_table'])

Now, you can create a zip of this pipeline and submit it to the ML Pipelines cluster to invoke a new Experiment Run.

Try it out!

The full code for this article is on GitHub. To try out the notebook:

  • Create an instance of AI Platform Pipelines by following the Setting up AI Platform Pipelines how-to guide. Make sure to enable the access to when creating a GKE cluster.
  • Create a Notebook instance (any version) by going to AI Platform / Notebooks in the GCP console.
  • Clone my notebook (above)
  • Change the first cell to reflect the hostname of your KFP cluster.




Lak Lakshmanan
Google Cloud - Community

articles are personal observations and not investment advice.