ML Pipelines (Vertex) Part 1

Jesus
8 min read · Jul 7, 2022

This is my second attempt at sharing things I’ve discovered.

Remember you can always skip the explanation and go to the code!

One of the key aspects of ML is that it is a process made up of different steps:

  1. Ingestion of structured or unstructured data.
  2. Exploratory Data Analysis: this is where we find out whether the data is suitable for ML, by applying statistics and calculations.
  3. Transformation | Feature Engineering, i.e. handling missing values, grouping intervals, encoding data, splitting, handling outliers, removing biases, scaling, creating new features, etc.
  4. Training: this is the part where all the magic happens (Machine Learning code!).
  5. Evaluation: here we get the metrics/results of the training and compare them with reality (how well our model performs).
  6. Deployment: once we have trained the model, we need an endpoint (a web server, if you will) so it can be consumed.
  7. Monitoring: we need to keep track of our data distribution to tune the model in the future (remember, data is constantly changing).
  8. CI/CD/CT (MLOps): what the heck? Don’t worry too much, it’s just a way to automate your steps and keep your ML model awesome (I’ll cover this final step later in another story).

These steps are what it takes to build a real-world Machine Learning system, and this is the kind of stuff big companies like Amazon, Apple, Google, Meta, Microsoft, Netflix, Uber, etc. use every day to offer better deals. But how do we orchestrate all the pieces?

Pipelines!

Although there are many ways to do it, I will use the one I like best: something based on Kubeflow. Why? Because it’s cool and easy. The product is called Vertex AI Pipelines, a Google-managed service (I don’t want to waste my time building VMs).

Vertex has different managed components to make your life easier, like:

  • Managed Datasets, which create some statistics to understand the data distribution and help you split data for training.
  • Google AutoML, where we can train high-quality models with our own data. Google takes care of the transformations, cleaning, splitting, testing with different ML methods, searching for the best hyper-parameters for neural networks, looking for the best neural model architecture, and deploying the model for us. (I’ll create another blog for this.)
  • Custom Training, where we can train our own custom model using our own data. There are 4 ways to do it, depending on how easy or complex we want to go. More on this below.
  • Deployment/Inference: once the training step is done, we need a place to mount the model, or an endpoint, so it can be consumed.

Let’s take a moment to talk about “custom training” and the 4 ways/options to do it:

1. Custom Lightweight Function-Based Components.

This is for those who don’t want to use the Google-managed ML services and just want to take advantage of the “Managed Infrastructure” piece. Let me give you a recommendation: “do test them all”. I always work under the “Do Something” principle, which is the infinite loop … -> Inspiration -> Motivation -> Action -> Inspiration -> …: “Action isn’t just the effect of motivation; it’s also the cause of it”. When you start doing something you gain the confidence for the next part, so “just do it”.

Getting back to the game, this option is the “Kubeflow (open)” way: every component has inputs and outputs, and they are used to chain components together. The rest of this post is about this option; I’ll write more posts for the other ways (stay tuned).

Why did I choose this method for this post? Because it’s the foundation.

2. Custom Container Training Job Run…

Although the name is awful, I like to call this way “ML Google Managed”. It’s for building custom models: if you need data statistics, data cleansing, feature engineering, hyper-parameter tuning or distributed training, this is the way. (Coming soon…)

3. Custom Python Package Training Job Run…

The name is even worse. This option is the same as option 2, with one difference: use it if you don’t want to deal with all the previous job plumbing like dockerization, package installation, etc. A Google-certified container like this can be used instead. It’s freaking awesome, isn’t it?

4. Custom Training Job Op…

If you don’t need the managed dataset component and its fantastic splitting and basic data transformations, use this method: it’s effortless to manage, cleaner, faster to boot, and easier to debug.

Enough! What am I doing? I won’t be “that guy” with a bunch of information.

First: Create some place to run your code

We will use Vertex Workbench (managed JupyterLab), but you can use your own; there is actually a Docker version (run it in your terminal):

## Optional (skip it if Workbench is used)
gcloud auth application-default login
ADC=/home/[YOUR_USERNAME]/.config/gcloud/application_default_credentials.json
docker run \
  -ti \
  -p 8888:8888 \
  -e GOOGLE_APPLICATION_CREDENTIALS=/tmp/keys/FILE_NAME.json \
  -v ${ADC}:/tmp/keys/FILE_NAME.json \
  -e GRANT_SUDO=yes \
  --user root jupyter/base-notebook
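If you go the Docker route, it can be worth checking from inside the notebook that the credentials file was really mounted. Here is a minimal sketch using only the standard library; the helper name adc_file_status is mine (hypothetical), and it only checks that the file exists, not that the credentials are valid.

```python
import os
from pathlib import Path


def adc_file_status(env_var: str = "GOOGLE_APPLICATION_CREDENTIALS") -> str:
    """Return 'unset', 'missing' or 'ok' for the credentials file named by env_var."""
    value = os.environ.get(env_var)
    if not value:
        return "unset"      # the -e flag was not passed to docker run
    if not Path(value).is_file():
        return "missing"    # the -v mount did not produce a file at that path
    return "ok"


print(adc_file_status())
```

Anything other than ok probably means the -e or -v line of the docker command needs another look.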

Step 1: Vertex Workbench

  • Go to the Google Cloud Console and create a new workbench: Left hamburger > Vertex AI > User-Managed Notebooks > New Notebook > Python 3 > Create. Literally 45 seconds later you’ll have a new JupyterLab up and running, with a proxy server alongside so you can log in securely. How amazing is that?
[Image: Vertex Workbench]
  • Now step into JupyterLab and create your first notebook:
[Image: Workbench Jupyter Lab]

Step 2: Pre-work (package installation and variables)

Use the notebook to run all the following steps.

!pip install kfp google-cloud-aiplatform --upgrade --user

You may need to restart the kernel: in JupyterLab go to Kernel > Restart Kernel.
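After the restart, a quick way to confirm what actually got installed is to ask the standard library. This is just a sketch; installed_version is a hypothetical helper name. As far as I know, the kfp.v2 import path used in this post belongs to the 1.8.x line of the SDK and may be deprecated or unavailable in newer major versions, so the printed version is worth a look if the imports fail.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a package, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


for name in ("kfp", "google-cloud-aiplatform"):
    print(name, installed_version(name) or "not installed")
```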

Set your variables:

In this post we’ll only use a bucket to store our artifacts, so we define it as gs://vtx-root-path (use your own Google Cloud Storage bucket). How?

PIPELINE_ROOT_PATH = 'gs://vtx-root-path'  # change this string.
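A typo in this string only shows up later, when the job is submitted, so a small sanity check can save a round trip. The sketch below is my own addition (split_gcs_uri is a hypothetical helper); it only validates the shape of the URI and does not check that the bucket exists.

```python
def split_gcs_uri(uri: str):
    """Split 'gs://bucket/prefix' into (bucket, prefix); raise ValueError otherwise."""
    scheme = "gs://"
    if not uri.startswith(scheme):
        raise ValueError(f"expected a URI starting with {scheme!r}, got {uri!r}")
    bucket, _, prefix = uri[len(scheme):].partition("/")
    if not bucket:
        raise ValueError(f"no bucket name in {uri!r}")
    return bucket, prefix.strip("/")


print(split_gcs_uri("gs://vtx-root-path"))  # ('vtx-root-path', '')
```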

Import libraries:

from typing import Dict
from kfp.v2 import compiler
from kfp.v2.dsl import (pipeline, component, Input, Output, OutputPath, Artifact,)
from google.cloud import aiplatform

Step 3: Create your lightweight components:

Showtime: we’re going to create 2 components and link them together.

For the sake of simplicity, function = component. Take a look at function_2: that is the simplest way to create a component, with no need to specify a container image or packages to install, and you can even drop the arguments if you don’t want to get data from the first function.

If you are tired of reading so much code, there is a single-function version at the end of this post.

This is our topology:

[Image: Pipeline Components]
  1. function_1 takes 2 integers, adds them and creates 2 artifacts: a dictionary and a pandas DataFrame.
  2. Both artifacts are stored in a Google Cloud Storage bucket.
  3. function_2 takes the artifacts from function_1 and prints them.
@component(
    base_image='python',  # Use any container
    packages_to_install=[
        'pandas',  # Packages required
        'gcsfs'
    ])
def function_1(a: int, b: int, output_dict_param_path: OutputPath(Dict[str, int]), dataset: Output[Artifact]):

    import pandas as pd  # Import libraries
    import json

    sum_dict = {'result': [a+b]}  # Dictionary result

    with open(output_dict_param_path, 'w') as file:  # Write Dict
        file.write(json.dumps(sum_dict))

    dataframe = pd.DataFrame(sum_dict)  # Create Dataframe
    dataframe.to_csv(dataset.path, index=False)  # Store Dataframe

@component
def function_2(input_dict: Dict[str, int], dataset: Input[Artifact]) -> int:

    import csv

    print(input_dict)  # Print function_1 res
    with open(dataset.path, 'r') as file:  # Dataframe Read
        csvreader = csv.reader(file)
        for row in csvreader:
            print(row)

    return int(input_dict['result'][0])  # Return Integer
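Before shipping these to the cloud, you can dry-run the same logic locally. The sketch below is my own stand-in, not the components themselves: the function names ending in _local are hypothetical, the csv module replaces pandas so it runs with the standard library only, and temporary files play the role of the paths that the pipeline would normally hand to each component.

```python
import csv
import json
import tempfile
from pathlib import Path


def function_1_local(a, b, dict_path, dataset_path):
    """Same outputs as function_1: a JSON dictionary and a one-column CSV."""
    sum_dict = {"result": [a + b]}
    Path(dict_path).write_text(json.dumps(sum_dict))
    with open(dataset_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["result"])  # header row, like DataFrame.to_csv(index=False)
        writer.writerow([a + b])


def function_2_local(dict_path, dataset_path):
    """Read both outputs back and return the integer plus the CSV rows."""
    input_dict = json.loads(Path(dict_path).read_text())
    with open(dataset_path, newline="") as f:
        rows = list(csv.reader(f))
    return int(input_dict["result"][0]), rows


with tempfile.TemporaryDirectory() as tmp:
    dict_file = Path(tmp) / "output_dict.json"
    csv_file = Path(tmp) / "dataset.csv"
    function_1_local(324, 573, dict_file, csv_file)
    total, rows = function_2_local(dict_file, csv_file)

print(total, rows)  # 897 [['result'], ['897']]
```

In the real pipeline function_2 receives the dictionary already deserialized; reading it from a file here just mimics the hand-off.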

Step 4: Build the Pipeline

If you are familiar with Python you will notice that we have done nothing more than create Python functions; we just added a decorator (@component) from kfp.v2 (the Kubeflow Pipelines SDK v2 namespace) to tell the pipeline that those functions will be components. That’s it!

Now let’s build the pipeline or create the chain:

  1. We call function_1, which adds 2 numbers, 324 and 573; the new reference it creates is called function_1_task.
  2. Then, to create a link, we take the outputs of that reference, function_1_task.outputs['output_dict_param_path'] and function_1_task.outputs['dataset'], and use them as the arguments of function_2. This is how we create links between the components.
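@pipeline(name='my-first-pipe')
def pipeline():  # this name shadows the imported decorator: re-run the imports before re-running this cell
    function_1_task = function_1(a=324, b=573)  # keyword arguments
    function_2_task = function_2(
        input_dict=function_1_task.outputs['output_dict_param_path'],
        dataset=function_1_task.outputs['dataset'])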
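Passing one task’s outputs into another is what tells the orchestrator the execution order. To make that idea concrete, here is a toy sketch with the standard library’s graphlib (Python 3.9+). It is only an illustration of dependency ordering, not how Vertex AI or Kubeflow implement it, and the dependencies dictionary is something I wrote by hand from the pipeline above.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each task maps to the set of tasks whose outputs it consumes.
dependencies = {
    "function_1": set(),            # needs nothing, only constants
    "function_2": {"function_1"},   # consumes function_1's dictionary and dataset
}

# static_order() yields every task after all the tasks it depends on.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['function_1', 'function_2']
```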

Step 5: Compile the Pipeline

Now we need to create a file from the pipeline; it will be used later in other stories to automate and schedule pipes.

from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
                            package_path='first_pipe.json')
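The compiled file is plain JSON, so you can peek inside it without any special tooling. A small sketch, assuming the file name used above; top_level_keys is a hypothetical helper, and I’m deliberately not listing the keys to expect because the layout depends on the SDK version.

```python
import json
import os


def top_level_keys(package_path: str):
    """Load a compiled pipeline file and return its top-level keys, sorted."""
    with open(package_path) as f:
        spec = json.load(f)
    return sorted(spec)


# Only inspect the file if the compile step has already produced it.
if os.path.exists("first_pipe.json"):
    print(top_level_keys("first_pipe.json"))
```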

Step 6: Run the Job

Just run the job from the compiled file …

import google.cloud.aiplatform as aip
job = aip.PipelineJob(
    display_name="first_pipe",
    template_path="first_pipe.json",
    pipeline_root=PIPELINE_ROOT_PATH,
    parameter_values={}
)
job.submit()

… and we’ll get a stunning link to Vertex AI Pipelines where the result will be something like this:

[Screenshot: Vertex AI Pipelines]

And just like that we have created our first pipeline. We can view the logs to catch all the outputs from our code, we can see the dictionary and DataFrame coming from function_1 and used in function_2, and we can see the parameters and outputs as a result. Components that were not modified can even be cached; just look at the screenshot: it took only 1 min 11 sec to run the pipeline.
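Caching is handy, but sometimes you want every step to really run again. PipelineJob accepts an enable_caching argument for that. The sketch below is my own wrapper: build_job_kwargs and submit are hypothetical names, the bucket is the placeholder from Step 2, and the SDK import is done lazily so the first helper can be tried without it installed.

```python
def build_job_kwargs(display_name, template_path, pipeline_root, enable_caching=True):
    """Collect the PipelineJob arguments used in this post, plus enable_caching."""
    return {
        "display_name": display_name,
        "template_path": template_path,
        "pipeline_root": pipeline_root,
        "parameter_values": {},
        "enable_caching": enable_caching,  # False forces every component to re-run
    }


def submit(job_kwargs):
    """Create and submit the job (needs google-cloud-aiplatform and credentials)."""
    import google.cloud.aiplatform as aip  # lazy import, see the note above
    job = aip.PipelineJob(**job_kwargs)
    job.submit()
    return job


job_kwargs = build_job_kwargs("first_pipe", "first_pipe.json",
                              "gs://vtx-root-path", enable_caching=False)
print(job_kwargs)
```

Calling submit(job_kwargs) from the notebook would then launch the run with caching switched off.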

Homework: when your pipe has finished, navigate around and find the artifacts.

Extra: if you hate all the code above, this is the simplest way:

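# Imports (repeated so this block runs on its own)
import google.cloud.aiplatform as aip
from kfp.v2 import compiler
from kfp.v2.dsl import component, pipeline

# Create the component
@component
def func_1(x: int):
    print(x)

# Create the pipe (named simple_pipeline so it does not shadow the imported decorator)
@pipeline(name='pipe')
def simple_pipeline():
    func_1_task = func_1(x=8)

# Compile it
compiler.Compiler().compile(pipeline_func=simple_pipeline,
                            package_path='first_pipe.json')

# Run it (PIPELINE_ROOT_PATH is the bucket defined in Step 2)
job = aip.PipelineJob(
    display_name="first_pipe",
    template_path="first_pipe.json",
    pipeline_root=PIPELINE_ROOT_PATH,
    parameter_values={}
)
job.submit()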

And here is the notebook.

This is part 1 of pipelines. I’ll talk about the other ways (ML Google Managed Components) in the next chapter, and I’ll also share information about data warehouse systems like BigQuery and ETL tools like Dataflow and how to use them as pipeline components. Hopefully we can get into MLOps too.

Thank You!
