Kubeflow Times Machine Learning — Reproducibility Step by Step

Training machine learning models and delivering them to a production environment often involves a lot of manual work. The exact steps depend on the type of model you use, the workflow of the company you work in, and the requirements for the deployed model. The industry has already developed tools for continuous software integration and delivery, but they cannot be applied directly to machine learning models, and that is the problem this article addresses. We will show a way to create a pipeline that connects machine learning workflow steps (such as collecting and preparing data, model training, model deployment and so on) into a single reproducible run, which you can execute with a single button push.

But before we actually start implementing such a pipeline we should get on the same page about the general machine learning workflow. Let’s quickly recap the flow and its main aspects:

  1. Research in the subject area;
    Suppose we already have a task to solve. We need to research the subject area, define requirements for the solution, define data sources, and choose the tools and methods we will apply (different algorithms and so on).
  2. Data preparation;
    We start with collecting the data, transforming it into a desirable form and proceed with it to further steps.
  3. Model training;
    We build the model and train it using the prepared data. We experiment with different architectures and run training several times while tuning hyper-parameters and evaluating the model’s performance.
  4. Cataloguing;
    Once the model has been trained we export it into a self-sufficient format. This means that a model’s artifact can be independently used on other infrastructures without passing its source code and all dependencies along. We handle versioning of the model and data used for training as well as extraction of the inputs and outputs of the model to use it for deployment.
  5. Model deployment;
    After the model has been exported we deploy it on the prepared infrastructure and provision HTTP/gRPC endpoints to it.
  6. Integration Testing;
    We perform integration testing to ensure the model runs properly on the prepared environment.
  7. Production Inferencing;
    We start inferencing on our model while collecting all requests and predictions during the model’s lifetime. We provide different deployment variants, such as A/B or Canary deployments for different model versions.
  8. Performance Monitoring;
    During inference time we automatically monitor how the model behaves in the production environment and how the data flowing through it changes over time, including insights into concept drift and anomaly detection.
  9. Model Maintenance;
    This stage is dedicated to the model’s long-term life. We interpret the model’s predictions and explain why the model behaves the way it does. We use that knowledge to improve the model by retraining it on production traffic in order to increase its performance.

We will go over each of these steps throughout the article.

Tools

A short introduction to the tools we’re going to use:

  • Kubeflow is a machine learning toolkit for Kubernetes. It began as just a simpler way to run TensorFlow jobs on Kubernetes, but has since expanded to be a multi-architecture, multi-cloud framework for running and automating machine learning pipelines.
  • Hydrosphere.io is an open source model management platform for deploying, serving and monitoring machine learning models and ad-hoc algorithms.

To get a clear picture of the pipeline we are going to build, let’s see how the tools in the Kubeflow toolset and the Hydrosphere.io platform map onto the workflow steps.

The primary focus of the Kubeflow ecosystem is to train machine learning models built in different frameworks in a distributed mode by leveraging the underlying Kubernetes cluster. This gives Kubeflow a noticeable advantage over manual setups of such training systems. Model deployment is another part that Kubeflow covers, both with niche (framework-specific) tools and with other deployment tools. On top of that there’s a new service called Pipelines, whose main goal is to connect different workflow steps into a single processing run.

Hydrosphere.io covers the after-training part of the workflow. Its primary goals are model management, model delivery to production in a simple and robust fashion, as well as monitoring models during inference time and gaining insights from their predictions to further improve the models’ quality.

Prerequisites

This tutorial presumes that you have access to a Kubernetes cluster. If you don’t, you can create your own single-node cluster locally with Minikube. You will additionally need:

Locally:

On the Kubernetes Cluster:

By default Kubeflow will be installed in the kubeflow namespace. In this tutorial I assume that Hydrosphere.io is installed in the same namespace as Kubeflow.

PersistentVolumeClaims

We will additionally need to provide persistent storage so that workflow pods can retain intermediate files and pass them between stages.

$ kubectl apply -f - <<EOF
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 20Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: models
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 20Gi
EOF

Kubeflow Pipelines Manifest

The whole workflow will run on top of Kubeflow Pipelines. Pipelines are built upon Argo Workflows, a container-native workflow engine. To execute such a workflow you have to create a Kubernetes manifest and then apply it to the cluster to start the job. To ease the process of creating such .yaml definitions, Kubeflow released a Python SDK library, which allows you to define your workflows as functions and then compile them to DSL.

First of all we will define the frame of that function. Create a root directory for this tutorial and a pipeline.py script in it.

$ mkdir mnist; cd mnist  # create a root directory for this tutorial and enter it
$ touch pipeline.py

We’ve already created PVCs in our cluster, so we can supply pipeline definition with references to them.

Throughout the article I will show short code snippets for this pipeline.py file, which will only contain the differences from the previous version. The full code is available here.

One thing worth mentioning here: Kubeflow gives us the ability to customise the workflow from the Pipelines UI. To do that you just have to supply parameters to the function, like we do with the data_directory and models_directory variables. There are some limitations on using those parameters, but they should essentially disappear in upcoming SDK versions. For now you can use these parameters in your function like so: a data_directory variable will be mapped to a {{workflow.parameters.data-directory}} string when compiled into DSL, which we use while declaring volumes.
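The naming rule described above can be illustrated with a few lines of plain Python. This is only a sketch of the mapping stated in the text, not the SDK’s own code, and the helper name argo_placeholder is hypothetical.

```python
def argo_placeholder(param_name: str) -> str:
    """Build the placeholder string a pipeline parameter is mapped to.

    Follows the rule stated in the text: underscores in the Python
    parameter name become hyphens in the compiled workflow parameter.
    """
    return "{{workflow.parameters.%s}}" % param_name.replace("_", "-")


if __name__ == "__main__":
    for name in ("data_directory", "models_directory"):
        print(name, "->", argo_placeholder(name))
```

The practical consequence is that inside the pipeline function these parameters behave like placeholder strings rather than real values, which is one of the limitations mentioned above.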

Let’s start with the actual machine learning model.

Step 1. Research in the Subject Area

In order to stay focused on the automation part I’ve chosen a well known digit recognition problem.

Objectives:

  • Given an image of a handwritten digit, classify which digit it is.

Requirements:

  • The deployed model should be able to predict on batches of images;

Methods:

  • A fully-connected neural network (for implementation simplicity).

Tools:

  • Language: Python (offers various deep learning frameworks);
  • Framework: TensorFlow (has a great model export mechanism and pre-made classification architectures).

Data Sources:

  • MNIST (60000 train and 10000 test grayscale 28x28 images of handwritten digits).

Step 2. Data Preparation

A real-world scenario often requires you to obtain training data from the company’s data storage. This might consist of the following:

  1. Gain access to the data warehouse;
  2. Analyse the schema and write a query for selection;
  3. Transform selected data into a desired form and store it for training.

With the chosen problem we don’t have such a rich setup. We will use the original publicly available MNIST dataset. Let’s create the first executable that will download the data and store it in our PersistentVolumeClaim (PVC).

$ mkdir 01-download; cd 01-download
$ touch download.py

This downloads all 4 files, unpacks them, converts them into numpy arrays and stores those arrays under the base_dir directory. The directory in this case will be a PVC, mounted to the pod under the path specified by the MNIST_DATA_DIR environment variable (we specify that variable below).
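The unpacking step can be sketched as follows. The MNIST files are distributed as gzip-compressed IDX files, and this minimal example decodes one using only the standard library so that it runs anywhere. It is not the article’s download.py: the function names parse_idx and make_idx are hypothetical, and the demo uses a tiny synthetic blob instead of a real download.

```python
import gzip
import struct


def parse_idx(gz_bytes: bytes):
    """Decode a gzip-compressed IDX file into (shape, flat list of uint8 values)."""
    raw = gzip.decompress(gz_bytes)
    # Header: two zero bytes, a type code (0x08 = unsigned byte), number of dimensions.
    zero, dtype_code, ndim = struct.unpack(">HBB", raw[:4])
    if zero != 0 or dtype_code != 0x08:
        raise ValueError("not an unsigned-byte IDX file")
    # Each dimension size is a big-endian 32-bit unsigned integer.
    shape = struct.unpack(">" + "I" * ndim, raw[4:4 + 4 * ndim])
    data = list(raw[4 + 4 * ndim:])
    expected = 1
    for size in shape:
        expected *= size
    if len(data) != expected:
        raise ValueError("payload size does not match the header")
    return shape, data


def make_idx(shape, values) -> bytes:
    """Build a tiny gzip-compressed IDX blob, used here only to demo parse_idx."""
    header = struct.pack(">HBB", 0, 0x08, len(shape))
    dims = struct.pack(">" + "I" * len(shape), *shape)
    return gzip.compress(header + dims + bytes(values))


if __name__ == "__main__":
    blob = make_idx((2, 2, 2), range(8))  # two fake 2x2 "images"
    print(parse_idx(blob))
```

With numpy available, as in the image built below, the flat payload could be turned into an array and reshaped to the decoded shape before being saved under base_dir.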

To be able to execute this file on the cluster as part of the workflow we need to pack it into a Docker container. Create a Dockerfile for that.

$ cat > Dockerfile << EOF
FROM python:3.6-slim
RUN pip install numpy==1.14.3 Pillow==5.2.0
ADD ./download.py /src/
WORKDIR /src/
ENTRYPOINT [ "python", "download.py" ]
EOF

We use python:3.6-slim as our base image, while additionally installing some python dependencies.

Once the container definition has been created, we can build a Docker image from it and push it to a public Docker registry. As the username you can use your own Docker Hub account, or you can use any other Docker registry available to you.

$ cd ..
$ docker build -t {username}/mnist-pipeline-download 01-download
$ docker push {username}/mnist-pipeline-download

Lastly, let’s update the pipeline definition.

Here we define a new environment variable (1) to be attached to our container, define a container operation (2) to be executed on the cluster and finally attach (3) required resources.

Step 3. Model Training

After our data is ready we can switch to model building and training. Typically at this stage you build a model, train it, evaluate it, tune hyper-parameters and repeat that loop until the model reaches the desired performance. So, let’s first create a directory for the executable.

$ mkdir 02-train; cd 02-train
$ touch train.py

The model is quite simple: we just use a pre-made fully-connected neural network classifier (tf.estimator.DNNClassifier). We specify that the model consists of 2 hidden layers with 256 and 64 units respectively. learning_rate, num_steps and batch_size are parameters configured through environment variables. num_steps controls how long the model is trained.
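Reading those settings from the environment could look roughly like this. It is a minimal sketch: the variable names LEARNING_RATE, NUM_STEPS and BATCH_SIZE and the default values are assumptions for illustration, not taken from the article’s train.py.

```python
import os


def read_hyperparameters(env=os.environ):
    """Read training settings from environment variables, with fallbacks."""
    return {
        # Variable names and defaults below are illustrative assumptions.
        "learning_rate": float(env.get("LEARNING_RATE", "0.01")),
        "num_steps": int(env.get("NUM_STEPS", "10000")),
        "batch_size": int(env.get("BATCH_SIZE", "256")),
        # Two hidden layers with 256 and 64 units, as described in the text.
        "hidden_units": [256, 64],
    }


if __name__ == "__main__":
    print(read_hyperparameters())
```

Passing the mapping in as an argument keeps the function easy to check locally with a plain dict, while the container simply relies on os.environ.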

After model training finishes, we immediately store it in the models PVC in the tf.saved_model format, specifying inputs of the model. Outputs will be automatically inferred.

Let’s create a Docker image from that executable.

$ cat > Dockerfile << EOF
FROM tensorflow/tensorflow:1.9.0-py3
ADD ./train.py /src/
WORKDIR /src/
ENTRYPOINT [ "python", "train.py" ]
EOF
$ cd ..
$ docker build -t {username}/mnist-pipeline-train 02-train
$ docker push {username}/mnist-pipeline-train

Update the pipeline definition with the training container definition.

Similarly to the previous step, we define new environment variables (1) and a container operation (2). We additionally specify that this operation must be executed after the previous download operation completes; otherwise the two would start in parallel. At the bottom we define how many resources this container needs on the cluster (5). This instructs Kubeflow to start this container only when the required conditions are met.
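Why the explicit dependency matters can be shown with a small dependency-graph sketch. It uses the standard library’s graphlib module (Python 3.9 or newer) and is only an illustration of the scheduling idea, not of how Kubeflow or Argo are implemented; the helper names are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def first_wave(deps):
    """Return the steps that may start immediately (no unmet dependencies)."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    return set(sorter.get_ready())


def execution_order(deps):
    """Return one valid order in which the steps can run."""
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    # Each step maps to the set of steps that must finish before it starts.
    independent = {"download": set(), "train": set()}
    chained = {"download": set(), "train": {"download"}}
    print(first_wave(independent))   # both steps are ready at once
    print(first_wave(chained))       # only download is ready
    print(execution_order(chained))
```

Without a declared dependency both steps are eligible to start together, which is exactly the parallel start described above.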

Step 4. Model Cataloguing

Model Cataloguing is a term I would like to use to cover the following:

  • Metadata extraction:
    — Graph definition and weights;
    — Inputs and outputs definitions;
    — Training data version / statistics;
    — Other dependencies (look_up vocabularies, etc);
  • Model artifact versioning;
  • Storing model in Repository;

This describes the model in great detail. We can use such information for model deployment since we know inputs and outputs of the model and can build an appropriate infrastructure. We can identify data that was used for training for the current model. Later on we can experiment with the different architectures and varying data versions to see if we can reach better performance. This also allows us to analyze how newer architecture works with previous data versions, hence compare the performance. We version models to compare their performance via A/B or Canary deployments (we will talk about this in the next sections).

We have already done half of this in the previous step when we exported the model to the tf.saved_model format. The other steps still require action, though. If we did this manually, we would need to create a repository where all of our models are stored and versioned. Along with that we would also need to handle metadata that wasn't provisioned with the model.

$ mkdir 03-upload; cd 03-upload
$ touch execute.sh

At this stage we build a model artifact (retrieve all metadata needed for later inference and pack it along with the model binaries) and store the built artifact in a repository. The artifact is a Docker image with a frozen model inside it whereas the repository is a configured Docker registry (could be external). Along with that we supplement the model with the training data statistics, used for this model.

In case you are wondering why we need to store the built model’s version in a file inside the pod (ii): we can instruct Kubeflow to retrieve the contents of that file and use them in subsequent pipeline steps (we will need it).
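The hand-off through a file can be sketched in plain Python. This only simulates the idea: upload_step and collect_outputs are hypothetical stand-ins for the upload container and for the orchestrator reading the declared file, and a temporary directory plays the role of the pod’s file system.

```python
import tempfile
from pathlib import Path


def upload_step(version_file: Path, model_version: int) -> None:
    """Stand-in for the upload container: record the built model version in a file."""
    version_file.write_text(f"{model_version}\n")


def collect_outputs(file_outputs: dict) -> dict:
    """Stand-in for the orchestrator: read each declared file into a named output."""
    return {key: Path(path).read_text().strip() for key, path in file_outputs.items()}


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as pod_fs:
        version_file = Path(pod_fs) / "model_version.txt"
        upload_step(version_file, 3)
        outputs = collect_outputs({"model_version": version_file})
        # A later step would receive this value as a plain string argument.
        print(outputs["model_version"])
```

Note that the value arrives as text, so a consuming step has to parse it if it needs a number.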

Let’s create a Docker image from that executable.

$ cat > Dockerfile << EOF
FROM python:3.6-slim
RUN apt update && apt install -y jq
RUN pip install hs==0.1.5
ADD ./execute.sh /src/
WORKDIR /src/
ENTRYPOINT [ "bash", "execute.sh" ]
EOF
$ cd ..
$ docker build -t {username}/mnist-pipeline-upload 03-upload
$ docker push {username}/mnist-pipeline-upload

Update pipeline definition.

One minor difference from the previous code snippets: we tell Kubeflow to store the contents of /model_version.txt under the model_version key in the internal key-value repository (line 26). We will use this in a subsequent step.

Step 5. Model Deployment

The next step is the actual model deployment. At this step we have a built model artifact stored in our repository which is not yet exposed to the world. A typical way of exposing a model outside of the cluster consists of creating a runtime for the model (a REST API web server that runs inference on the model and returns predictions) and deploying it to the cluster. There are niche solutions like TensorFlow Serving and TensorRT, but they are focused on specific frameworks. There is no way to use them for models built with different frameworks such as scikit-learn or MXNet.

Along with implementing a runtime, you will need to implement service meshing between model versions to create Canary deployments (when you deploy, for example, three model versions and let the traffic flow through them in different proportions, like 30/30/40, while exposing only one endpoint to the outside). You might also want to connect your models in a chain, where some models extract features and others infer on those features.
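The traffic split behind a Canary deployment can be sketched as weighted random routing. This is a toy illustration of the idea, not how Hydrosphere.io or any service mesh implements it; the version names and the pick_version helper are hypothetical.

```python
import random
from collections import Counter


def pick_version(weights: dict, rng: random.Random) -> str:
    """Choose which model version serves one request, proportionally to its weight."""
    versions = list(weights)
    return rng.choices(versions, weights=[weights[v] for v in versions], k=1)[0]


if __name__ == "__main__":
    split = {"v1": 30, "v2": 30, "v3": 40}  # the 30/30/40 split from the text
    rng = random.Random(0)  # seeded only to make the demo repeatable
    served = Counter(pick_version(split, rng) for _ in range(10_000))
    print(served)
```

Callers only see the single routing entry point, while the share of requests each version receives follows the configured weights.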

Implementing such runtimes, configuring an underlying infrastructure, managing model interactions is not a trivial job. Using Hydrosphere.io you only have to create an application.

An application here is a microservice with an exposed API endpoint to interact with your model. It allows you to use your most recent deployed production models via HTTP-requests, gRPC API calls or configure it as a part of Kafka streams. It uses predefined runtimes to run your model artifacts and helps you to configure a multi-staged execution from model to model. Within an application you can design a setup with canary or A/B model deployments.

$ mkdir 04-deploy; cd 04-deploy
$ touch execute.sh

Let’s quickly analyze the manifest and its main parts:

  1. We define what resource this manifest describes;
  2. We define an application name;
  3. By defining a singular flag we state that this application is single-staged and contains just a single model version;
  4. Here we define what model will be deployed and its version;
  5. And finally we specify a runtime for the model.

That’s basically it. After the last command the model will be deployed within an application and exposed to the world.

Let’s create a Docker image from that step.

$ cat > Dockerfile << EOF
FROM python:3.6-slim
RUN pip install hs==0.1.5
ADD ./execute.sh /src/
WORKDIR /src/
ENTRYPOINT [ "bash", "execute.sh" ]
EOF
$ cd ..
$ docker build -t {username}/mnist-pipeline-deploy 04-deploy
$ docker push {username}/mnist-pipeline-deploy

Update pipeline definition.

The minor difference here is located at line 19. We use the output value from the previous upload container (model_version key) and pass it as an argument to the entrypoint of the deploy container operation.

Step 6. Model Integration Testing

This step aims to test the model once it’s deployed into a production-like infrastructure. It helps us to eliminate configuration bugs as well as deployment and integration issues. It’s important to write test suites that cover the following data:

  • A golden data set;
  • Edge cases;
  • Recent traffic.

Let’s create a client application which will invoke the deployed model and estimate accuracy.

$ mkdir 05-test; cd 05-test
$ touch test.py

The generate_data function reads a portion of the testing data saved in our PVC. Then we use the HTTP endpoint to reach our model, get predictions and store them in a list. Finally we assert that the accuracy of our model is above an acceptable level. acceptable_accuracy is read from the corresponding environment variable.
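The core of such a test can be sketched as below. To keep the example runnable without a cluster, the model call is injected as a plain callable; in the real test it would wrap an HTTP request to the deployed application. The function names and the ACCEPTABLE_ACCURACY variable name are assumptions, not the article’s test.py.

```python
import os


def accuracy(labels, predictions) -> float:
    """Fraction of predictions that match the ground truth labels."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions differ in length")
    if not labels:
        raise ValueError("no samples to evaluate")
    hits = sum(1 for y, p in zip(labels, predictions) if y == p)
    return hits / len(labels)


def run_integration_test(predict, samples, labels, acceptable_accuracy: float) -> float:
    """Call the model on every sample and fail if accuracy is too low."""
    predictions = [predict(sample) for sample in samples]
    score = accuracy(labels, predictions)
    assert score >= acceptable_accuracy, (
        f"accuracy {score:.3f} is below {acceptable_accuracy}"
    )
    return score


if __name__ == "__main__":
    # The environment variable name is an assumption for this sketch.
    threshold = float(os.environ.get("ACCEPTABLE_ACCURACY", "0.9"))
    fake_model = lambda sample: sample % 10  # stub standing in for the deployed model
    samples = list(range(100))
    labels = [s % 10 for s in samples]
    print(run_integration_test(fake_model, samples, labels, threshold))
```

A failed assertion makes the process exit with a non-zero code, which is what lets the pipeline step be marked as failed.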

Let’s create a Docker image from that step.

$ cat > Dockerfile << EOF
FROM python:3.6-slim
RUN pip install scikit-learn==0.20.0 requests==2.21.0 numpy==1.14.3
ADD ./test.py /src/
WORKDIR /src/
ENTRYPOINT [ "python", "test.py" ]
EOF
$ cd ..
$ docker build -t {username}/mnist-pipeline-test 05-test
$ docker push {username}/mnist-pipeline-test

Update the pipeline definition. Everything here is similar to the previous steps.

Run Workflow

This was the last step we can implement within Kubeflow pipelines but the overall machine learning workflow does not end here. There’re a few important steps left but we will deal with them later.

We can compile the pipeline definition now.

$ python pipeline.py

This will produce a pipeline.tar.gz file. Let’s execute this pipeline on our cluster.

$ kubectl port-forward deployment/ambassador 8085:80

This will allow you to reach the Kubeflow UI from your local machine. Open http://localhost:8085/pipeline/.

Upload the pipeline.tar.gz file using the Upload pipeline button.

Create a run (you would also have to create an experiment to which this run will be attached). During run creation you can specify parameters we’ve defined earlier.

And finally start a run.

7. Production Inferencing

At this stage we have our model up and running in the production environment. We start collecting all requests and predictions during the model’s lifetime. As the model evolves (different architectures or more data are used) we might want to create different deployment configurations, like below:

Throughout the life of the model we need to make sure that it scales appropriately according to the production needs. We want to increase the number of model replicas, when the incoming traffic hits certain limits (for example, by analyzing Latency degradation), and to decrease them respectively when the traffic goes down.

8. Model Performance Monitoring

During inference time we automatically monitor how model behaves in the production environment as well as how data, flowing through the model, changes. This helps us to gain insights about our vision of the production flow.

A few charts are available out of the box: Requests Count and Latency. Their names are self-explanatory: the first one counts how many requests a model receives over time, and the second one tracks how long each request took to process.

Other metrics use more sophisticated methods. One possible metric is Autoencoder. With it you can capture a “concept” of your data and monitor its reconstruction error. Let’s send some traffic to the deployed model.

On the left-hand side the plot shows how the production data was reconstructed when normal MNIST images were sent to the model. The reconstruction score barely reached 0.1. For this chart we’ve also set a threshold of 0.15, which means that whenever the reconstruction score goes above the red line, a notification is raised.

On the right-hand side you can see how the plot spiked up to 60.000 points of the reconstruction score once the data was changed to letters (notMNIST dataset). Once monitoring identified that, it changed the state of the model to “unhealthy”.
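The thresholding logic can be sketched in a few lines. It assumes mean squared error as the reconstruction score, which is an assumption since the text does not say how the score is computed, and the function names are hypothetical; the 0.15 threshold is the one quoted above.

```python
def reconstruction_error(original, reconstructed) -> float:
    """Mean squared error between an input vector and its reconstruction."""
    if len(original) != len(reconstructed):
        raise ValueError("vectors differ in length")
    return sum((a - b) ** 2 for a, b in zip(original, reconstructed)) / len(original)


def model_state(scores, threshold=0.15) -> str:
    """Report 'unhealthy' as soon as any score rises above the threshold."""
    return "unhealthy" if any(score > threshold for score in scores) else "healthy"


if __name__ == "__main__":
    familiar = reconstruction_error([0.0, 1.0, 1.0, 0.0], [0.1, 0.9, 0.8, 0.1])
    unfamiliar = reconstruction_error([1.0, 0.0, 0.0, 1.0], [0.1, 0.9, 0.8, 0.1])
    print(model_state([familiar]), model_state([familiar, unfamiliar]))
```

The intuition is that an autoencoder trained on digits reconstructs digits well, so a sudden rise in reconstruction error signals inputs unlike the training data.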

There’re also other evaluation metrics, but they would require you to provide labels for your requests. For this model we can count accuracy on the production stream.

The first version of the model (blue line) was less accurate, judging by the diagrams above. It made errors on image batches, with accuracy dropping as low as 0.85 on a batch. The second version (green line) looks better: there are wider gaps between accuracy drops (meaning that all images in the intermediate batches were classified perfectly, accuracy == 1), and the accuracy score is higher overall.

9. Model Maintenance

Root Cause Analysis

We are moving to the last stage where we are touching a model maintenance topic. Recall the example with a huge concept drift:

It’s not so hard to understand what happened to our model if we just look at the requests. We can improve our model so that it is able to classify letters as well, but there are other cases: what if a concept drift doesn’t occur but the accuracy of the model stays low?

Machine learning models are often black boxes for us and thus cannot be directly interpreted. But there are algorithms that help us to understand why a particular prediction was made. One such method is called RISE.

Briefly speaking, this algorithm treats our model as a black box. It generates a bunch of masks, hiding out different portions of pixels from the original image, and watches how model’s predictions change. Based on those observations the algorithm generates an importance map, showing what regions of the image play significant role for a particular class.
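The masking idea can be sketched in plain Python. This is a simplified illustration: it uses independent per-pixel random masks on a tiny image, whereas the published RISE method generates smoother masks from upsampled low-resolution grids. The function name and the toy model are hypothetical.

```python
import random


def rise_importance(image, score_fn, n_masks=500, keep_prob=0.5, seed=0):
    """Estimate per-pixel importance for one class by random masking.

    image    : 2D list of pixel values
    score_fn : callable returning the model's score for the class of interest
    """
    rng = random.Random(seed)
    height, width = len(image), len(image[0])
    importance = [[0.0] * width for _ in range(height)]
    for _ in range(n_masks):
        # 1 keeps a pixel, 0 hides it.
        mask = [[1 if rng.random() < keep_prob else 0 for _ in range(width)]
                for _ in range(height)]
        masked = [[image[r][c] * mask[r][c] for c in range(width)]
                  for r in range(height)]
        score = score_fn(masked)
        # Pixels that were visible share the credit for this score.
        for r in range(height):
            for c in range(width):
                importance[r][c] += score * mask[r][c]
    norm = n_masks * keep_prob
    return [[value / norm for value in row] for row in importance]


if __name__ == "__main__":
    image = [[1.0] * 3 for _ in range(3)]
    toy_model = lambda img: img[1][1]  # toy "model" that only looks at the centre pixel
    for row in rise_importance(image, toy_model):
        print(["%.2f" % v for v in row])
```

In the toy run the centre pixel receives the highest importance, because the score is high only when that pixel is left visible.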

In the plot above you can see importance regions for sample images (rows) with respect to each class (columns). Ground truth image/class pairs are presented on the main diagonal.

Our current model is correct most of the time (green boxes), but it did confuse an image of a five and wrongly classified it as a three. Let’s investigate why this happened. If we take a look at the importance region of pixels for the image of a three and its respective class (row 4, column 4), we can see that the classifier looks at the top half of the digit and probably takes into account the curly forms that a digit three has. Likewise, the image of a five (row 6, column 4) has similar curly shapes, but the importance map is shifted more to the right. The importance map for the class five (row 6, column 6) shows that, to classify a digit five, the model looks at the empty left region.

Using this knowledge we can supplement our training dataset with more related cases, or change the model’s architecture.

Subsampling

Once we are in inference mode we no longer have training and testing datasets. We trained our model on as much data as possible and deployed it to production. Since then, however, we continuously receive more data. Labels for it won’t be immediately available, but we will eventually obtain them too.

The next logical step would be retraining the model on the newer data to capture more cases and enhance model’s performance. Returning to our pipeline — you can configure Kubeflow to train new models periodically and upload them to production.

When you send a request to a model deployed on the Hydrosphere.io platform, along with the model’s predictions you will receive additional system information, like the trace_id for your request. You can store that trace_id and the model’s predictions in your data storage and later on match these requests to the ground truth labels. We can use that data to fine-tune the existing model or train a new one.

Here we download [requests, responses] from Hydrosphere.io, look up ground truth labels in MNISTTable and then prepare a new training dataset. The subsample line binary_data=reqstore.APIHandler.subsample(addr, name) does not download the whole inference history of our model; instead it downloads a representative subsample that follows the distribution of the production data. This way we can train a new model on the most relevant cases.
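The matching step on its own can be sketched as follows. The record layout (dicts with trace_id and input keys) and the function name are assumptions made for illustration; the sketch does not call Hydrosphere.io and works on data that has already been fetched.

```python
def build_training_set(logged_requests, labels_by_trace_id):
    """Pair each logged request with its ground truth label, matched on trace_id.

    logged_requests    : iterable of dicts with 'trace_id' and 'input' keys
    labels_by_trace_id : dict mapping trace_id -> label
    Requests that have no label yet are skipped.
    """
    inputs, labels = [], []
    for record in logged_requests:
        label = labels_by_trace_id.get(record["trace_id"])
        if label is None:
            continue  # label not available yet
        inputs.append(record["input"])
        labels.append(label)
    return inputs, labels


if __name__ == "__main__":
    requests_log = [
        {"trace_id": "a1", "input": [0.0, 0.5]},
        {"trace_id": "b2", "input": [1.0, 0.2]},
        {"trace_id": "c3", "input": [0.3, 0.3]},
    ]
    known_labels = {"a1": 0, "c3": 7}
    print(build_training_set(requests_log, known_labels))
```

Checking for a missing label explicitly, rather than for a falsy value, matters here because the digit 0 is a valid label.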

Using this script we can build a new container, replace the first step in the above pipeline and create a recurring run to repeatedly train and deploy new models every month with the most relevant training data.

Conclusion

Configuring ML pipelines, once done properly, will save a lot of time and effort in machine learning engineers’ daily routines. In this tutorial we’ve created an infrastructure that an ML team can productively use multiple times a day without relying on dedicated DevOps or software engineers.