Using BigQuery (and BigQuery ML) from Kubeflow Pipelines
Use the Python function-to-component capability
These days, when someone asks me the best way to set up a machine learning development to operationalization workflow, I point them to Kubeflow Pipelines (KFP). On Google Cloud, Cloud AI Platform Pipelines provides a managed experience for KFP, so that you don’t have to muck around with Kubernetes clusters.
Python code to invoke BigQuery
We start by writing a function that uses the BigQuery Python client to run a DDL query that creates a table or model, and then returns the name of what was created:
from typing import NamedTuple

def run_bigquery_ddl(project_id: str, query_string: str,
                     location: str) -> NamedTuple(
    'DDLOutput', [('created_table', str), ('query', str)]):
    """
    Runs BigQuery query and returns a table/model name
    """
    print(query_string)

    # Imports live inside the function so they travel with it into the container
    from collections import namedtuple
    from time import sleep
    from google.cloud import bigquery
    from google.api_core.future import polling
    from google.cloud.bigquery import retry as bq_retry

    bqclient = bigquery.Client(project=project_id, location=location)
    job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
    job._retry = polling.DEFAULT_RETRY

    # Poll until the job is no longer running
    while job.running():
        sleep(0.1)
        print('Running ...')

    # Build the dataset.table name of whatever the DDL statement created
    tblname = job.ddl_target_table
    tblname = '{}.{}'.format(tblname.dataset_id, tblname.table_id)
    print('{} created in {}'.format(tblname, job.ended - job.started))

    result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])
    return result_tuple(tblname, query_string)
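Because the function talks to BigQuery, it is awkward to exercise locally. One option, sketched here under assumptions, is to pull the result-formatting step into its own helper so that it can be checked with a stand-in job object. The helper name `summarize_ddl_job` and the fake job are hypothetical and are not part of the original notebook; only the attributes `ddl_target_table`, `started`, and `ended` mirror what the function above reads.

```python
from collections import namedtuple
from datetime import datetime, timedelta
from types import SimpleNamespace

DDLOutput = namedtuple('DDLOutput', ['created_table', 'query'])

def summarize_ddl_job(job, query_string):
    """Hypothetical helper: turn a finished DDL job into the step's outputs."""
    target = job.ddl_target_table
    tblname = '{}.{}'.format(target.dataset_id, target.table_id)
    print('{} created in {}'.format(tblname, job.ended - job.started))
    return DDLOutput(tblname, query_string)

# Stand-in for a finished BigQuery job; a real one comes from bqclient.query().
start = datetime(2020, 1, 1, 12, 0, 0)
fake_job = SimpleNamespace(
    ddl_target_table=SimpleNamespace(dataset_id='mlpatterns',
                                     table_id='classify_trips'),
    started=start,
    ended=start + timedelta(seconds=42),
)

result = summarize_ddl_job(fake_job, 'CREATE OR REPLACE MODEL ...')
print(result.created_table)
```

The named fields matter: `created_table` and `query` are the names a downstream step would use to pick up this step's outputs.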
Creating a container from the function
In Kubeflow Pipelines, each step (or “operation”) needs to be a container. Fortunately, taking the Python function above and making it a container is as easy as:
import kfp.components as comp

ddlop = comp.func_to_container_op(run_bigquery_ddl,
                                  packages_to_install=['google-cloud-bigquery'])
Notice that I am specifying the Python packages that the function depends on. Also look carefully at the Python function itself: any non-standard packages are imported inside the function definition, because only the function's own code is carried into the container.
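The reason for importing inside the function can be illustrated without KFP. The sketch below is a simplified imitation, not KFP's actual implementation: it assumes that only the function's own source text reaches the container, and it uses the standard-library `math` module as a stand-in for a package such as `google-cloud-bigquery`. The names `run_in_fresh_namespace`, `SRC_OUTER_IMPORT`, and `SRC_INNER_IMPORT` are made up for illustration.

```python
# Two versions of the same function, held as source text. The first relies on
# an import that would sit at module level, outside the function body.
SRC_OUTER_IMPORT = '''
def circle_area(radius):
    return math.pi * radius ** 2
'''

# The second performs the import inside the function body.
SRC_INNER_IMPORT = '''
def circle_area(radius):
    import math
    return math.pi * radius ** 2
'''

def run_in_fresh_namespace(source, func_name, *args):
    """Define the function in an empty namespace, then call it."""
    namespace = {}
    exec(source, namespace)
    return namespace[func_name](*args)

try:
    run_in_fresh_namespace(SRC_OUTER_IMPORT, 'circle_area', 1.0)
    outer_import_worked = True
except NameError:
    # 'math' was never imported in the fresh namespace.
    outer_import_worked = False

inner_result = run_in_fresh_namespace(SRC_INNER_IMPORT, 'circle_area', 1.0)
print(outer_import_worked, inner_result)
```

Only the version that carries its own import survives the move to a clean environment, which is the situation the function finds itself in once it runs as a container.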
Using the container
Given the container above (ddlop), we can use it to execute any table or model creation query we want. For example, here’s a query to train a model, invoked as part of a pipeline:
def train_classification_model(ddlop, project_id):
    query = """
        CREATE OR REPLACE MODEL mlpatterns.classify_trips
          TRANSFORM(
            trip_type,
            EXTRACT (HOUR FROM start_date) AS start_hour,
            EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
            start_station_name,
            subscriber_type,
            ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
            member_gender
          )
          OPTIONS(model_type='logistic_reg',
                  auto_class_weights=True,
                  input_label_cols=['trip_type']) AS
        SELECT
          start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
          IF(duration_sec > 3600*4, 'Long', 'Typical') AS trip_type
        FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`
    """
    print(query)
    # Run the DDL statement in the US location as one pipeline step
    return ddlop(project_id, query, 'US')
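The same wrapper pattern extends to queries that vary between steps. As a sketch only, the function below builds a DDL string from a template so that one code path can serve both the 'Typical' and 'Long' branches. The function name `build_subset_query`, the output table naming scheme, and the query text are hypothetical; the notebook's own helper functions may look quite different.

```python
ALLOWED_SEGMENTS = ('Typical', 'Long')

# Hypothetical DDL template; {segment} is filled in per branch.
QUERY_TEMPLATE = """
CREATE OR REPLACE TABLE mlpatterns.{segment}_trips AS
SELECT *
FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`
WHERE IF(duration_sec > 3600*4, 'Long', 'Typical') = '{segment}'
"""

def build_subset_query(segment):
    """Hypothetical helper: return the DDL string for one trip segment."""
    # Reject anything unexpected before it is spliced into SQL text.
    if segment not in ALLOWED_SEGMENTS:
        raise ValueError('unknown segment: {}'.format(segment))
    return QUERY_TEMPLATE.format(segment=segment)

long_query = build_subset_query('Long')
print(long_query)
```

The resulting string would then be handed to the container in the same way as the training query, for example `ddlop(project_id, long_query, 'US')`.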
Writing a pipeline
We can string together such queries into an ML pipeline:
import kfp.dsl as dsl
import kfp.components as comp

# create_training_data, train_distance_model and evaluate are defined in the full notebook
@dsl.pipeline(
    name='Cascade pipeline on SF bikeshare',
    description='Cascade pipeline on SF bikeshare'
)
def cascade_pipeline(
    project_id = PROJECT_ID
):
    ddlop = comp.func_to_container_op(run_bigquery_ddl, packages_to_install=['google-cloud-bigquery'])

    # Step 1: train the classification model
    c1 = train_classification_model(ddlop, PROJECT_ID)
    c1_model_name = c1.outputs['created_table']

    # Step 2: create one training dataset per trip type
    c2a_input = create_training_data(ddlop, PROJECT_ID, c1_model_name, 'Typical')
    c2b_input = create_training_data(ddlop, PROJECT_ID, c1_model_name, 'Long')

    # Step 3: train one distance model per trip type
    c3a_model = train_distance_model(ddlop, PROJECT_ID, c2a_input.outputs['created_table'], 'Typical')
    c3b_model = train_distance_model(ddlop, PROJECT_ID, c2b_input.outputs['created_table'], 'Long')

    # Step 4: evaluate the classifier together with both distance models
    evalop = comp.func_to_container_op(evaluate, packages_to_install=['google-cloud-bigquery', 'pandas'])
    error = evalop(PROJECT_ID, c1_model_name, c3a_model.outputs['created_table'], c3b_model.outputs['created_table'])
    print(error.output)
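Nothing in the pipeline function states the execution order explicitly; it follows from which step consumes which output. As an illustration only (this is not a description of how KFP schedules work internally), the dependencies written above can be fed to the standard library's `graphlib.TopologicalSorter`, available from Python 3.9, to see one valid order. The step labels are simply the variable names from the pipeline.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    'c1': set(),
    'c2a_input': {'c1'},
    'c2b_input': {'c1'},
    'c3a_model': {'c2a_input'},
    'c3b_model': {'c2b_input'},
    'error': {'c1', 'c3a_model', 'c3b_model'},
}

# static_order() yields the steps so that every step follows its inputs.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The 'Typical' and 'Long' branches share nothing after `c1`, so neither has to wait for the other; only the evaluation step needs both.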
Now you can package this pipeline as a zip file and submit it to the ML Pipelines cluster to start a new experiment run.
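A minimal sketch of that step, assuming the KFP v1 SDK that the rest of this article relies on. The function name `submit_pipeline`, the default package path, and the host value are placeholders rather than anything taken from the notebook; the imports sit inside the function so that the snippet can be loaded even where `kfp` is not installed.

```python
def submit_pipeline(pipeline_func, host, package_path='cascade_pipeline.zip'):
    """Compile a pipeline function to a zip package and start a run.

    Assumes the KFP v1 SDK; `host` is the hostname of your KFP cluster.
    """
    import kfp
    import kfp.compiler as compiler

    # Write the compiled pipeline definition to a local zip file.
    compiler.Compiler().compile(pipeline_func, package_path)

    # Connect to the cluster and launch a run from the packaged pipeline.
    client = kfp.Client(host=host)
    return client.create_run_from_pipeline_package(package_path, arguments={})

# Example call (placeholder host, not executed here):
# submit_pipeline(cascade_pipeline, host='<your-kfp-hostname>')
```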
Try it out!
The full code for this article is on GitHub. To try out the notebook:
- Create an instance of AI Platform Pipelines by following the Setting up AI Platform Pipelines how-to guide. Make sure to enable access to https://www.googleapis.com/auth/cloud-platform when creating the GKE cluster.
- Create a Notebook instance (any version) by going to AI Platform / Notebooks in the GCP console.
- Clone my notebook (above)
- Change the first cell to reflect the hostname of your KFP cluster.
Enjoy!