Using BigQuery (and BigQuery ML) from Kubeflow Pipelines

Use KFP's Python function-to-component capability

Lak Lakshmanan
Apr 21, 2020 · 3 min read

These days, when someone asks me the best way to set up a machine learning development to operationalization workflow, I point them to Kubeflow Pipelines (KFP). On Google Cloud, Cloud AI Platform Pipelines provides a managed experience for KFP, so that you don’t have to muck around with Kubernetes clusters.

Python code to invoke BigQuery

from typing import NamedTuple

def run_bigquery_ddl(project_id: str, query_string: str,
                     location: str) -> NamedTuple(
        'DDLOutput', [('created_table', str), ('query', str)]):
    """
    Runs a BigQuery DDL query and returns the created table/model name
    """
    print(query_string)

    # Non-standard packages are imported inside the function so that
    # func_to_container_op can package the dependencies correctly.
    from google.cloud import bigquery
    from google.api_core.future import polling
    from google.cloud.bigquery import retry as bq_retry

    bqclient = bigquery.Client(project=project_id, location=location)
    job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
    job._retry = polling.DEFAULT_RETRY

    # Poll until the query job finishes.
    while job.running():
        from time import sleep
        sleep(0.1)
        print('Running ...')

    # The DDL statement creates exactly one table (or model).
    tblname = job.ddl_target_table
    tblname = '{}.{}'.format(tblname.dataset_id, tblname.table_id)
    print('{} created in {}'.format(tblname, job.ended - job.started))

    from collections import namedtuple
    result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])
    return result_tuple(tblname, query_string)

Creating a container op using func_to_container_op

import kfp.components as comp

ddlop = comp.func_to_container_op(run_bigquery_ddl,
                                  packages_to_install=['google-cloud-bigquery'])

Notice that I am specifying the Python packages that the function depends on. Also look carefully at the Python function itself — any non-standard packages are imported inside the function definition.
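Because the component starts out as an ordinary Python function, you can smoke-test it locally before wrapping it. A minimal sketch, assuming PROJECT_ID is already defined in the notebook and using a throwaway table name of my own invention:

# Hypothetical local test; mlpatterns.scratch is just a placeholder table.
created, q = run_bigquery_ddl(
    PROJECT_ID,
    'CREATE OR REPLACE TABLE mlpatterns.scratch AS SELECT 1 AS x',
    'US')
print(created)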

Using the container

def train_classification_model(ddlop, project_id):
    query = """
        CREATE OR REPLACE MODEL mlpatterns.classify_trips
        TRANSFORM(
          trip_type,
          EXTRACT (HOUR FROM start_date) AS start_hour,
          EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
          start_station_name,
          subscriber_type,
          ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
          member_gender
        )
        OPTIONS(model_type='logistic_reg',
                auto_class_weights=True,
                input_label_cols=['trip_type']) AS
        SELECT
          start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
          IF(duration_sec > 3600*4, 'Long', 'Typical') AS trip_type
        FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`
    """
    print(query)
    return ddlop(project_id, query, 'US')
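The pipeline in the next section also invokes create_training_data and train_distance_model. Those functions are not reproduced in this excerpt, but a minimal sketch of what they might look like follows, assuming they hand a DDL query to ddlop in the same way; the dataset, table, and column names are illustrative guesses, not the article's actual queries:

def create_training_data(ddlop, project_id, model_name, segment):
    # Illustrative only: materialize the trips the classifier assigns to this segment.
    query = """
        CREATE OR REPLACE TABLE mlpatterns.{0}_trips AS
        SELECT * FROM ML.PREDICT(MODEL {1},
          (SELECT * FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`))
        WHERE predicted_trip_type = '{0}'
    """.format(segment, model_name)
    return ddlop(project_id, query, 'US')

def train_distance_model(ddlop, project_id, train_table, segment):
    # Illustrative only: train a separate regression model per segment.
    query = """
        CREATE OR REPLACE MODEL mlpatterns.predict_distance_{0}
        OPTIONS(model_type='linear_reg',
                input_label_cols=['duration_sec']) AS
        SELECT duration_sec, start_station_name, subscriber_type
        FROM {1}
    """.format(segment, train_table)
    return ddlop(project_id, query, 'US')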

Writing a pipeline

from kfp import dsl

@dsl.pipeline(
    name='Cascade pipeline on SF bikeshare',
    description='Cascade pipeline on SF bikeshare'
)
def cascade_pipeline(
    project_id = PROJECT_ID
):
    ddlop = comp.func_to_container_op(run_bigquery_ddl, packages_to_install=['google-cloud-bigquery'])

    c1 = train_classification_model(ddlop, PROJECT_ID)
    c1_model_name = c1.outputs['created_table']

    c2a_input = create_training_data(ddlop, PROJECT_ID, c1_model_name, 'Typical')
    c2b_input = create_training_data(ddlop, PROJECT_ID, c1_model_name, 'Long')

    c3a_model = train_distance_model(ddlop, PROJECT_ID, c2a_input.outputs['created_table'], 'Typical')
    c3b_model = train_distance_model(ddlop, PROJECT_ID, c2b_input.outputs['created_table'], 'Long')

    evalop = comp.func_to_container_op(evaluate, packages_to_install=['google-cloud-bigquery', 'pandas'])
    error = evalop(PROJECT_ID, c1_model_name, c3a_model.outputs['created_table'], c3b_model.outputs['created_table'])
    print(error.output)
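The evaluate function wired into evalop is likewise not shown in this excerpt. A rough sketch, assuming it pulls ML.EVALUATE results for the two distance models into pandas and returns a combined error figure (the metric and query details are my guesses, not the article's code):

def evaluate(project_id: str, classification_model: str,
             typical_model: str, long_model: str) -> float:
    """Illustrative sketch: average the evaluation error of the two distance models."""
    from google.cloud import bigquery
    import pandas as pd

    bqclient = bigquery.Client(project=project_id)
    errors = []
    for model in [typical_model, long_model]:
        # ML.EVALUATE returns one row of metrics for a linear_reg model.
        df = bqclient.query(
            'SELECT * FROM ML.EVALUATE(MODEL {})'.format(model)).to_dataframe()
        errors.append(df['mean_absolute_error'].iloc[0])
    return pd.Series(errors).mean()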

Now, you can compile this pipeline into a zip file and submit it to your AI Platform Pipelines (KFP) cluster to start a new experiment run.
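A rough sketch of that step with the KFP v1 SDK, assuming the notebook already defines the cluster's hostname (KFP_HOST below is a placeholder):

import kfp
import kfp.compiler as compiler

# Compile the pipeline function into a package the cluster can run.
compiler.Compiler().compile(cascade_pipeline, 'cascade_pipeline.zip')

# KFP_HOST is a placeholder for your AI Platform Pipelines endpoint.
client = kfp.Client(host=KFP_HOST)
experiment = client.create_experiment('cascade')
run = client.run_pipeline(experiment.id, 'cascade-run', 'cascade_pipeline.zip')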

Try it out!

  • Create an instance of AI Platform Pipelines by following the Setting up AI Platform Pipelines how-to guide. Make sure to enable access to https://www.googleapis.com/auth/cloud-platform when creating the GKE cluster.
  • Create a Notebook instance (any version) by going to AI Platform / Notebooks in the GCP console.
  • Clone my notebook (linked above).
  • Change the first cell to reflect the hostname of your KFP cluster (see the sketch below).
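That first cell might look something like this; both values are placeholders to replace with your own project ID and the hostname shown in the AI Platform Pipelines console:

PROJECT_ID = 'your-gcp-project'   # placeholder
KFP_HOST = 'https://xxxxxxxx-dot-us-central1.pipelines.googleusercontent.com'   # placeholder

import kfp
client = kfp.Client(host=KFP_HOST)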

Enjoy!
