Using BigQuery (and BigQuery ML) from Kubeflow Pipelines

Using the Python function-to-component capability

These days, when someone asks me the best way to set up a machine-learning development-to-operationalization workflow, I point them to Kubeflow Pipelines (KFP). On Google Cloud, Cloud AI Platform Pipelines provides a managed experience for KFP, so that you don’t have to muck around with Kubernetes clusters.

Python code to invoke BigQuery

We start by writing a Python function that uses the BigQuery client library to run a DDL query and return the name of the table or model it creates:

from typing import NamedTuple

def run_bigquery_ddl(project_id: str, query_string: str,
                     location: str) -> NamedTuple(
        'DDLOutput', [('created_table', str), ('query', str)]):
    """
    Runs BigQuery query and returns a table/model name
    """
    print(query_string)

    from google.cloud import bigquery
    from google.api_core.future import polling
    from google.cloud.bigquery import retry as bq_retry

    bqclient = bigquery.Client(project=project_id, location=location)
    job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
    job._retry = polling.DEFAULT_RETRY

    while job.running():
        from time import sleep
        sleep(0.1)
        print('Running ...')

    tblname = job.ddl_target_table
    tblname = '{}.{}'.format(tblname.dataset_id, tblname.table_id)
    print('{} created in {}'.format(tblname, job.ended - job.started))

    from collections import namedtuple
    result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])
    return result_tuple(tblname, query_string)

Creating a container with func_to_container_op

In Kubeflow Pipelines, each step (or “operation”) needs to be a container. Fortunately, taking the Python function above and making it a container is as easy as:

ddlop = comp.func_to_container_op(run_bigquery_ddl,
                                  packages_to_install=['google-cloud-bigquery'])

Notice that I am specifying the Python packages that the function depends on. Also look carefully at the Python function itself — any non-standard packages are imported inside the function definition.
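To see why the imports live inside the function body, consider a minimal component of my own devising (a hypothetical example, not part of the pipeline in this article). KFP serializes only the function's source into the container, so module-level imports from the notebook would not exist at run time:

```python
from typing import NamedTuple

def add_multiply(a: float, b: float) -> NamedTuple(
        'Output', [('sum', float), ('product', float)]):
    # Any package the component needs is imported here, inside the body:
    # KFP copies only this function's source into the container image,
    # so imports at the top of the notebook would be missing at run time.
    from collections import namedtuple
    output = namedtuple('Output', ['sum', 'product'])
    return output(a + b, a * b)

# The plain function still runs locally, which makes it easy to
# smoke-test before wrapping it with comp.func_to_container_op(add_multiply).
print(add_multiply(2.0, 3.0))  # Output(sum=5.0, product=6.0)
```

Being able to call the undecorated function directly is a nice side effect of this pattern: you can unit-test your component logic without a cluster.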

Using the container

Given the container above (ddlop), we can use it to execute any table or model creation query we want. For example, here’s a query to train a model, invoked as part of a pipeline:

def train_classification_model(ddlop, project_id):
    query = """
        CREATE OR REPLACE MODEL mlpatterns.classify_trips
        TRANSFORM(
            trip_type,
            EXTRACT (HOUR FROM start_date) AS start_hour,
            EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
            start_station_name,
            subscriber_type,
            ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
            member_gender
        )
        OPTIONS(model_type='logistic_reg',
                auto_class_weights=True,
                input_label_cols=['trip_type']) AS

        SELECT
            start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
            IF(duration_sec > 3600*4, 'Long', 'Typical') AS trip_type
        FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`
    """
    print(query)
    return ddlop(project_id, query, 'US')
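The pipeline in the next section also calls create_training_data and train_distance_model, which are not shown in this excerpt. A sketch of what such wrappers might look like, following the same pattern (the SQL here is illustrative and the table/model names are assumptions, not the exact queries from the notebook):

```python
def create_training_data(ddlop, project_id, model_name, segment):
    # Illustrative sketch: materialize the trips that the classification
    # model assigns to this segment ('Typical' or 'Long'). The notebook's
    # actual SQL may differ.
    query = """
        CREATE OR REPLACE TABLE mlpatterns.{0}_trips AS
        SELECT * FROM
        ML.PREDICT(MODEL {1},
            (SELECT * FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`))
        WHERE predicted_trip_type = '{0}'
    """.format(segment, model_name)
    return ddlop(project_id, query, 'US')

def train_distance_model(ddlop, project_id, train_table, segment):
    # Illustrative sketch: one regression model per segment, trained on the
    # table created above. Model type and features are assumptions.
    query = """
        CREATE OR REPLACE MODEL mlpatterns.predict_distance_{0}
        OPTIONS(model_type='linear_reg',
                input_label_cols=['duration_sec']) AS
        SELECT duration_sec, start_station_name, subscriber_type
        FROM {1}
    """.format(segment, train_table)
    return ddlop(project_id, query, 'US')
```

Each wrapper only formats a query string and delegates to ddlop, so the same containerized component serves every DDL step in the pipeline.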

Writing a pipeline

We can string together such queries into an ML pipeline:

@dsl.pipeline(
    name='Cascade pipeline on SF bikeshare',
    description='Cascade pipeline on SF bikeshare'
)
def cascade_pipeline(
    project_id = PROJECT_ID
):
    ddlop = comp.func_to_container_op(run_bigquery_ddl,
                                      packages_to_install=['google-cloud-bigquery'])

    c1 = train_classification_model(ddlop, PROJECT_ID)
    c1_model_name = c1.outputs['created_table']

    c2a_input = create_training_data(ddlop, PROJECT_ID, c1_model_name, 'Typical')
    c2b_input = create_training_data(ddlop, PROJECT_ID, c1_model_name, 'Long')

    c3a_model = train_distance_model(ddlop, PROJECT_ID, c2a_input.outputs['created_table'], 'Typical')
    c3b_model = train_distance_model(ddlop, PROJECT_ID, c2b_input.outputs['created_table'], 'Long')

    evalop = comp.func_to_container_op(evaluate,
                                       packages_to_install=['google-cloud-bigquery', 'pandas'])
    error = evalop(PROJECT_ID, c1_model_name,
                   c3a_model.outputs['created_table'],
                   c3b_model.outputs['created_table'])
    print(error.output)

Now, you can create a zip of this pipeline and submit it to the ML Pipelines cluster to invoke a new Experiment Run.
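Concretely, the compile-and-submit step can be sketched as a small helper. This is my sketch, not code from the notebook; it assumes the kfp SDK (v1) is installed and that host points at your AI Platform Pipelines endpoint:

```python
def compile_and_run(pipeline_func, host, experiment_name='cascade_experiment'):
    # Assumes the `kfp` SDK is installed; `host` is a placeholder for your
    # AI Platform Pipelines endpoint.
    import kfp
    import kfp.compiler as compiler

    # Compile the @dsl.pipeline function into a zip that KFP can ingest.
    compiler.Compiler().compile(pipeline_func, 'cascade_pipeline.zip')

    # Upload the package and launch it as a new Experiment Run.
    client = kfp.Client(host=host)
    experiment = client.create_experiment(experiment_name)
    return client.run_pipeline(experiment.id, 'cascade_run',
                               'cascade_pipeline.zip')
```

If you don't need the intermediate zip, kfp.Client.create_run_from_pipeline_func does the compile-and-submit in a single call.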

Try it out!

The full code for this article is on GitHub. To try out the notebook:

  • Create an instance of AI Platform Pipelines by following the Setting up AI Platform Pipelines how-to guide. Make sure to enable access to the https://www.googleapis.com/auth/cloud-platform scope when creating the GKE cluster.
  • Create a Notebook instance (any version) by going to AI Platform / Notebooks in the GCP console.
  • Clone my notebook (above).
  • Change the first cell to reflect the hostname of your KFP cluster.

Enjoy!

A collection of technical articles and blogs published or curated by Google Cloud Developer Advocates. The views expressed are those of the authors and don't necessarily reflect those of Google.

Lak Lakshmanan

Operating Executive at a technology investment firm; articles are personal observations and not investment advice.
