How to carry out CI/CD in Machine Learning (“MLOps”) using Kubeflow ML pipelines (#3)

Set up your ML components to be automatically rebuilt when there is new code (CI) and a retraining Experiment Run to be launched whenever there is new data (CD)

Lak Lakshmanan
Google Cloud - Community
Feb 16, 2020

In earlier articles, I showed you how to get started with Kubeflow Pipelines and how to use Jupyter notebooks as components of a Kubeflow ML pipeline. Kubeflow Pipelines has changed enough (in a good way: it has gotten simpler) that I will start from scratch in this article. In other words, I will show how to set up a cluster, create a notebook, and submit pipelines. That said, if some concepts are not clear, I recommend that you go back and read those two articles.

Wouldn’t it be great if we had CI/CD with food? The chef changes the item description based on what’s available in the market (CI), you adapt it when you submit your order (“extra hummus”), and the right food gets made in the kitchen (CD).

The full set of instructions is available in this README file — please feel free to follow along as you read this article.

1. Set up Hosted Kubeflow Pipelines

An ML Pipeline in Kubeflow Pipelines consists of a Directed Acyclic Graph (DAG) of steps, each of which is a container. For example, this Pipeline consists of five steps: preprocessing, hyperparameter tuning, fine-tuning the best model, deploying it, and deploying a front-end web app. The actual application is not important in this context, so I won’t go into it in this article.

An ML Pipeline consists of ML steps, each of which is a Docker container
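
To make the DAG idea concrete, here is a minimal sketch in plain Python; it does not use the kfp API. The step names and dependency edges are assumptions for illustration, chosen to mirror the five steps described above. The standard-library graphlib module (Python 3.9+) yields an order in which every step runs only after the steps it depends on.

```python
from graphlib import TopologicalSorter

# Hypothetical step names and edges: each key depends on the steps in its set.
PIPELINE_DAG = {
    "preprocess": set(),
    "hypertrain": {"preprocess"},
    "finetune": {"hypertrain"},
    "deploy_model": {"finetune"},
    "deploy_webapp": {"deploy_model"},
}


def execution_order(dag):
    """Return the steps in an order that respects every dependency."""
    return list(TopologicalSorter(dag).static_order())


if __name__ == "__main__":
    print(execution_order(PIPELINE_DAG))
```

If the edges formed a cycle, static_order() would raise graphlib.CycleError, which is the plain-Python analogue of the "no circular dependency" rule for pipelines.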

Pipelines are executed on a Google Kubernetes Engine (GKE) cluster. This cluster is long-lived and you can submit Pipelines to it using the Kubeflow Pipelines API.

Fortunately, you don’t need to set up the GKE cluster, install Kubeflow on it, and configure all the necessary permissions to allow for remote submission. A prebuilt Hosted ML Pipelines image is available on the Google Cloud Marketplace. You can also get to it by going to https://console.google.com/ai-platform/pipelines/clusters and creating a New Instance.

Once the cluster is up (this takes 2–3 minutes), click on the Pipelines Dashboard link to view available Pipelines and executing Pipeline Runs:

The cluster that has come up has a link to the ML Pipelines dashboard
Use the dashboard to manually upload a pipeline to it, and to look at past & ongoing Experiments

2a. Set up your personal development environment

While you can interact with Kubeflow Pipelines using the dashboard, you are more likely to do your development activities using an AI Platform Notebook.

There is one key difference between developing for kfp and normal development — you will want code executed on your behalf on the cluster. Providing your user authorization to some piece of code is a non-starter — so, create a service account:

SA_NAME=kfpdemo
gcloud iam service-accounts create $SA_NAME \
--display-name $SA_NAME --project "$PROJECT_ID"

Then, give this service account the permissions you want your ML pipeline to have. For example, if you want the ML pipeline to be able to launch Dataflow jobs, you would do:

gcloud projects add-iam-policy-binding $PROJECT_ID \
--member=serviceAccount:$SA_NAME@$PROJECT_ID.iam.gserviceaccount.com \
--role=roles/dataflow.developer

The script setup_auth.sh creates a service account and gives it somewhat expansive roles for Cloud Storage, Pub/Sub, BigQuery, Dataflow, and AI Platform (run it from CloudShell). Then, it also copies the secrets to the Kubeflow cluster so that the Kubeflow cluster can use this service account:

gcloud iam service-accounts keys create application_default_credentials.json --iam-account $SA_NAME@$PROJECT_ID.iam.gserviceaccount.com
# Attempt to create a k8s secret. If already exists, override.
kubectl create secret generic user-gcp-sa \
--from-file=user-gcp-sa.json=application_default_credentials.json \
-n $NAMESPACE --dry-run -o yaml | kubectl apply -f -
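
The kubectl pipeline above first renders a Secret manifest without creating it and then applies that manifest, which is why it works whether or not the secret already exists. As a rough illustration of what such a manifest contains, here is a sketch that builds an equivalent structure in Python. The key contents and the namespace in the usage lines are placeholders, and the exact fields kubectl emits may differ slightly.

```python
import base64
import json


def secret_manifest(name, namespace, key, file_bytes):
    """Build a Kubernetes Secret manifest holding one file under `key`."""
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "type": "Opaque",
        "metadata": {"name": name, "namespace": namespace},
        # Values under `data` must be base64-encoded strings.
        "data": {key: base64.b64encode(file_bytes).decode("ascii")},
    }


if __name__ == "__main__":
    # Placeholder key contents and namespace, for illustration only.
    fake_key = b'{"type": "service_account"}'
    manifest = secret_manifest("user-gcp-sa", "default", "user-gcp-sa.json", fake_key)
    print(json.dumps(manifest, indent=2))
```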

Now you can go to the AI Platform | Notebook section of the GCP Console and create a Notebook. When you create the AI Platform Notebook, though, have it use the service account that you just set up:

The user’s development environment should use a service account that has secrets stored in the kfp cluster.

How would this work in an enterprise development setting? You’d have a Terraform script that creates the Hosted Pipelines instance, a set of service accounts (only one user will have actAs permission on each service account), and Notebook instances for each user. Every user would be able to create Notebook instances using “their” service account. The Notebook itself would be restricted to single-user mode to ensure that all operations are auditable.

2b. Create Docker containers for pipeline steps

Now that the development environment is set up, we can develop ML components, write a pipeline to connect the steps and execute the pipeline.

Each step needs to be a container. So, containerize your code by capturing your dependencies in a Dockerfile. For example, this is the Dockerfile for deploying the model to AI Platform:

FROM google/cloud-sdk:latest
RUN mkdir -p /babyweight/src && \
    cd /babyweight/src && \
    git clone https://github.com/GoogleCloudPlatform/training-data-analyst
COPY deploy.sh ./
ENTRYPOINT ["bash", "./deploy.sh"]

Notice a few tricks — start from a helpful starter image. In this case, deploying a model involves running a gcloud command, so I start with an image that gives me the latest gcloud. If I need files from source control, I can git clone my repository to make the source code for the trainer available.

The entrypoint to the container runs deploy.sh, which is copied from the local directory. The script, in essence, consists of these commands:

gcloud ai-platform versions create ${MODEL_VERSION} \
--model ${MODEL_NAME} --origin ${MODEL_LOCATION} \
--runtime-version $TFVERSION
echo $MODEL_NAME > /model.txt
echo $MODEL_VERSION > /version.txt

I’ll describe the reason for the echo commands shortly.

It’s not enough for us to have the code for the container; we have to build the container image and make it available to the cluster. You can do that using Cloud Build:

gcloud builds submit . --config cloudbuild.yaml

where cloudbuild.yaml specifies the tag in the Google Container Registry (gcr.io):

steps:
- name: 'gcr.io/cloud-builders/docker'
  dir: '${DIR_IN_REPO}' # remove-for-manual
  args: [ 'build', '-t', 'gcr.io/${PROJECT_ID}/${CONTAINER_NAME}:${TAG_NAME}', '.' ]
images:
- 'gcr.io/${PROJECT_ID}/${CONTAINER_NAME}:${TAG_NAME}'
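
However the placeholders get filled in, the resulting image reference has the shape gcr.io/PROJECT/CONTAINER:TAG. The sketch below only mimics that expansion with Python's string.Template so you can see what the final name looks like; it is not how Cloud Build itself performs substitution, and the project name in the usage line is a made-up example.

```python
from string import Template

# Same placeholder layout as the image name in cloudbuild.yaml.
IMAGE_TEMPLATE = Template("gcr.io/${PROJECT_ID}/${CONTAINER_NAME}:${TAG_NAME}")


def image_reference(project_id, container_name, tag_name="latest"):
    """Expand the placeholders into a full container image reference."""
    return IMAGE_TEMPLATE.substitute(
        PROJECT_ID=project_id,
        CONTAINER_NAME=container_name,
        TAG_NAME=tag_name,
    )


if __name__ == "__main__":
    # "my-project" is a hypothetical project ID.
    print(image_reference("my-project", "babyweight-pipeline-deploycmle"))
```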

Now that we have a Docker image available in gcr.io, we can try it out from the Jupyter notebook:

!docker run -t gcr.io/${PROJECT_ID}/babyweight-pipeline-deploycmle:latest gs://${BUCKET}/babyweight/hyperparam/17 babyweight local

2c. Write a Pipeline to connect the steps

Once we have containerized all the ML steps, we can write a pipeline to connect these steps into a DAG. This is done in Python using the kfp API:

import kfp.dsl as dsl
from kfp.gcp import use_gcp_secret

@dsl.pipeline(
    name='babyweight',
    description='Train Babyweight model from scratch'
)
def preprocess_train_and_deploy(
    project='ai-analytics-solutions',
    bucket='ai-analytics-solutions-kfpdemo',
    start_year='2000'
):
    """End-to-end Pipeline to train and deploy babyweight model"""
    # Step 1: create training dataset using Apache Beam on Cloud Dataflow
    preprocess = dsl.ContainerOp(
        name='preprocess',
        # image needs to be a compile-time string
        image='gcr.io/ai-analytics-solutions/babyweight-pipeline-bqtocsv:latest',
        arguments=[
            '--project', project,
            '--mode', 'cloud',
            '--bucket', bucket,
            '--start_year', start_year
        ],
        file_outputs={'bucket': '/output.txt'}
    ).apply(use_gcp_secret('user-gcp-sa'))

    # Step 2: Do hyperparameter tuning of the model on Cloud ML Engine
    hparam_train = dsl.ContainerOp(
        name='hypertrain',
        # image needs to be a compile-time string
        image='gcr.io/ai-analytics-solutions/babyweight-pipeline-hypertrain:latest',
        arguments=[
            preprocess.outputs['bucket']
        ],
        file_outputs={'jobname': '/output.txt'}
    ).apply(use_gcp_secret('user-gcp-sa'))

The key points from the code snippet above:

  • Decorate the function with `@dsl.pipeline`
  • The parameters to the function can be used to configure the run
  • Each step in my case is a ContainerOp that refers to the Docker image that we pushed to gcr.io. The image name has to be a static string.
  • You can pass arguments to the container. These will become command-line parameters to the entrypoint
  • Specify where the outputs of the step will show up
  • The outputs of step 1 (bucket) are the inputs to step 2 (preprocess.outputs['bucket']). Note that the step object (here, preprocess) is used to reference which step’s output is needed. You can use any step here as long as it doesn’t introduce a circular dependency.

When we say that the output of the preprocess step will be in /output.txt, we have to make sure that the step actually writes the data there. That’s why deploy.sh in the previous section echoes the model name and version to files: subsequent steps can pick them up as inputs.
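
If a step is written in Python rather than bash, the same contract applies: before exiting, the step writes each output value to the file path declared for it. Here is a minimal sketch of that idea. The helper name write_step_output and the bucket path in the usage lines are hypothetical, and a temporary directory stands in for the container's filesystem so that the sketch runs anywhere.

```python
import os
import tempfile


def write_step_output(path, value):
    """Write a single output value to the file the pipeline will read."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w") as f:
        f.write(str(value))


if __name__ == "__main__":
    # Inside a real container the path would be the declared one,
    # for example /output.txt.
    out_path = os.path.join(tempfile.mkdtemp(), "output.txt")
    write_step_output(out_path, "gs://my-bucket/babyweight/preproc")
    with open(out_path) as f:
        print(f.read())
```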

2d. Execute the pipeline manually

Once you have the pipeline written, you can compile it using the command-line compiler that ships with kfp and then upload it manually to the dashboard. But you can also directly submit the code using the Python API. This is more convenient than going through the dashboard:

args = {
    'project' : PROJECT,
    'bucket' : BUCKET
}
client = kfp.Client(host=PIPELINES_HOST)
pipeline = client.create_run_from_pipeline_func(
    preprocess_train_and_deploy,
    args)

You can get the PIPELINES_HOST for your cluster by looking at the Settings of the cluster in the GCP Console.

This launches the pipelines and retains the logs on the cluster.
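
For the compile-and-upload route, the compilation step can also be done from Python. The sketch below assumes the kfp v1 SDK, where kfp.compiler.Compiler().compile(pipeline_func, package_path) writes a package file. The wrapper name compile_pipeline and its optional compiler argument are my own additions so that the function can be exercised without a kfp installation.

```python
def compile_pipeline(pipeline_func, package_path, compiler=None):
    """Compile a pipeline function into a package that can be uploaded."""
    if compiler is None:
        # Assumes the kfp v1 SDK is installed. The import is lazy so that
        # a stand-in compiler can be passed in when kfp is unavailable.
        import kfp.compiler
        compiler = kfp.compiler.Compiler()
    compiler.compile(pipeline_func, package_path)
    return package_path
```

From the notebook, you would call something like compile_pipeline(preprocess_train_and_deploy, 'babyweight.zip') and then upload the resulting file through the dashboard.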

3a. Set up continuous integration (CI)

Now that we have the code correctly working, we are ready to set up continuous integration. Basically, we want to rebuild a Docker image whenever any of the files it depends on is committed to the source repository.

To make this simple, I have organized the code so that each step is in a self-contained directory:

So all that we have to do is connect the GitHub repository to our GCP account and set up a bunch of Cloud Build triggers from GitHub:

create_github_trigger() {
    DIR_IN_REPO=$(pwd | sed "s%${REPO_NAME}/% %g" | awk '{print $2}')
    gcloud beta builds triggers create github \
        --build-config="${DIR_IN_REPO}/cloudbuild.yaml" \
        --included-files="${DIR_IN_REPO}/**" \
        --branch-pattern="^master$" \
        --repo-name=${REPO_NAME} --repo-owner=${REPO_OWNER}
}
for container_dir in $(ls -d */ | sed 's%/%%g'); do
    cd $container_dir
    create_github_trigger
    cd ..
done
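
The effect of giving each trigger an included-files filter of the form DIR/** is that a commit rebuilds only the containers whose directories it touches. Here is a small sketch of that selection logic in Python. It approximates the filter with a simple path-prefix check rather than Cloud Build's actual glob matching, and the directory and file names in the usage lines are hypothetical.

```python
def triggered_dirs(changed_files, container_dirs):
    """Return the container directories that contain at least one changed file."""
    hits = []
    for directory in container_dirs:
        prefix = directory.rstrip("/") + "/"
        if any(path.startswith(prefix) for path in changed_files):
            hits.append(directory)
    return hits


if __name__ == "__main__":
    # Hypothetical repository layout and commit contents.
    dirs = ["containers/bqtocsv", "containers/hypertrain", "containers/deploycmle"]
    commit = ["containers/hypertrain/train.sh", "README.md"]
    print(triggered_dirs(commit, dirs))
```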

Note: To verify this, you have to fork my GitHub repo and try it out with your fork — you likely don’t have permissions to connect the GoogleCloudPlatform repos to your own GCP project.

Now, whenever a file is committed, the corresponding trigger launches a Cloud Build:

3b. Set up continuous deployment (CD)

Unlike with web applications, it is not the case that we want to re-run every ML model whenever we update the ML components the pipeline consists of. So, think carefully about what type of event warrants re-execution and whether that re-execution is of the full pipeline or only a part of it.

A common scenario is that we want to retrain the model (or perhaps we want to only finetune the training) every time we have new data. Let’s assume that we want to train and immediately deploy the updated model (i.e., we don’t have a staging phase with A/B testing etc.). In that case, we’d create a new pipeline method that consists of only the train and deploy steps and launch it in response to a new file in Cloud Storage.

The simplest way to trigger off Cloud Storage is to use Cloud Functions. So, we could do:

def handle_newfile(data, context):
    # Cloud Storage events carry the object path in the 'name' field
    filename = data['name']
    mlp_babyweight.finetune_and_deploy(filename)

where the finetune_and_deploy method is:

def finetune_and_deploy(filename):
    """invoked from a Cloud Function or a Cloud Run, it launches a Pipeline on kfp"""
    import kfp
    import os
    import sys

    # Only new training files should trigger a retraining run
    if 'babyweight/preproc/train' in filename:
        PIPELINES_HOST = os.environ.get('PIPELINES_HOST', "Environment variable PIPELINES_HOST not set")
        PROJECT = os.environ.get('PROJECT', "Environment variable PROJECT not set")
        BUCKET = os.environ.get('BUCKET', "Environment variable BUCKET not set")
        print("New file {}: Launching ML pipeline on {} to finetune model in {}".format(
            filename, PIPELINES_HOST, BUCKET))
        sys.stdout.flush()
        client = kfp.Client(host=PIPELINES_HOST)
        args = {
            'project' : PROJECT,
            'bucket' : BUCKET,
        }
        pipeline = client.create_run_from_pipeline_func(train_and_deploy, args)
        return 'Fine tuning job Launched!'
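
The important design point in this function is the filter: the trigger fires for every object finalized in the bucket, so the code has to decide which files actually warrant retraining. The sketch below isolates that decision into a function that can be tested without any cloud resources. It assumes the event dictionary carries the object path under the name key, as Cloud Storage finalize events do; the function name should_retrain is hypothetical.

```python
TRAIN_MARKER = "babyweight/preproc/train"


def should_retrain(event):
    """Return True if the storage event refers to a new training file."""
    object_name = event.get("name", "")
    return TRAIN_MARKER in object_name


if __name__ == "__main__":
    # Example object paths, for illustration only.
    print(should_retrain({"name": "babyweight/preproc/train.csv-00000-of-00012"}))
    print(should_retrain({"name": "babyweight/hyperparam/17/saved_model.pb"}))
```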

We can then deploy the Cloud Function using:

gcloud functions deploy handle_newfile --runtime python37 \
--set-env-vars PROJECT=${PROJECT},BUCKET=${BUCKET},PIPELINES_HOST=${PIPELINES_HOST},HPARAM_JOB=${HPARAM_JOB} \
--trigger-resource=${BUCKET} \
--trigger-event=google.storage.object.finalize

Note that my repository shows you a more complex setup. I containerize the execution code and launch it using Cloud Run. While we can trigger Cloud Run off Cloud Storage, doing so requires setting up a Pub/Sub topic and related plumbing. So, instead, I trigger a Cloud Function, and then have the Cloud Function invoke Cloud Run.

If we create a new file in Cloud Storage in the preproc directory, the train and deploy ML stages are launched and the model is updated.

Enjoy!

Next steps:

  • Try out the steps in this README.md file in GitHub
  • Read the Google Cloud solution on this topic — the GitHub repo associated with the solution gives you Terraform scripts, etc. to do this in an enterprise context.
